Skip to content

Commit 881e732

Browse files
author
suika
committed
Adds snapshot filtering for binlog replication
Enables filtering of rows during the initial snapshot phase of binlog replication, based on a configurable SQL WHERE clause. This allows for partial snapshots, replicating only a subset of data based on specified criteria, which is particularly useful for large tables or scenarios where only recent data is needed. The commit also includes tests to verify the functionality of snapshot filtering, including handling of CDC changes and multiple bucket filters. This applies only to the MySQL source module and the PostgreSQL storage module.
1 parent 0019417 commit 881e732

File tree

8 files changed

+572
-4
lines changed

8 files changed

+572
-4
lines changed

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,17 @@ export class BinLogStream {
311311
// TODO count rows and log progress at certain batch sizes
312312

313313
// MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
314-
const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`);
315-
const stream = query.stream();
314+
let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`;
315+
316+
// Apply snapshot filter if it exists. This allows us to do partial snapshots,
317+
// for example for large tables where we only want to snapshot recent data.
318+
if (table.snapshotFilter) {
319+
query += ` WHERE ${table.snapshotFilter}`;
320+
this.logger.info(`Applying snapshot filter: ${table.snapshotFilter}`);
321+
}
322+
323+
const queryStream = connection.query(query);
324+
const stream = queryStream.stream();
316325

317326
let columns: Map<string, ColumnDescriptor> | undefined = undefined;
318327
stream.on('fields', (fields: mysql.FieldPacket[]) => {

modules/module-mysql/test/src/BinLogStream.test.ts

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,4 +401,208 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
401401
]);
402402
}
403403
});
404+
405+
test('Snapshot filter - replicate only filtered rows', async () => {
  // Rows that do not match the table's snapshot_filter must be excluded from
  // the initial snapshot. The bucket's data query has no WHERE clause here, so
  // the filtering observed below comes purely from the snapshot filter.
  await using context = await BinlogStreamTestContext.open(factory);
  const { connectionManager } = context;
  await context.updateSyncRules(`
bucket_definitions:
  active_users:
    data:
      - SELECT id, name, status FROM "users"
    source_tables:
      users:
        snapshot_filter: "status = 'active'"`);

  await connectionManager.query(
    `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20))`
  );

  // Insert rows before snapshot
  const activeId = uuid();
  const inactiveId = uuid();
  await connectionManager.query(
    `INSERT INTO users(id, name, status) VALUES('${activeId}', 'Active User', 'active')`
  );
  await connectionManager.query(
    `INSERT INTO users(id, name, status) VALUES('${inactiveId}', 'Inactive User', 'inactive')`
  );

  await context.replicateSnapshot();

  const data = await context.getBucketData('active_users[]');

  // Should only have the active user, not the inactive one
  expect(data).toMatchObject([putOp('users', { id: activeId, name: 'Active User', status: 'active' })]);
  expect(data.length).toBe(1);
});
439+
440+
test('Snapshot filter - ORs multiple bucket filters', async () => {
  // When two bucket definitions declare different snapshot_filters for the
  // same source table, the snapshot query should OR them together so that rows
  // matching either filter are snapshotted.
  await using context = await BinlogStreamTestContext.open(factory);
  const { connectionManager } = context;
  await context.updateSyncRules(`
bucket_definitions:
  active_users:
    data:
      - SELECT id, name, status FROM "users" WHERE status = 'active'
    source_tables:
      users:
        snapshot_filter: "status = 'active'"

  admin_users:
    data:
      - SELECT id, name, is_admin FROM "users" WHERE is_admin = true
    source_tables:
      users:
        snapshot_filter: "is_admin = true"`);

  await connectionManager.query(
    `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20), is_admin BOOLEAN)`
  );

  // Insert test data
  const activeUserId = uuid();
  const adminUserId = uuid();
  const regularUserId = uuid();

  await connectionManager.query(
    `INSERT INTO users(id, name, status, is_admin) VALUES('${activeUserId}', 'Active User', 'active', false)`
  );
  await connectionManager.query(
    `INSERT INTO users(id, name, status, is_admin) VALUES('${adminUserId}', 'Admin User', 'inactive', true)`
  );
  await connectionManager.query(
    `INSERT INTO users(id, name, status, is_admin) VALUES('${regularUserId}', 'Regular User', 'inactive', false)`
  );

  await context.replicateSnapshot();

  const activeData = await context.getBucketData('active_users[]');
  const adminData = await context.getBucketData('admin_users[]');

  // Active bucket should have the active user.
  // is_admin is asserted as 0n/1n: MySQL BOOLEAN is an integer type and the
  // replicated value surfaces as a bigint.
  expect(activeData).toMatchObject([
    putOp('users', { id: activeUserId, name: 'Active User', status: 'active', is_admin: 0n })
  ]);

  // Admin bucket should have the admin user
  expect(adminData).toMatchObject([
    putOp('users', { id: adminUserId, name: 'Admin User', status: 'inactive', is_admin: 1n })
  ]);

  // Regular user should not be in either bucket (filtered out by snapshot filter)
});
495+
496+
test('Snapshot filter - CDC changes only affect filtered rows', async () => {
  // The snapshot_filter restricts only the initial snapshot; subsequent binlog
  // (CDC) events are evaluated against the bucket's data query, which here
  // also filters on status = 'active'.
  await using context = await BinlogStreamTestContext.open(factory);
  const { connectionManager } = context;
  await context.updateSyncRules(`
bucket_definitions:
  active_users:
    data:
      - SELECT id, name, status FROM "users" WHERE status = 'active'
    source_tables:
      users:
        snapshot_filter: "status = 'active'"`);

  await connectionManager.query(
    `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20))`
  );

  // Insert an active user before snapshot
  const activeId = uuid();
  await connectionManager.query(
    `INSERT INTO users(id, name, status) VALUES('${activeId}', 'Active User', 'active')`
  );

  await context.replicateSnapshot();
  await context.startStreaming();

  // Insert an inactive user - should not appear in bucket
  const inactiveId = uuid();
  await connectionManager.query(
    `INSERT INTO users(id, name, status) VALUES('${inactiveId}', 'Inactive User', 'inactive')`
  );

  // Update the active user - should appear in bucket
  await connectionManager.query(`UPDATE users SET name = 'Updated Active' WHERE id = '${activeId}'`);

  // NOTE(review): this assumes getBucketData waits for streaming replication
  // to catch up before reading — confirm against the test harness.
  const data = await context.getBucketData('active_users[]');

  // Should only have the active user with updated name
  expect(data).toMatchObject([putOp('users', { id: activeId, name: 'Updated Active', status: 'active' })]);
  expect(data.length).toBe(1);
});
536+
537+
test('Snapshot filter - complex WHERE clause', async () => {
  // snapshot_filter may be an arbitrary source-database WHERE expression,
  // including function calls such as DATE_SUB(NOW(), ...) and AND-combined
  // conditions; the clause is passed through to MySQL verbatim.
  await using context = await BinlogStreamTestContext.open(factory);
  const { connectionManager } = context;
  await context.updateSyncRules(`
bucket_definitions:
  recent_active_users:
    data:
      - SELECT id, name, created_at FROM "users"
    source_tables:
      users:
        snapshot_filter: "created_at > DATE_SUB(NOW(), INTERVAL 7 DAY) AND status = 'active'"`);

  await connectionManager.query(
    `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20), created_at DATETIME)`
  );

  // Insert recent active user
  const recentActiveId = uuid();
  await connectionManager.query(
    `INSERT INTO users(id, name, status, created_at) VALUES('${recentActiveId}', 'Recent Active', 'active', NOW())`
  );

  // Insert old active user
  const oldActiveId = uuid();
  await connectionManager.query(
    `INSERT INTO users(id, name, status, created_at) VALUES('${oldActiveId}', 'Old Active', 'active', DATE_SUB(NOW(), INTERVAL 30 DAY))`
  );

  // Insert recent inactive user
  const recentInactiveId = uuid();
  await connectionManager.query(
    `INSERT INTO users(id, name, status, created_at) VALUES('${recentInactiveId}', 'Recent Inactive', 'inactive', NOW())`
  );

  await context.replicateSnapshot();

  const data = await context.getBucketData('recent_active_users[]');

  // Should only have the recent active user
  expect(data.length).toBe(1);
  expect(data[0]).toMatchObject({
    op: 'PUT',
    object_type: 'users',
    object_id: recentActiveId
  });
});
583+
584+
test('Snapshot filter - no filter means all rows replicated', async () => {
  // Baseline: when no source_tables/snapshot_filter is configured, the
  // snapshot query carries no WHERE clause and every row is replicated.
  await using context = await BinlogStreamTestContext.open(factory);
  const { connectionManager } = context;
  await context.updateSyncRules(`
bucket_definitions:
  all_users:
    data:
      - SELECT id, name FROM "users"`);

  await connectionManager.query(`CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT)`);

  // Insert multiple users
  const user1Id = uuid();
  const user2Id = uuid();
  await connectionManager.query(`INSERT INTO users(id, name) VALUES('${user1Id}', 'User 1')`);
  await connectionManager.query(`INSERT INTO users(id, name) VALUES('${user2Id}', 'User 2')`);

  await context.replicateSnapshot();

  const data = await context.getBucketData('all_users[]');

  // Should have both users when no filter is specified
  expect(data.length).toBe(2);
});
404608
}

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ export class PostgresSyncRulesStorage
249249
schema: schema,
250250
name: table,
251251
replicaIdColumns: replicaIdColumns,
252-
snapshotComplete: sourceTableRow!.snapshot_done ?? true
252+
snapshotComplete: sourceTableRow!.snapshot_done ?? true,
253+
snapshotFilter: options.sync_rules.definition.getSnapshotFilter(connection_tag, schema, table)
253254
});
254255
if (!sourceTable.snapshotComplete) {
255256
sourceTable.snapshotStatus = {

packages/service-core/src/storage/SourceTable.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export interface SourceTableOptions {
1010
name: string;
1111
replicaIdColumns: ColumnDescriptor[];
1212
snapshotComplete: boolean;
13+
snapshotFilter?: string; // SQL WHERE clause in source DB syntax
1314
}
1415

1516
export interface TableSnapshotStatus {
@@ -84,6 +85,10 @@ export class SourceTable implements SourceEntityDescriptor {
8485
return this.options.replicaIdColumns;
8586
}
8687

88+
/**
 * Optional SQL WHERE clause (in the source database's syntax) used to
 * restrict which rows are read during the initial table snapshot.
 * Undefined means the whole table is snapshotted.
 */
get snapshotFilter() {
  const { snapshotFilter } = this.options;
  return snapshotFilter;
}
91+
8792
/**
8893
* Sanitized name of the entity in the format of "{schema}.{entity name}"
8994
* Suitable for safe use in Postgres queries.
@@ -107,7 +112,8 @@ export class SourceTable implements SourceEntityDescriptor {
107112
schema: this.schema,
108113
name: this.name,
109114
replicaIdColumns: this.replicaIdColumns,
110-
snapshotComplete: this.snapshotComplete
115+
snapshotComplete: this.snapshotComplete,
116+
snapshotFilter: this.snapshotFilter
111117
});
112118
copy.syncData = this.syncData;
113119
copy.syncParameters = this.syncParameters;

packages/sync-rules/src/SyncConfig.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ export abstract class SyncConfig {
2121
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
2222
eventDescriptors: SqlEventDescriptor[] = [];
2323

24+
/**
25+
* Snapshot filters for source tables, organized by bucket/stream.
26+
* Map structure: bucketName -> tableName -> snapshotFilter
27+
* Multiple buckets can specify different filters for the same table.
28+
*/
29+
snapshotFilters: Map<string, Map<string, string>> = new Map();
30+
2431
/**
2532
* The (YAML-based) source contents from which these sync rules have been derived.
2633
*/
@@ -122,6 +129,57 @@ export abstract class SyncConfig {
122129
debugRepresentation() {
123130
return this.bucketSources.map((rules) => rules.debugRepresentation());
124131
}
132+
133+
/**
 * Get the aggregated snapshot filter for a given table.
 * If multiple buckets specify filters for the same table, they are ORed together.
 *
 * @param connectionTag Connection tag (e.g., 'default')
 * @param schema Schema name
 * @param tableName Table name (without wildcard %)
 * @returns Aggregated SQL WHERE clause, or undefined if no filters are specified
 */
getSnapshotFilter(connectionTag: string, schema: string, tableName: string): string | undefined {
  const filters: string[] = [];

  // Collect all filters for this specific table from all buckets/streams.
  for (const [_bucketName, tableFilters] of this.snapshotFilters) {
    // A pattern may be a bare table name or schema-qualified; matching is
    // delegated to TablePattern.
    for (const [pattern, filter] of tableFilters) {
      const tablePattern = this.parseTablePattern(connectionTag, schema, pattern);
      if (tablePattern.matches({ connectionTag, schema, name: tableName })) {
        filters.push(filter);
      }
    }
  }

  // De-duplicate: several buckets declaring the identical filter for the same
  // table must not produce redundant "(f) OR (f)" terms in the generated SQL.
  const uniqueFilters = [...new Set(filters)];

  if (uniqueFilters.length === 0) {
    return undefined;
  }
  if (uniqueFilters.length === 1) {
    return uniqueFilters[0];
  }

  // OR multiple filters together, wrapping each in parentheses so operator
  // precedence within the individual filters is preserved.
  return uniqueFilters.map((f) => `(${f})`).join(' OR ');
}
168+
169+
/**
170+
* Helper to parse a table pattern string into a TablePattern object
171+
*/
172+
private parseTablePattern(connectionTag: string, defaultSchema: string, pattern: string): TablePattern {
173+
// Split on '.' to extract schema and table parts
174+
const parts = pattern.split('.');
175+
if (parts.length === 1) {
176+
// Just table name, use default schema
177+
return new TablePattern(defaultSchema, parts[0]);
178+
} else {
179+
// schema.table format
180+
return new TablePattern(parts[0], parts[1]);
181+
}
182+
}
125183
}
126184
export interface SyncConfigWithErrors {
127185
config: SyncConfig;

packages/sync-rules/src/from_yaml.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,25 @@ export class SyncConfigFromYaml {
266266
};
267267
const parameters = value.get('parameters', true) as unknown;
268268
const dataQueries = value.get('data', true) as unknown;
269+
const source_tables = value.get('source_tables', true) as unknown;
270+
271+
// Parse source_tables configuration for this bucket
272+
if (source_tables instanceof YAMLMap) {
273+
const tableFilters = new Map<string, string>();
274+
for (const entry of source_tables.items) {
275+
const { key: tableKey, value: tableValue } = entry as { key: Scalar; value: YAMLMap };
276+
const tableName = tableKey.toString();
277+
if (tableValue instanceof YAMLMap) {
278+
const snapshot_filter = tableValue.get('snapshot_filter', true) as Scalar | null;
279+
if (snapshot_filter) {
280+
tableFilters.set(tableName, snapshot_filter.toString());
281+
}
282+
}
283+
}
284+
if (tableFilters.size > 0) {
285+
rules.snapshotFilters.set(key, tableFilters);
286+
}
287+
}
269288

270289
const descriptor = new SqlBucketDescriptor(key);
271290

@@ -333,6 +352,25 @@ export class SyncConfigFromYaml {
333352
compatibility
334353
};
335354

355+
// Parse source_tables configuration for this stream
356+
const source_tables = value.get('source_tables', true) as unknown;
357+
if (source_tables instanceof YAMLMap) {
358+
const tableFilters = new Map<string, string>();
359+
for (const entry of source_tables.items) {
360+
const { key: tableKey, value: tableValue } = entry as { key: Scalar; value: YAMLMap };
361+
const tableName = tableKey.toString();
362+
if (tableValue instanceof YAMLMap) {
363+
const snapshot_filter = tableValue.get('snapshot_filter', true) as Scalar | null;
364+
if (snapshot_filter) {
365+
tableFilters.set(tableName, snapshot_filter.toString());
366+
}
367+
}
368+
}
369+
if (tableFilters.size > 0) {
370+
rules.snapshotFilters.set(key, tableFilters);
371+
}
372+
}
373+
336374
const data = value.get('query', true) as unknown;
337375
if (data instanceof Scalar) {
338376
this.#withScalar(data, (q) => {

0 commit comments

Comments
 (0)