46 commits
83955df
PoC convex adapter
kobiebotha Feb 6, 2026
2006bda
cleaner debug log for bucket storage entries
kobiebotha Feb 6, 2026
e113032
wip: global lsn
kobiebotha Feb 9, 2026
33ecf73
remove single table optimization
kobiebotha Feb 11, 2026
de63c81
expirementing with agents.md
kobiebotha Feb 11, 2026
ee1e5df
write checkpoints
kobiebotha Feb 12, 2026
facb78c
tighten convex LSN format
kobiebotha Feb 17, 2026
a25cd1b
cleanup
kobiebotha Feb 17, 2026
a34c0b5
snake_case fix
kobiebotha Feb 17, 2026
ec9e046
slow tests
kobiebotha Feb 17, 2026
950e3d7
fix test
kobiebotha Feb 18, 2026
1c8dd1f
remove streaming import cruft
kobiebotha Feb 19, 2026
817e6b2
simplify LSN representation
kobiebotha Feb 19, 2026
c688653
comments
kobiebotha Feb 19, 2026
6d4e11a
use BaseObserver
kobiebotha Feb 20, 2026
ceb3941
use regular convex mutations for powersync_checkpoints
kobiebotha Feb 20, 2026
d7e97ef
cleanup
kobiebotha Feb 20, 2026
0c7c370
remove agents.md entirelyu
kobiebotha Mar 6, 2026
565d4e7
simplify LSN representation
kobiebotha Mar 6, 2026
92f477e
simplify LSNs even further
kobiebotha Mar 6, 2026
84ba475
simplify convexLSN into oblivion
kobiebotha Mar 6, 2026
78f821a
remove configurable request timeout
kobiebotha Mar 6, 2026
596c1aa
resumable initial replication
kobiebotha Mar 6, 2026
f0c83b6
inline snapshotting for new tables
kobiebotha Mar 6, 2026
05945b9
clean up type mapping
kobiebotha Mar 6, 2026
9b1d02f
pnpmlock
kobiebotha Mar 7, 2026
3ba4d1e
Merge remote-tracking branch 'origin/main' into poc-convex
kobiebotha Mar 7, 2026
1831687
upstream api change
kobiebotha Mar 7, 2026
96d9be9
fix regression in snapshotting
kobiebotha Mar 9, 2026
8007ccf
Merge branch 'main' into poc-convex
kobiebotha Mar 10, 2026
c0dc0d8
fix test deps
kobiebotha Mar 10, 2026
39b7cd9
add missing dep
kobiebotha Mar 10, 2026
9f6f81c
update pnpm-lock.yaml
kobiebotha Mar 10, 2026
8cddab7
prettier -_-
kobiebotha Mar 10, 2026
f6e245d
README cleanup
kobiebotha Mar 11, 2026
c0e4e61
Merge branch 'main' into module-convex
kobiebotha Mar 11, 2026
b6c623b
stop using deprecated startBatch()
kobiebotha Mar 11, 2026
682f9a0
README updates
kobiebotha Mar 11, 2026
60c745b
Merge branch 'main' into module-convex
kobiebotha Mar 16, 2026
bcfcfde
update fake storage in tests for new createWriter() storage API
kobiebotha Mar 16, 2026
776f35d
document assumptions about convex LSN length
kobiebotha Mar 17, 2026
5b53f41
filter out additional metadata fields during replication
kobiebotha Mar 18, 2026
94a4ec2
Enforce reject_ip_ranges for write checkpoint creation
kobiebotha Mar 18, 2026
0738577
linting
kobiebotha Mar 18, 2026
2d87458
cast mock to fix overloaded signatures
kobiebotha Mar 18, 2026
4b39e8a
cleanup README
kobiebotha Mar 19, 2026
125 changes: 125 additions & 0 deletions modules/module-convex/README.md
@@ -0,0 +1,125 @@
# PowerSync Convex Module

Convex replication module for PowerSync.

## Configuration

```yaml
replication:
  connections:
    - type: convex
      deployment_url: https://<your-deployment>.convex.cloud
      deploy_key: <your-deploy-key>
      polling_interval_ms: 1000
      request_timeout_ms: 30000
```

## Manual smoke test

1. The simplest option is to run the Convex demo in the self-host-demo [repo](https://github.com/powersync-ja/self-host-demo).

# Technical notes

The content below is written in an agents.md style describing the behavior of `module-convex`.

## 1) Scope

- This module replicates Convex data into PowerSync bucket storage.
- The source APIs are the Convex [Streaming Export](https://docs.convex.dev/streaming-export-api) endpoints: `json_schemas`, `list_snapshot`, and `document_deltas`.
- The initial scope covers the default Convex component only; support for custom components may be considered later if consistency can be worked out.
- Deploy keys grant root access (read/write on all tables); component-scoped keys could address this later.

## 2) Canonical Behavior

- Initial replication:
  - Pin a global Convex snapshot boundary using `list_snapshot`: when the `snapshot` parameter is omitted, the response supplies the global snapshot boundary ([ref](https://docs.convex.dev/streaming-export-api#get-apilist_snapshot)).
  - Snapshot each selected Sync Streams table with that fixed `snapshot`.
  - The first per-table snapshot call omits `cursor`; the pagination cursor is only used for later pages in the same run.
  - Commit the snapshot LSN, then switch to deltas.
- Streaming replication:
  - Start from the persisted resume LSN.
  - Poll `document_deltas` at the frequency configured by `polling_interval_ms`.
  - Always stream globally (no `tableName` filter), then filter locally by the selected Sync Streams tables.
  - If a table is first seen in a `document_deltas` page and matches Sync Streams, snapshot it inline at that page boundary and skip that table's delta rows from the same page, because the snapshot already includes them.
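A minimal sketch of the initial-replication page walk described above. The `listSnapshot` and `writeRows` signatures here are assumptions for illustration, not the module's actual client API; the real implementation lives in the replication job.

```typescript
interface SnapshotPage {
  values: unknown[];
  snapshot: bigint;
  cursor: string;
  hasMore: boolean;
}

type ListSnapshot = (args: { snapshot?: bigint; cursor?: string }) => Promise<SnapshotPage>;

// Walk one table's snapshot pages: the first call omits `cursor`, the snapshot
// boundary is pinned from the first response (or passed in from a previous
// table in the same run), and a mismatching response snapshot fails fast.
async function snapshotTable(
  listSnapshot: ListSnapshot,
  writeRows: (rows: unknown[]) => Promise<void>,
  pinned?: bigint
): Promise<bigint> {
  let cursor: string | undefined; // the first call omits cursor
  do {
    const page = await listSnapshot({ snapshot: pinned, cursor });
    if (pinned === undefined) {
      pinned = page.snapshot;
    } else if (page.snapshot !== pinned) {
      throw new Error('snapshot boundary changed mid-run; restart initial replication');
    }
    await writeRows(page.values);
    cursor = page.hasMore ? page.cursor : undefined;
  } while (cursor !== undefined);
  // At least one iteration always runs, so `pinned` is set here.
  return pinned!;
}
```

The returned pinned `snapshot` is then reused for every other table in the run, which is what makes the overall snapshot consistent.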

## 3) Hard Invariants (Do Not Break)

- `snapshot` is the consistency boundary; page `cursor` is pagination state.
- All table snapshots in a run must use the same pinned `snapshot`; if response snapshot differs, fail fast.
- On restart during initial replication:
- Reuse persisted snapshot LSN boundary.
- Resume table page walk from the persisted per-table `lastKey` cursor when available.
- If the last page was already flushed before interruption, mark the table snapshot done without re-reading rows.
- Delta streaming starts from resume LSN (snapshot boundary), not from table page cursor.
- `tablePattern.connectionTag` and schema must match before table selection.
- Source table replica identity is `_id`.
- The overall system must ensure causal consistency of replicated data in bucket storage.

## 4) LSN and Cursor Rules

- Convex snapshot and delta cursors are always `i64` timestamps (serialized as decimal numeric strings in JSON).
- The `list_snapshot` pagination cursor is a separate JSON-serialized `{table, id}` string — it is pagination state, not a replication cursor.
- Persisted Convex LSNs must be canonical 19-digit numeric cursor strings. `ZERO_LSN = "0"` remains the internal sentinel.
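The canonical-LSN rule above can be sketched as follows. This is an illustrative helper under the stated assumptions; the actual function in `ConvexLSN.ts` may differ in name and shape.

```typescript
const ZERO_LSN = '0'; // internal sentinel, never persisted as a real cursor

// Convex cursors are i64 timestamps serialized as decimal strings. Padding to
// 19 digits (the maximum decimal length of a non-negative i64) makes
// lexicographic string order match numeric order, which LSN comparison needs.
function toConvexLsn(cursor: bigint | string): string {
  const value = typeof cursor === 'bigint' ? cursor : BigInt(cursor);
  if (value < 0n) {
    throw new Error(`Invalid Convex cursor: ${value}`);
  }
  return value.toString(10).padStart(19, '0');
}

console.log(toConvexLsn('42').length); // 19
```

Zero-padding is what allows bucket storage to compare persisted LSNs as plain strings.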

## 5) API Client Contract

- Auth header: `Authorization: Convex <deploy_key>`.
- Always request `format=json`.
- Parse large numeric JSON using `JSONBig`.
- Retry classification:
  - retryable: network, timeout, 429, 5xx.
  - non-retryable: malformed responses, auth/config issues.
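The retry classification can be sketched like this. These helper names are hypothetical, not the module's actual API client internals.

```typescript
// 429 (rate limiting) and all 5xx responses are transient; retry with backoff.
function isRetryableStatus(status: number): boolean {
  return status === 429 || (status >= 500 && status < 600);
}

// Network-level failures (reset, refused, timeout, DNS) are retryable; anything
// else (malformed responses, auth/config issues) should fail fast.
function isRetryableError(error: { code?: string; status?: number }): boolean {
  if (error.status !== undefined) {
    return isRetryableStatus(error.status);
  }
  return ['ECONNRESET', 'ECONNREFUSED', 'ETIMEDOUT', 'EAI_AGAIN'].includes(error.code ?? '');
}
```

Failing fast on auth errors matters because retrying with a bad deploy key would only mask a configuration problem.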

## 6) Schema Change Caveat

- Convex `json_schemas` does not provide a schema change token or revision cursor that can be checkpointed.
- Current behavior uses `json_schemas` for discovery/debug, but does not continuously diff source schema versions.
- Operational caveat: if Convex schema changes (tables or columns), developers must review and redeploy Sync Streams manually.
- Future improvement: cache a canonicalized `json_schemas` hash, poll periodically, and raise diagnostics when schema drift is detected.
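The suggested drift detection could look like the sketch below: canonicalize the `json_schemas` payload (sorted keys) before hashing, so key-ordering differences do not produce false positives. `canonicalize` and `schemaFingerprint` are hypothetical names.

```typescript
import { createHash } from 'node:crypto';

// Recursively sort object keys so semantically equal schemas serialize identically.
function canonicalize(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(canonicalize);
  if (value !== null && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value as Record<string, unknown>)
        .sort(([a], [b]) => a.localeCompare(b))
        .map(([k, v]) => [k, canonicalize(v)])
    );
  }
  return value;
}

// A stable hash of the schema; a change in this value signals schema drift.
function schemaFingerprint(jsonSchemas: unknown): string {
  return createHash('sha256').update(JSON.stringify(canonicalize(jsonSchemas))).digest('hex');
}
```

Polling this fingerprint and comparing it to a cached value is enough to raise a diagnostic without needing a schema revision cursor from Convex.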

## 7) Datatype Mapping

- Current runtime mapping in stream writer:

| Convex Type | TS/JS Type | SQLite type |
| ----------- | ----------- | ---------------------------------- |
| Id | string | text |
| Null | null | null |
| Int64 | bigint | integer |
| Float64 | number | real |
| Boolean | boolean | Up to developer - string or number |
| String | string | text |
| Bytes | ArrayBuffer | text |
| Array | Array | text |
| Object | Object | text |
| Record | Record | text |

- Convex does not expose a native `Date` wire type; timestamps arrive as `number` or `string`.
- BLOB values are valid row values but are not valid bucket parameter values.
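A minimal sketch of the mapping table above. `convexValueToSqlite` is a hypothetical helper, not the module's stream-writer implementation, and the boolean and bytes encodings shown are one possible choice (the table leaves booleans up to the developer).

```typescript
function convexValueToSqlite(value: unknown): string | number | bigint | null {
  if (value === null) return null; // Null -> null
  switch (typeof value) {
    case 'string':
      return value; // Id / String -> text
    case 'bigint':
      return value; // Int64 -> integer
    case 'number':
      return value; // Float64 -> real
    case 'boolean':
      return value ? 1 : 0; // one possible choice; the module leaves this to the developer
    default:
      if (value instanceof ArrayBuffer) {
        // Bytes -> text; base64 is an assumed encoding here
        return Buffer.from(value).toString('base64');
      }
      // Array / Object / Record -> text as JSON
      return JSON.stringify(value);
  }
}
```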

## 8) Checkpointing and Consistency

- `createReplicationHead` must:
1. resolve global head cursor,
2. write a Convex checkpoint marker via `POST /api/mutation` (calls `powersync_checkpoints:createCheckpoint`),
3. then pass the head to callback.
- Source marker table: `powersync_checkpoints`
- Convex rejects table names starting with `_`, so no leading-underscore variant is used.
- The table has a single `last_updated` field; the mutation upserts one row (bounded to one row total).
- The developer must deploy the `powersync_checkpoints` schema and mutation to their Convex project.
- Stream handling requirement:
- checkpoint marker tables must always be excluded from replicated source tables and ignored in delta row application.
- marker-only delta pages must trigger immediate `keepalive` checkpoint advancement (do not wait for 60s throttle).

## 9) Other Convex-specific notes

- The default schema is `convex`.
- On an idle system, multiple successive calls to `/api/document_deltas` return the same cursor value, i.e. the cursor is not wall-clock based.

- **Mutation transaction atomicity in `document_deltas`**
  - The `cursor` in `/api/document_deltas` is a Convex commit **timestamp** (`i64`), not a per-operation counter.
  - Every Convex mutation is an ACID transaction that commits with a single timestamp; all writes within that mutation share the same `_ts` value in the delta stream.
  - Therefore, the cursor advances **once per mutation**, not once per individual CRUD operation inside it.
  - Example: a mutation that deletes 5 documents and updates 3 produces 8 entries in `document_deltas`, all with identical `_ts`.
  - The Convex backend enforces this by never splitting a page mid-timestamp: when the row limit is reached mid-transaction, the page extends until all rows at that `_ts` are included before stopping.
  - Consequence for replication: all writes from a single mutation always appear in the same `document_deltas` page and are committed to bucket storage atomically as one batch.
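The atomicity consequence above can be sketched as a grouping step: batch delta rows by their shared commit timestamp so each mutation's writes are applied together. `groupByCommit` and the row shape are hypothetical.

```typescript
interface Delta {
  _ts: bigint; // commit timestamp shared by all writes in one mutation
  _id: string;
}

// Group a document_deltas page into per-mutation batches. document_deltas
// returns rows in timestamp order, so Map insertion order already matches
// commit order.
function groupByCommit(rows: Delta[]): Delta[][] {
  const batches = new Map<bigint, Delta[]>();
  for (const row of rows) {
    const batch = batches.get(row._ts) ?? [];
    batch.push(row);
    batches.set(row._ts, batch);
  }
  return [...batches.values()];
}
```

Each resulting batch can then be flushed to bucket storage as one atomic write, matching the source transaction boundary.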
43 changes: 43 additions & 0 deletions modules/module-convex/package.json
@@ -0,0 +1,43 @@
{
  "name": "@powersync/service-module-convex",
  "repository": "https://github.com/powersync-ja/powersync-service",
  "types": "dist/index.d.ts",
  "version": "0.1.0",
  "license": "FSL-1.1-ALv2",
  "main": "dist/index.js",
  "type": "module",
  "publishConfig": {
    "access": "public"
  },
  "scripts": {
    "build": "tsc -b",
    "build:tests": "tsc -b test/tsconfig.json",
    "clean": "rm -rf ./dist && tsc -b --clean",
    "test": "vitest"
  },
  "exports": {
    ".": {
      "import": "./dist/index.js",
      "require": "./dist/index.js",
      "default": "./dist/index.js"
    },
    "./types": {
      "import": "./dist/types/types.js",
      "require": "./dist/types/types.js",
      "default": "./dist/types/types.js"
    }
  },
  "dependencies": {
    "@powersync/lib-services-framework": "workspace:*",
    "@powersync/service-core": "workspace:*",
    "@powersync/service-jsonbig": "workspace:*",
    "@powersync/service-sync-rules": "workspace:*",
    "@powersync/service-types": "workspace:*",
    "ts-codec": "^1.3.0"
  },
  "devDependencies": {
    "@powersync/service-core-tests": "workspace:*",
    "@powersync/service-module-mongodb-storage": "workspace:*",
    "@powersync/service-module-postgres-storage": "workspace:*"
  }
}
222 changes: 222 additions & 0 deletions modules/module-convex/src/api/ConvexRouteAPIAdapter.ts
@@ -0,0 +1,222 @@
import {
  api,
  ParseSyncRulesOptions,
  ReplicationHeadCallback,
  ReplicationLagOptions,
  SourceTable
} from '@powersync/service-core';
import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';
import { toConvexLsn } from '../common/ConvexLSN.js';
import { isConvexCheckpointTable } from '../common/ConvexCheckpoints.js';
import { extractProperties, readConvexFieldType, toExpressionTypeFromConvexType } from '../common/convex-to-sqlite.js';
import { ConvexConnectionManager } from '../replication/ConvexConnectionManager.js';
import * as types from '../types/types.js';

export class ConvexRouteAPIAdapter implements api.RouteAPI {
  protected connectionManager: ConvexConnectionManager;

  constructor(protected config: types.ResolvedConvexConnectionConfig) {
    this.connectionManager = new ConvexConnectionManager(config);
  }

  async getSourceConfig(): Promise<service_types.configFile.ResolvedDataSourceConfig> {
    return this.config;
  }

  async getConnectionStatus(): Promise<service_types.ConnectionStatusV2> {
    const base = {
      id: this.config.id,
      uri: types.baseUri(this.config)
    };

    try {
      await this.connectionManager.client.getJsonSchemas();
      return {
        ...base,
        connected: true,
        errors: []
      };
    } catch (error) {
      return {
        ...base,
        connected: false,
        errors: [{ level: 'fatal', message: error instanceof Error ? error.message : `${error}` }]
      };
    }
  }

  async getDebugTablesInfo(
    tablePatterns: sync_rules.TablePattern[],
    sqlSyncRules: sync_rules.SyncConfig
  ): Promise<api.PatternResult[]> {
    const schema = await this.connectionManager.client.getJsonSchemas();
    const tablesByName = new Map(schema.tables.map((table) => [table.tableName, table]));

    const result: api.PatternResult[] = [];

    for (const tablePattern of tablePatterns) {
      const patternResult: api.PatternResult = {
        schema: tablePattern.schema,
        pattern: tablePattern.tablePattern,
        wildcard: tablePattern.isWildcard
      };

      result.push(patternResult);

      if (tablePattern.connectionTag != this.connectionManager.connectionTag) {
        if (tablePattern.isWildcard) {
          patternResult.tables = [];
        } else {
          patternResult.table = createTableInfo({
            tablePattern,
            connectionTag: this.connectionManager.connectionTag,
            syncRules: sqlSyncRules,
            errors: [{ level: 'warning', message: 'Skipped: connection tag does not match Convex connection tag' }]
          });
        }
        continue;
      }

      const matchedTableNames = [...tablesByName.keys()]
        .filter((name) => {
          // Convex doesn't support user-defined schemas, so this is more of a
          // forwards-compatibility check for when multiple connections are supported
          if (tablePattern.schema != this.connectionManager.schema) {
            return false;
          }
          if (isConvexCheckpointTable(name)) {
            return false;
          }
          if (tablePattern.isWildcard) {
            return name.startsWith(tablePattern.tablePrefix);
          }
          return name == tablePattern.name;
        })
        .sort();

      if (tablePattern.isWildcard) {
        patternResult.tables = matchedTableNames.map((tableName) =>
          createTableInfo({
            tablePattern,
            connectionTag: this.connectionManager.connectionTag,
            syncRules: sqlSyncRules,
            tableName
          })
        );
      } else {
        const tableName = matchedTableNames[0] ?? tablePattern.name;
        patternResult.table = createTableInfo({
          tablePattern,
          connectionTag: this.connectionManager.connectionTag,
          syncRules: sqlSyncRules,
          tableName,
          errors:
            matchedTableNames.length == 0
              ? [{ level: 'warning', message: `Table ${tablePattern.schema}.${tablePattern.name} not found` }]
              : []
        });
      }
    }

    return result;
  }

  // For Convex we can calculate time-based lag, but not byte-based lag
  async getReplicationLagBytes(options: ReplicationLagOptions): Promise<number | undefined> {
    return undefined;
  }

  async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
    const head = await this.connectionManager.client.getHeadCursor();
    await this.connectionManager.client.createWriteCheckpointMarker();
    return await callback(toConvexLsn(head));
  }

  async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
    const schema = await this.connectionManager.client.getJsonSchemas();

    return [
      {
        name: this.connectionManager.schema,
        tables: schema.tables
          .filter((table) => !isConvexCheckpointTable(table.tableName))
          .map((table) => ({
            name: table.tableName,
            columns: Object.entries({
              ...extractProperties(table.schema),
              _id: { type: 'id' }
            })
              .sort(([a], [b]) => a.localeCompare(b))
              .map(([columnName, property]) => {
                const jsonType = readConvexFieldType(property);
                const sqliteType = toExpressionTypeFromConvexType(jsonType);

                return {
                  name: columnName,
                  type: jsonType,
                  sqlite_type: sqliteType.typeFlags,
                  internal_type: jsonType,
                  pg_type: jsonType
                };
              })
          }))
      }
    ];
  }

  async executeQuery(query: string, params: any[]): Promise<service_types.internal_routes.ExecuteSqlResponse> {
    return service_types.internal_routes.ExecuteSqlResponse.encode({
      results: {
        columns: [],
        rows: []
      },
      success: false,
      error: 'SQL querying is not supported for Convex'
    });
  }

  async shutdown(): Promise<void> {
    await this.connectionManager.end();
  }

  async [Symbol.asyncDispose]() {
    await this.shutdown();
  }

  getParseSyncRulesOptions(): ParseSyncRulesOptions {
    return {
      defaultSchema: this.connectionManager.schema
    };
  }
}

function createTableInfo(options: {
  tablePattern: sync_rules.TablePattern;
  connectionTag: string;
  syncRules: sync_rules.SqlSyncRules;
  tableName?: string;
  errors?: service_types.ReplicationError[];
}) {
  const tableName =
    options.tableName ??
    (options.tablePattern.isWildcard ? options.tablePattern.tablePrefix : options.tablePattern.name);
  const sourceTable = new SourceTable({
    id: tableName,
    connectionTag: options.connectionTag,
    objectId: tableName,
    schema: options.tablePattern.schema,
    name: tableName,
    replicaIdColumns: [{ name: '_id' }],
    snapshotComplete: true
  });

  return {
    schema: options.tablePattern.schema,
    name: tableName,
    pattern: options.tablePattern.isWildcard ? options.tablePattern.tablePattern : undefined,
    replication_id: ['_id'],
    data_queries: options.syncRules.tableSyncsData(sourceTable),
    parameter_queries: options.syncRules.tableSyncsParameters(sourceTable),
    errors: options.errors ?? []
  };
}