Skip to content

Commit e5c9853

Browse files
raffidahmadsuika
authored and committed
feat: implement global initial_snapshot_filters for all database modules
1 parent be0c7cd commit e5c9853

13 files changed

Lines changed: 237 additions & 52 deletions

File tree

docs/initial-snapshot-filters.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,79 @@ bucket_definitions:
288288
- SELECT * FROM events_2025
289289
```
290290

291+
## Special Characters and Identifiers
292+
293+
⚠️ **Important**: Filter expressions are embedded directly into SQL/MongoDB queries. You must properly quote identifiers and escape string literals according to your database's syntax rules.
294+
295+
### SQL Identifier Quoting
296+
297+
**PostgreSQL** - Use double quotes for identifiers with spaces or special characters:
298+
```yaml
299+
initial_snapshot_filters:
300+
users:
301+
sql: "\"User Status\" = 'active' AND \"created-at\" > NOW() - INTERVAL '30 days'"
302+
```
303+
304+
**MySQL** - Use backticks for identifiers with spaces or special characters:
305+
```yaml
306+
initial_snapshot_filters:
307+
users:
308+
sql: "`User Status` = 'active' AND `created-at` > NOW() - INTERVAL 30 DAY"
309+
```
310+
311+
**SQL Server** - Use square brackets for identifiers with spaces or special characters:
312+
```yaml
313+
initial_snapshot_filters:
314+
users:
315+
sql: "[User Status] = 'active' AND [created-at] > DATEADD(day, -30, GETDATE())"
316+
```
317+
318+
### String Literal Escaping
319+
320+
Always use proper escaping for string literals containing quotes:
321+
322+
```yaml
323+
initial_snapshot_filters:
324+
comments:
325+
# Single quotes must be escaped as '' in SQL
326+
sql: "content NOT LIKE '%can''t%' AND status = 'approved'"
327+
```
328+
329+
### Complex Expressions with OR Operators
330+
331+
PowerSync automatically wraps your filter in parentheses to prevent operator precedence issues:
332+
333+
```yaml
334+
initial_snapshot_filters:
335+
users:
336+
# This is wrapped as: WHERE (status = 'active' OR status = 'pending')
337+
sql: "status = 'active' OR status = 'pending'"
338+
```
339+
340+
### MongoDB Filters
341+
342+
MongoDB filters use native BSON query syntax, which is safer than string concatenation:
343+
344+
```yaml
345+
initial_snapshot_filters:
346+
users:
347+
mongo:
348+
$or:
349+
- status: 'active'
350+
- status: 'pending'
351+
"special field": { $exists: true }
352+
```
353+
354+
### Security Considerations
355+
356+
- Filters are defined in `sync_rules.yaml` by administrators, not by end users
357+
- Filters are static configuration, not dynamic user input
358+
- Still follow security best practices:
359+
- Avoid including sensitive data in filters
360+
- Test filters in development before production
361+
- Review filter changes carefully during deployment
362+
- PowerSync does not parameterize filters since they are arbitrary SQL expressions, similar to bucket query definitions
363+
291364
## Limitations
292365

293366
- Filters are applied **globally** across all buckets using that table

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ export class MongoSyncBucketStorage
239239
schema: schema,
240240
name: name,
241241
replicaIdColumns: replicaIdColumns,
242-
snapshotComplete: doc.snapshot_done ?? true
242+
snapshotComplete: doc.snapshot_done ?? true,
243+
initialSnapshotFilter: options.sync_rules.definition.getInitialSnapshotFilter(connection_tag, schema, name, 'mongo')
243244
});
244245
sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable);
245246
sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,8 @@ export class ChangeStream {
483483
await using query = new ChunkedSnapshotQuery({
484484
collection,
485485
key: table.snapshotStatus?.lastKey,
486-
batchSize: this.snapshotChunkLength
486+
batchSize: this.snapshotChunkLength,
487+
filter: table.initialSnapshotFilter?.mongo
487488
});
488489
if (query.lastKey != null) {
489490
this.logger.info(
@@ -492,6 +493,9 @@ export class ChangeStream {
492493
} else {
493494
this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`);
494495
}
496+
if (table.initialSnapshotFilter?.mongo) {
497+
this.logger.info(`Applying initial snapshot filter: ${JSON.stringify(table.initialSnapshotFilter.mongo)}`);
498+
}
495499

