Skip to content

Commit a1419be

Browse files
authored
Sync streams evaluator (#465)
1 parent a02cc58 commit a1419be

21 files changed

Lines changed: 2179 additions & 31 deletions

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
SqliteJsonRow,
1616
SqliteRow
1717
} from './types.js';
18-
import { filterJsonRow } from './utils.js';
18+
import { filterJsonRow, idFromData } from './utils.js';
1919

2020
export interface RowValueExtractor {
2121
extract(tables: QueryParameters, into: SqliteRow): void;
@@ -183,16 +183,7 @@ export class BaseSqlDataQuery {
183183
}
184184

185185
const data = this.transformRow(tables);
186-
let id = data.id;
187-
if (typeof id != 'string') {
188-
// While an explicit cast would be better, this covers against very common
189-
// issues when initially testing out sync, for example when the id column is an
190-
// auto-incrementing integer.
191-
// If there is no id column, we use a blank id. This will result in the user syncing
192-
// a single arbitrary row for this table - better than just not being able to sync
193-
// anything.
194-
id = castAsText(id) ?? '';
195-
}
186+
const id = idFromData(data);
196187
const outputTable = this.getOutputName(table.name);
197188

198189
return resolvedBucketParameters.map((serializedBucketParameters) => {

packages/sync-rules/src/compiler/sqlite.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,12 @@ export class PostgresToSqlite {
178178
return this.invalidExpression(expr.function, 'DISTINCT, ORDER BY, FILTER and OVER clauses are not supported');
179179
}
180180

181-
const forbiddenReason = forbiddenFunctions[expr.function.name];
181+
const functionName = expr.function.name.toLowerCase();
182+
const forbiddenReason = forbiddenFunctions[functionName];
182183
if (forbiddenReason) {
183184
return this.invalidExpression(expr.function, `Forbidden call: ${forbiddenReason}`);
184185
}
185-
let allowedArgs = supportedFunctions[expr.function.name];
186+
let allowedArgs = supportedFunctions[functionName];
186187
if (allowedArgs == null) {
187188
return this.invalidExpression(expr.function, 'Unknown function');
188189
} else {
@@ -204,7 +205,7 @@ export class PostgresToSqlite {
204205

205206
return {
206207
type: 'function',
207-
function: expr.function.name,
208+
function: functionName,
208209
parameters: expr.args.map((a) => this.translateNodeWithLocation(a))
209210
};
210211
}
@@ -216,6 +217,9 @@ export class PostgresToSqlite {
216217
const left = this.translateNodeWithLocation(expr.left);
217218
const right = this.translateNodeWithLocation(expr.right);
218219
if (expr.op === 'LIKE') {
220+
// We don't support LIKE in the old bucket definition system, and want to make sure we're clear about ICU,
221+
// case sensitivity and changing the escape character first. TODO: Support later.
222+
this.options.errors.report('LIKE expressions are not currently supported.', expr);
219223
return { type: 'function', function: 'like', parameters: [left, right] };
220224
} else if (expr.op === 'NOT LIKE') {
221225
return {
@@ -243,8 +247,9 @@ export class PostgresToSqlite {
243247
let rightHandSideOfIs: SqlExpression<ExpressionInput>;
244248

245249
switch (expr.op) {
246-
case '+':
247250
case '-':
251+
return this.invalidExpression(expr, 'Unary minus is not currently supported');
252+
case '+':
248253
return { type: 'unary', operator: expr.op, operand: this.translateNodeWithLocation(expr.operand) };
249254
case 'NOT':
250255
return { type: 'unary', operator: 'not', operand: this.translateNodeWithLocation(expr.operand) };

packages/sync-rules/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ export * from './HydratedSyncRules.js';
3333
export * from './compiler/compiler.js';
3434
export * from './sync_plan/plan.js';
3535
export { serializeSyncPlan } from './sync_plan/serialize.js';
36+
export { addPrecompiledSyncPlanToRules } from './sync_plan/evaluator/index.js';
37+
export { javaScriptExpressionEngine } from './sync_plan/engine/javascript.js';
38+
export { nodeSqliteExpressionEngine } from './sync_plan/engine/sqlite.js';

packages/sync-rules/src/sql_functions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ const TYPE_ORDERING = {
11021102
blob: 3
11031103
};
11041104

1105-
function compare(a: SqliteValue, b: SqliteValue): number {
1105+
export function compare(a: SqliteValue, b: SqliteValue): number {
11061106
// https://www.sqlite.org/datatype3.html#comparisons
11071107
if (a == null && b == null) {
11081108
// Only for IS / IS NOT
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
import {
2+
compare,
3+
CompatibilityContext,
4+
generateSqlFunctions,
5+
SQLITE_FALSE,
6+
SQLITE_TRUE,
7+
sqliteBool,
8+
sqliteNot
9+
} from '../../index.js';
10+
import { cast, evaluateOperator, SqlFunction } from '../../sql_functions.js';
11+
import { cartesianProduct } from '../../streams/utils.js';
12+
import { generateTableValuedFunctions } from '../../TableValuedFunctions.js';
13+
import { SqliteRow, SqliteValue } from '../../types.js';
14+
import {
15+
ExternalData,
16+
UnaryExpression,
17+
BinaryExpression,
18+
BetweenExpression,
19+
ScalarInExpression,
20+
CaseWhenExpression,
21+
CastExpression,
22+
ScalarFunctionCallExpression,
23+
LiteralExpression,
24+
SqlExpression
25+
} from '../expression.js';
26+
import { ExpressionVisitor, visitExpr } from '../expression_visitor.js';
27+
import {
28+
ScalarExpressionEngine,
29+
ScalarExpressionEvaluator,
30+
TableValuedFunction,
31+
TableValuedFunctionOutput
32+
} from './scalar_expression_engine.js';
33+
34+
/**
35+
* Creates a {@link ScalarExpressionEngine} implemented by evaluating scalar expressions in JavaScript.
36+
*/
37+
export function javaScriptExpressionEngine(compatibility: CompatibilityContext): ScalarExpressionEngine {
38+
const tableValued = generateTableValuedFunctions(compatibility);
39+
const regularFunctions = generateSqlFunctions(compatibility);
40+
const compiler = new ExpressionToJavaScriptFunction({
41+
named: regularFunctions.named,
42+
jsonExtractJson: regularFunctions.operatorJsonExtractJson,
43+
jsonExtractSql: regularFunctions.operatorJsonExtractSql
44+
});
45+
46+
return {
47+
close() {},
48+
prepareEvaluator({ outputs = [], filters = [], tableValuedFunctions = [] }): ScalarExpressionEvaluator {
49+
function compileScalar(expr: SqlExpression<number | TableValuedFunctionOutput>) {
50+
return compiler.compile(expr);
51+
}
52+
53+
const resolvedTableValuedFunctions = tableValuedFunctions.map((fn) => {
54+
const found = tableValued[fn.name];
55+
if (found == null) {
56+
throw new Error(`Unknown table-valued function: ${fn.name}`);
57+
}
58+
59+
const inputs = fn.inputs.map(compileScalar);
60+
return {
61+
original: fn,
62+
evaluate: (input: PendingStatementEvaluation) => {
63+
return found.call(inputs.map((f) => f(input)));
64+
}
65+
};
66+
});
67+
68+
const columns = outputs.map(compileScalar);
69+
const compiledFilters = filters.map(compileScalar);
70+
71+
return {
72+
evaluate(inputs) {
73+
// First, evaluate table-valued functions (if any).
74+
const perFunctionResults: [TableValuedFunction, SqliteRow][][] = [];
75+
76+
for (const { original, evaluate } of resolvedTableValuedFunctions) {
77+
perFunctionResults.push(
78+
evaluate({ inputs }).map((row) => [original, row] satisfies [TableValuedFunction, SqliteRow])
79+
);
80+
}
81+
82+
const rows: SqliteValue[][] = [];
83+
// We're doing an inner join on all table-valued functions, which we implement as a cross join on which each
84+
// filter is evaluated. Having more than one table-valued function per statement would be very rare in
85+
// practice.
86+
row: for (const sourceRow of cartesianProduct(...perFunctionResults)) {
87+
const byFunction = new Map<TableValuedFunction, SqliteRow>();
88+
for (const [fn, output] of sourceRow) {
89+
byFunction.set(fn, output);
90+
}
91+
92+
const input: PendingStatementEvaluation = { inputs, row: byFunction };
93+
94+
for (const filter of compiledFilters) {
95+
if (!sqliteBool(filter(input))) {
96+
continue row;
97+
}
98+
}
99+
rows.push(columns.map((c) => c(input)));
100+
}
101+
102+
return rows;
103+
}
104+
};
105+
}
106+
};
107+
}
108+
109+
interface PendingStatementEvaluation {
110+
inputs: SqliteValue[];
111+
row?: Map<TableValuedFunction, SqliteRow>;
112+
}
113+
114+
type ExpressionImplementation = (input: PendingStatementEvaluation) => SqliteValue;
115+
116+
interface KnownFunctions {
117+
named: Record<string, SqlFunction>;
118+
jsonExtractJson: SqlFunction; // -> operator
119+
jsonExtractSql: SqlFunction; // ->> operator
120+
}
121+
122+
class ExpressionToJavaScriptFunction
123+
implements ExpressionVisitor<number | TableValuedFunctionOutput, ExpressionImplementation, null>
124+
{
125+
constructor(readonly functions: KnownFunctions) {}
126+
127+
compile(expr: SqlExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
128+
return visitExpr(this, expr, null);
129+
}
130+
131+
visitExternalData(expr: ExternalData<number | TableValuedFunctionOutput>, arg: null): ExpressionImplementation {
132+
if (typeof expr.source === 'number') {
133+
const index = expr.source;
134+
// -1 because variables in SQLite are 1-indexed.
135+
return ({ inputs }) => inputs[index - 1];
136+
} else {
137+
const { column, function: fn } = expr.source;
138+
139+
return ({ row }) => {
140+
const result = row!.get(fn)!;
141+
return result[column];
142+
};
143+
}
144+
}
145+
146+
visitUnaryExpression(expr: UnaryExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
147+
const operand = this.compile(expr.operand);
148+
149+
switch (expr.operator) {
150+
case '+':
151+
return operand;
152+
case 'not':
153+
return (input) => sqliteNot(operand(input));
154+
// case '~':
155+
// case '-':
156+
// throw new Error(`unary operator not supported: ${expr.operator}`);
157+
}
158+
}
159+
160+
visitBinaryExpression(expr: BinaryExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
161+
const left = this.compile(expr.left);
162+
const right = this.compile(expr.right);
163+
const operator = expr.operator.toUpperCase();
164+
165+
return (input) => evaluateOperator(operator, left(input), right(input));
166+
}
167+
168+
visitBetweenExpression(expr: BetweenExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
169+
const low = this.compile(expr.low);
170+
const high = this.compile(expr.high);
171+
const value = this.compile(expr.value);
172+
173+
return (input) => {
174+
const evaluatedValue = value(input);
175+
176+
const geqLow = evaluateOperator('>=', evaluatedValue, low(input));
177+
const leqHigh = evaluateOperator('<=', evaluatedValue, high(input));
178+
if (geqLow == null || leqHigh == null) {
179+
return null;
180+
}
181+
182+
return sqliteBool(geqLow) && sqliteBool(leqHigh);
183+
};
184+
}
185+
186+
visitScalarInExpression(expr: ScalarInExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
187+
const target = this.compile(expr.target);
188+
const inQuery = expr.in.map((q) => this.compile(q));
189+
190+
return (input) => {
191+
const evaluatedTarget = target(input);
192+
if (evaluatedTarget == null) {
193+
return null;
194+
}
195+
196+
let hasNullQuery = false;
197+
for (const q of inQuery) {
198+
const evaluated = q(input);
199+
if (evaluated == null) {
200+
hasNullQuery = true;
201+
continue;
202+
}
203+
204+
if (compare(evaluatedTarget, evaluated) == 0) {
205+
return SQLITE_TRUE;
206+
}
207+
}
208+
209+
return hasNullQuery ? null : SQLITE_FALSE;
210+
};
211+
}
212+
213+
visitCaseWhenExpression(expr: CaseWhenExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
214+
const compiledWhens = expr.whens.map((w) => ({ when: this.compile(w.when), then: this.compile(w.then) }));
215+
const compiledElse = expr.else && this.compile(expr.else);
216+
217+
if (expr.operand) {
218+
const operand = this.compile(expr.operand);
219+
return (input) => {
220+
const evaluatedOperand = operand(input);
221+
222+
for (const { when, then } of compiledWhens) {
223+
if (evaluateOperator('=', evaluatedOperand, when(input))) {
224+
return then(input);
225+
}
226+
}
227+
228+
return compiledElse ? compiledElse(input) : null;
229+
};
230+
} else {
231+
return (input) => {
232+
for (const { when, then } of compiledWhens) {
233+
if (sqliteBool(when(input))) {
234+
return then(input);
235+
}
236+
}
237+
238+
return compiledElse ? compiledElse(input) : null;
239+
};
240+
}
241+
}
242+
243+
visitCastExpression(expr: CastExpression<number | TableValuedFunctionOutput>): ExpressionImplementation {
244+
const operand = this.compile(expr.operand);
245+
return (input) => {
246+
return cast(operand(input), expr.cast_as);
247+
};
248+
}
249+
250+
visitScalarFunctionCallExpression(
251+
expr: ScalarFunctionCallExpression<number | TableValuedFunctionOutput>
252+
): ExpressionImplementation {
253+
let fnImpl: SqlFunction;
254+
if (expr.function === '->') {
255+
fnImpl = this.functions.jsonExtractJson;
256+
} else if (expr.function === '->>') {
257+
fnImpl = this.functions.jsonExtractSql;
258+
} else {
259+
fnImpl = this.functions.named[expr.function];
260+
if (!fnImpl) {
261+
throw new Error(`Function not implemented: ${expr.function}`);
262+
}
263+
}
264+
265+
const args = expr.parameters.map((p) => this.compile(p));
266+
return (input) => {
267+
return fnImpl.call(...args.map((f) => f(input)));
268+
};
269+
}
270+
271+
visitLiteralExpression(expr: LiteralExpression): ExpressionImplementation {
272+
switch (expr.type) {
273+
case 'lit_null':
274+
return () => null;
275+
case 'lit_double':
276+
return () => expr.value;
277+
case 'lit_int':
278+
return () => BigInt(expr.base10);
279+
case 'lit_string':
280+
return () => expr.value;
281+
}
282+
}
283+
}

0 commit comments

Comments
 (0)