Skip to content

Commit 7a322fc

Browse files
authored
Merge pull request #21 from happy-game/replication
ci: 为逻辑复制添加示例和测试
2 parents 2e1e4c3 + b2cc522 commit 7a322fc

File tree

6 files changed

+1022
-1
lines changed

6 files changed

+1022
-1
lines changed

.github/workflows/ci.yml

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,20 @@ jobs:
6060
SHA256_TEST_GAUSSUSER: sha256_test
6161
SHA256_TEST_GAUSSPASSWORD: test4@scram
6262
DB_TYPE: opengauss
63+
LOGICAL_REPLICATION_TEST: '1'
6364
steps:
6465
- name: Show OS
6566
run: |
6667
uname -a
6768
- name: Wait for GaussDB to be ready
6869
run: |
69-
timeout 60 bash -c 'until pg_isready -h localhost -p 5432; do sleep 2; done'
70+
# Using gs_ctl status (native OpenGauss check) instead of pg_isready
71+
CONTAINER_ID=$(docker ps --filter "ancestor=opengauss/opengauss" --format "{{.ID}}")
72+
timeout 60 bash -c "until docker exec $CONTAINER_ID su - omm -c 'gs_ctl status -D /var/lib/opengauss/data' > /dev/null 2>&1; do echo -n '.'; sleep 2; done" || {
73+
echo "Timeout waiting for GaussDB to start"
74+
docker logs $CONTAINER_ID
75+
exit 1
76+
}
7077
- name: Setup SHA256 authentication
7178
run: |
7279
# Wait for database to be fully started
@@ -82,6 +89,53 @@ jobs:
8289
sleep 5
8390
8491
PGPASSWORD=openGauss@123 psql -h localhost -U ci_user -d ci_db_test -c "CREATE ROLE sha256_test login password 'test4@scram';"
92+
- name: Setup logical replication
93+
if: env.LOGICAL_REPLICATION_TEST == '1'
94+
run: |
95+
# Get container ID
96+
CONTAINER_ID=$(docker ps --filter "ancestor=opengauss/opengauss" --format "{{.ID}}")
97+
98+
# Set wal_level to logical
99+
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'wal_level = logical'"
100+
101+
# Set max_replication_slots
102+
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'max_replication_slots = 10'"
103+
104+
# Set max_wal_senders
105+
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'max_wal_senders = 10'"
106+
107+
# Add replication connection rule to pg_hba.conf
108+
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -h 'host replication all 0.0.0.0/0 md5'"
109+
110+
# Restart container to apply logical replication settings
111+
docker restart $CONTAINER_ID
112+
113+
# Wait for database to be ready again
114+
sleep 10
115+
timeout 60 bash -c "until docker exec $CONTAINER_ID su - omm -c 'gs_ctl status -D /var/lib/opengauss/data' > /dev/null 2>&1; do echo -n '.'; sleep 2; done" || {
116+
echo "Timeout waiting for GaussDB to restart"
117+
docker logs $CONTAINER_ID
118+
exit 1
119+
}
120+
121+
sleep 5
122+
123+
# Verify configuration
124+
echo "Verifying logical replication configuration..."
125+
WAL_LEVEL=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW wal_level;\"" | tr -d ' \n')
126+
MAX_REPL_SLOTS=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW max_replication_slots;\"" | tr -d ' \n')
127+
MAX_WAL_SENDERS=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW max_wal_senders;\"" | tr -d ' \n')
128+
129+
echo " wal_level: ${WAL_LEVEL}"
130+
echo " max_replication_slots: ${MAX_REPL_SLOTS}"
131+
echo " max_wal_senders: ${MAX_WAL_SENDERS}"
132+
133+
if [ "$WAL_LEVEL" != "logical" ]; then
134+
echo "Warning: wal_level is not set to 'logical'"
135+
exit 1
136+
fi
137+
138+
echo "Logical Replication Setup Complete!"
85139
- uses: actions/checkout@v4
86140
with:
87141
persist-credentials: false
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import { Client, LogicalReplicationService, MppdbDecodingPlugin } from 'gaussdb-node'
2+
3+
const slotName = 'demo_replication_slot'
4+
const tableName = 'demo_replication_table'
5+
6+
// Connection config from environment variables or defaults
7+
const config = {
8+
host: process.env.GAUSSHOST,
9+
port: parseInt(process.env.GAUSSPORT),
10+
database: process.env.GAUSSDATABASE,
11+
user: process.env.GAUSSUSER,
12+
password: process.env.GAUSSPASSWORD,
13+
}
14+
15+
async function checkSlotExists(client) {
16+
const res = await client.query(
17+
`SELECT 1 FROM pg_replication_slots WHERE slot_name = $1`,
18+
[slotName]
19+
)
20+
return res.rowCount > 0
21+
}
22+
23+
async function createSlot(client) {
24+
await client.query(
25+
`SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`
26+
)
27+
console.log(`slot "${slotName}" created`)
28+
}
29+
30+
async function dropSlot(client) {
31+
await client.query(`SELECT pg_drop_replication_slot('${slotName}')`)
32+
console.log(`slot "${slotName}" dropped`)
33+
}
34+
35+
async function createTable(client) {
36+
await client.query(`
37+
CREATE TABLE IF NOT EXISTS ${tableName} (
38+
id SERIAL PRIMARY KEY,
39+
name TEXT,
40+
value INT
41+
)
42+
`)
43+
console.log(`table "${tableName}" created`)
44+
}
45+
46+
async function dropTable(client) {
47+
await client.query(`DROP TABLE IF EXISTS ${tableName}`)
48+
console.log(`table "${tableName}" dropped`)
49+
}
50+
51+
async function performDML(client) {
52+
console.log('performing DML operations...')
53+
54+
await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100])
55+
console.log(' INSERT alice')
56+
57+
await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200])
58+
console.log(' INSERT bob')
59+
60+
await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice'])
61+
console.log(' UPDATE alice')
62+
63+
await client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob'])
64+
console.log(' DELETE bob')
65+
66+
console.log('DML operations completed')
67+
}
68+
69+
async function main() {
70+
let clientB = null
71+
let service = null
72+
73+
try {
74+
clientB = new Client(config)
75+
await clientB.connect()
76+
console.log('client B connected')
77+
78+
// setup: create table and slot if needed
79+
await createTable(clientB)
80+
81+
const slotExists = await checkSlotExists(clientB)
82+
if (!slotExists) {
83+
await createSlot(clientB)
84+
} else {
85+
console.log(`slot "${slotName}" already exists`)
86+
}
87+
88+
// start replication service (client A uses config, not Client instance)
89+
service = new LogicalReplicationService(config, {
90+
acknowledge: { auto: true, timeoutSeconds: 10 },
91+
})
92+
93+
const plugin = new MppdbDecodingPlugin({
94+
includeXids: false,
95+
skipEmptyXacts: true,
96+
})
97+
98+
const receivedMessages = []
99+
100+
service.on('start', () => {
101+
console.log('replication started')
102+
})
103+
104+
service.on('data', (lsn, msg) => {
105+
console.log('[data]', lsn, msg)
106+
receivedMessages.push(msg)
107+
})
108+
109+
service.on('error', (err) => {
110+
console.error('[error]', err)
111+
})
112+
113+
// start replication in background
114+
service.subscribe(plugin, slotName)
115+
116+
// wait for replication to start
117+
await new Promise((resolve) => service.once('start', resolve))
118+
119+
// perform DML operations (client B)
120+
await performDML(clientB)
121+
122+
// wait a bit for replication to catch up
123+
await new Promise((resolve) => setTimeout(resolve, 2000))
124+
125+
// stop replication first
126+
await service.stop()
127+
service = null
128+
console.log('replication stopped')
129+
130+
console.log(`received ${receivedMessages.length} messages`)
131+
132+
// cleanup while clientB is still connected
133+
await dropTable(clientB)
134+
await dropSlot(clientB)
135+
} catch (err) {
136+
console.error('error:', err)
137+
} finally {
138+
// stop service if still running
139+
if (service) {
140+
try {
141+
await service.stop()
142+
} catch (err) {
143+
// ignore
144+
}
145+
}
146+
147+
// cleanup with a new connection if needed
148+
if (clientB) {
149+
try {
150+
await clientB.end()
151+
} catch (err) {
152+
// ignore
153+
}
154+
}
155+
156+
// ensure cleanup with fresh connection
157+
const cleanupClient = new Client(config)
158+
try {
159+
await cleanupClient.connect()
160+
await cleanupClient.query(`DROP TABLE IF EXISTS ${tableName}`)
161+
await cleanupClient.query(`SELECT pg_drop_replication_slot('${slotName}')`).catch(() => {})
162+
} catch (err) {
163+
// ignore cleanup errors
164+
} finally {
165+
await cleanupClient.end().catch(() => {})
166+
}
167+
168+
console.log('cleanup done')
169+
}
170+
}
171+
172+
main().catch((err) => {
173+
console.error(err)
174+
process.exit(1)
175+
})

0 commit comments

Comments
 (0)