496500
let lastBatch = performance.now();
497501
let nextChunkPromise = query.nextChunk();

modules/module-mongodb/src/replication/MongoSnapshotQuery.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,19 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
1313
private lastCursor: mongo.FindCursor | null = null;
1414
private collection: mongo.Collection;
1515
private batchSize: number;
16+
private snapshotFilter: any;
1617

17-
public constructor(options: { collection: mongo.Collection; batchSize: number; key?: Uint8Array | null }) {
18+
public constructor(options: {
19+
collection: mongo.Collection;
20+
batchSize: number;
21+
key?: Uint8Array | null;
22+
filter?: any;
23+
}) {
1824
this.lastKey = options.key ? bson.deserialize(options.key, { useBigInt64: true })._id : null;
1925
this.lastCursor = null;
2026
this.collection = options.collection;
2127
this.batchSize = options.batchSize;
28+
this.snapshotFilter = options.filter;
2229
}
2330

2431
async nextChunk(): Promise<{ docs: mongo.Document[]; lastKey: Uint8Array } | { docs: []; lastKey: null }> {
@@ -35,8 +42,23 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
3542
// any parsing as an operator.
3643
// Starting in MongoDB 5.0, this filter can use the _id index. Source:
3744
// https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements
38-
const filter: mongo.Filter<mongo.Document> =
45+
46+
// Build base filter for _id
47+
const idFilter: mongo.Filter<mongo.Document> =
3948
this.lastKey == null ? {} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } };
49+
50+
// Combine with snapshot filter if present
51+
let filter: mongo.Filter<mongo.Document>;
52+
if (this.snapshotFilter) {
53+
if (this.lastKey == null) {
54+
filter = this.snapshotFilter;
55+
} else {
56+
filter = { $and: [idFilter, this.snapshotFilter] };
57+
}
58+
} else {
59+
filter = idFilter;
60+
}
61+
4062
cursor = this.collection.find(filter, {
4163
readConcern: 'majority',
4264
limit: this.batchSize,

modules/module-mssql/src/replication/CDCStream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,11 @@ export class CDCStream {
356356
query = new SimpleSnapshotQuery(transaction, table);
357357
replicatedCount = 0;
358358
}
359+
360+
if (table.sourceTable.initialSnapshotFilter?.sql) {
361+
this.logger.info(`Applying initial snapshot filter: ${table.sourceTable.initialSnapshotFilter.sql}`);
362+
}
363+
359364
await query.initialize();
360365

361366
let hasRemainingData = true;

modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ export class SimpleSnapshotQuery implements MSSQLSnapshotQuery {
4444
const request = this.transaction.request();
4545
const stream = request.toReadableStream();
4646

47-
request.query(`SELECT * FROM ${this.table.toQualifiedName()}`);
47+
let query = `SELECT * FROM ${this.table.toQualifiedName()}`;
48+
if (this.table.sourceTable.initialSnapshotFilter?.sql) {
49+
query += ` WHERE (${this.table.sourceTable.initialSnapshotFilter.sql})`;
50+
}
51+
request.query(query);
4852

4953
// MSSQL only streams one row at a time
5054
for await (const row of stream) {
@@ -141,17 +145,26 @@ export class BatchedSnapshotQuery implements MSSQLSnapshotQuery {
141145

142146
const request = this.transaction.request();
143147
const stream = request.toReadableStream();
148+
const snapshotFilter = this.table.sourceTable.initialSnapshotFilter?.sql;
149+
144150
if (this.lastKey == null) {
145-
request.query(`SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} ORDER BY ${escapedKeyName}`);
151+
let query = `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()}`;
152+
if (snapshotFilter) {
153+
query += ` WHERE (${snapshotFilter})`;
154+
}
155+
query += ` ORDER BY ${escapedKeyName}`;
156+
request.query(query);
146157
} else {
147158
if (this.key.typeId == null) {
148159
throw new Error(`typeId required for primary key ${this.key.name}`);
149160
}
150-
request
151-
.input('lastKey', this.lastKey)
152-
.query(
153-
`SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} WHERE ${escapedKeyName} > @lastKey ORDER BY ${escapedKeyName}`
154-
);
161+
request.input('lastKey', this.lastKey);
162+
let query = `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} WHERE ${escapedKeyName} > @lastKey`;
163+
if (snapshotFilter) {
164+
query += ` AND (${snapshotFilter})`;
165+
}
166+
query += ` ORDER BY ${escapedKeyName}`;
167+
request.query(query);
155168
}
156169

157170
// MSSQL only streams one row at a time

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,17 +307,21 @@ export class BinLogStream {
307307
batch: storage.BucketStorageBatch,
308308
table: storage.SourceTable
309309
) {
310-
this.logger.info(`Replicating ${qualifiedMySQLTable(table)}`);
310+
const qualifiedMySQLTableName = qualifiedMySQLTable(table);
311+
312+
this.logger.info(`Replicating ${qualifiedMySQLTableName}`);
311313
// TODO count rows and log progress at certain batch sizes
312314

313315
// MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
314-
let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`;
316+
let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTableName}`;
315317

316318
// Apply snapshot filter if it exists. This allows us to do partial snapshots,
317319
// for example for large tables where we only want to snapshot recent data.
318320
if (table.initialSnapshotFilter?.sql) {
319-
query += ` WHERE ${table.initialSnapshotFilter.sql}`;
321+
query += ` WHERE (${table.initialSnapshotFilter.sql})`;
320322
this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`);
323+
} else {
324+
this.logger.info(`No initial snapshot filter applied for ${qualifiedMySQLTableName}`);
321325
}
322326

323327
const queryStream = connection.query(query);

modules/module-postgres/src/replication/SnapshotQuery.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ export class SimpleSnapshotQuery implements SnapshotQuery {
3636
) {}
3737

3838
public async initialize(): Promise<void> {
39-
await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.qualifiedName}`);
39+
let query = `SELECT * FROM ${this.table.qualifiedName}`;
40+
if (this.table.initialSnapshotFilter?.sql) {
41+
query += ` WHERE (${this.table.initialSnapshotFilter.sql})`;
42+
}
43+
await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR ${query}`);
4044
}
4145

4246
public nextChunk(): AsyncIterableIterator<PgChunk> {
@@ -119,17 +123,27 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
119123
public async *nextChunk(): AsyncIterableIterator<PgChunk> {
120124
let stream: AsyncIterableIterator<PgChunk>;
121125
const escapedKeyName = escapeIdentifier(this.key.name);
126+
const snapshotFilter = this.table.initialSnapshotFilter?.sql;
127+
122128
if (this.lastKey == null) {
123-
stream = this.connection.stream(
124-
`SELECT * FROM ${this.table.qualifiedName} ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`
125-
);
129+
let query = `SELECT * FROM ${this.table.qualifiedName}`;
130+
if (snapshotFilter) {
131+
query += ` WHERE (${snapshotFilter})`;
132+
}
133+
query += ` ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`;
134+
stream = this.connection.stream(query);
126135
} else {
127136
if (this.key.typeId == null) {
128137
throw new Error(`typeId required for primary key ${this.key.name}`);
129138
}
130139
const type = Number(this.key.typeId);
140+
let query = `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1`;
141+
if (snapshotFilter) {
142+
query += ` AND (${snapshotFilter})`;
143+
}
144+
query += ` ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`;
131145
stream = this.connection.stream({
132-
statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`,
146+
statement: query,
133147
params: [{ value: this.lastKey, type }]
134148
});
135149
}
@@ -200,8 +214,14 @@ export class IdSnapshotQuery implements SnapshotQuery {
200214
if (type == null) {
201215
throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`);
202216
}
217+
218+
let query = `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`;
219+
if (this.table.initialSnapshotFilter?.sql) {
220+
query += ` AND (${this.table.initialSnapshotFilter.sql})`;
221+
}
222+
203223
yield* this.connection.stream({
204-
statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`,
224+
statement: query,
205225
params: [
206226
{
207227
type: type,

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,11 @@ WHERE oid = $1::regclass`,
558558
q = new SimpleSnapshotQuery(db, table, this.snapshotChunkLength);
559559
at = 0;
560560
}
561+
562+
if (table.initialSnapshotFilter?.sql) {
563+
this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`);
564+
}
565+
561566
await q.initialize();
562567

563568
let hasRemainingData = true;

packages/sync-rules/src/SyncConfig.ts

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -144,41 +144,44 @@ export abstract class SyncConfig {
144144
/**
145145
* Get the initial snapshot filter for a given table.
146146
* Filters are applied globally and support wildcard matching.
147-
*
148-
* @param connectionTag Connection tag (e.g., 'default')
147+
*
148+
* @param connectionTag Connection tag for the active source connection
149149
* @param schema Schema name
150-
* @param tableName Table name (without wildcard %)
151-
* @param dbType Database type ('sql' for MySQL/Postgres/MSSQL, 'mongo' for MongoDB)
152-
* @returns WHERE clause/query, or undefined if no filter is specified
150+
* @param tableName Concrete table name (wildcards are allowed in the filter patterns, not here)
151+
* @param dbType Database type ('sql' for MySQL/Postgres/MSSQL, 'mongo' for MongoDB) - currently unused, returns full filter object
152+
* @returns Filter object with sql/mongo properties, or undefined if no filter is specified
153153
*/
154-
getInitialSnapshotFilter(connectionTag: string, schema: string, tableName: string, dbType: DatabaseType = 'sql'): string | any | undefined {
155-
const fullTableName = `${schema}.${tableName}`;
156-
157-
// Check for exact matches first
154+
getInitialSnapshotFilter(
155+
connectionTag: string,
156+
schema: string,
157+
tableName: string,
158+
dbType: DatabaseType = 'sql'
159+
): InitialSnapshotFilter | undefined {
158160
for (const [pattern, filterDef] of this.initialSnapshotFilters) {
159161
const tablePattern = this.parseTablePattern(connectionTag, schema, pattern);
160162
if (tablePattern.matches({ connectionTag, schema, name: tableName })) {
161-
// Return the appropriate filter based on database type
162-
return filterDef[dbType];
163+
return filterDef;
163164
}
164165
}
165-
166+
166167
return undefined;
167168
}
168169

169170
/**
170171
* Helper to parse a table pattern string into a TablePattern object
171172
*/
172173
private parseTablePattern(connectionTag: string, defaultSchema: string, pattern: string): TablePattern {
173-
// Split on '.' to extract schema and table parts
174174
const parts = pattern.split('.');
175175
if (parts.length === 1) {
176-
// Just table name, use default schema
177-
return new TablePattern(defaultSchema, parts[0]);
178-
} else {
179-
// schema.table format
180-
return new TablePattern(parts[0], parts[1]);
176+
return new TablePattern(`${connectionTag}.${defaultSchema}`, parts[0]);
177+
}
178+
if (parts.length === 2) {
179+
return new TablePattern(`${connectionTag}.${parts[0]}`, parts[1]);
181180
}
181+
const tag = parts[0];
182+
const schema = parts[1];
183+
const tableName = parts.slice(2).join('.');
184+
return new TablePattern(`${tag}.${schema}`, tableName);
182185
}
183186
}
184187
export interface SyncConfigWithErrors {

0 commit comments

Comments
 (0)