Skip to content

Commit 3522910

Browse files
committed
Evaluate queriers
1 parent ab4626f commit 3522910

7 files changed

Lines changed: 701 additions & 20 deletions

File tree

packages/sync-rules/src/sync_plan/evaluator.ts

Lines changed: 110 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1-
import { UnscopedParameterLookup } from '../BucketParameterQuerier.js';
2-
import { BucketDataSource, ParameterIndexLookupCreator } from '../BucketSource.js';
1+
import { BucketInclusionReason } from '../BucketDescription.js';
2+
import { PendingQueriers, UnscopedParameterLookup } from '../BucketParameterQuerier.js';
3+
import {
4+
BucketDataSource,
5+
BucketSource,
6+
BucketSourceType,
7+
CreateSourceParams,
8+
HydratedBucketSource,
9+
ParameterIndexLookupCreator
10+
} from '../BucketSource.js';
311
import { ColumnDefinition } from '../ExpressionType.js';
412
import { ParameterLookupScope } from '../HydrationState.js';
513
import { SourceTableInterface } from '../SourceTableInterface.js';
6-
import { SqlSyncRules } from '../SqlSyncRules.js';
14+
import { GetQuerierOptions, RequestedStream, SqlSyncRules } from '../SqlSyncRules.js';
715
import { TablePattern } from '../TablePattern.js';
816
import {
917
EvaluateRowOptions,
1018
idFromData,
19+
RequestParameters,
1120
SourceSchema,
1221
SqliteJsonRow,
1322
SqliteJsonValue,
@@ -17,8 +26,10 @@ import {
1726
UnscopedEvaluationResult
1827
} from '../types.js';
1928
import { filterJsonRow, isJsonValue, JSONBucketNameSerialize } from '../utils.js';
29+
import { isValidParameterValue, isValidParameterValueRow, RequestParameterEvaluators } from './lookup_stages.js';
2030
import * as plan from './plan.js';
21-
import { prepareRowEvaluator, RowEvaluator, SqlEngine } from './sql_engine.js';
31+
import { PreparedQuerier, StreamInput } from './querier_impl.js';
32+
import { prepareRowEvaluator, RowEvaluator, SqlBuilder, SqlEngine } from './sql_engine.js';
2233

2334
export interface StreamEvaluationContext {
2435
engine: SqlEngine;
@@ -43,6 +54,15 @@ export function addPrecompiledSyncPlanToRules(
4354
preparedLookups.set(parameter, prepared);
4455
rules.bucketParameterLookupSources.push(prepared);
4556
}
57+
58+
const streamInput: StreamInput = {
59+
...context,
60+
preparedBuckets,
61+
preparedLookups
62+
};
63+
for (const stream of plan.streams) {
64+
rules.bucketSources.push(new StreamBucketSource(stream, streamInput));
65+
}
4666
}
4767

4868
class PreparedStreamDataSource {
@@ -85,9 +105,7 @@ class PreparedStreamDataSource {
85105
const id = idFromData(record);
86106

87107
for (const bucketParameter of source.partitionValues) {
88-
if (bucketParameter == null || !isJsonValue(bucketParameter)) {
89-
// All parameters are compared via the equals operator, and null is not equal to anything. Also, we can only
90-
// persist JSON values
108+
if (!isValidParameterValue(bucketParameter)) {
91109
continue row;
92110
}
93111
}
@@ -195,23 +213,18 @@ class PreparedParameterIndexLookupCreator implements ParameterIndexLookupCreator
195213
}
196214

197215
try {
198-
row: for (const outputRow of this.evaluator.evaluate(row)) {
216+
for (const outputRow of this.evaluator.evaluate(row)) {
217+
if (!isValidParameterValueRow(outputRow.outputs) || !isValidParameterValueRow(outputRow.partitionValues)) {
218+
continue;
219+
}
220+
199221
const bucketParameters: Record<string, SqliteJsonValue> = {};
200222
for (let i = 0; i < outputRow.outputs.length; i++) {
201223
const value = outputRow.outputs[i];
202-
if (value == null || !isJsonValue(value)) {
203-
continue row;
204-
}
205-
206224
bucketParameters[i.toString()] = value;
207225
}
208226

209-
for (const parameter of outputRow.partitionValues) {
210-
if (parameter == null || !isJsonValue(parameter)) {
211-
continue row;
212-
}
213-
}
214-
const lookup = UnscopedParameterLookup.normalized(outputRow.partitionValues as SqliteJsonValue[]);
227+
const lookup = UnscopedParameterLookup.normalized(outputRow.partitionValues);
215228
results.push({ lookup, bucketParameters: [bucketParameters] });
216229
}
217230
} catch (e) {
@@ -225,3 +238,82 @@ class PreparedParameterIndexLookupCreator implements ParameterIndexLookupCreator
225238
return this.source.sourceTable.matches(table);
226239
}
227240
}
241+
242+
class StreamBucketSource implements BucketSource {
243+
readonly dataSources: BucketDataSource[] = [];
244+
readonly parameterIndexLookupCreators: ParameterIndexLookupCreator[] = [];
245+
246+
constructor(
247+
readonly stream: plan.CompiledSyncStream,
248+
private readonly input: StreamInput
249+
) {
250+
for (const querier of stream.queriers) {
251+
const mappedSource = input.preparedBuckets.get(querier.bucket)!;
252+
this.dataSources.push(mappedSource);
253+
}
254+
}
255+
256+
get name(): string {
257+
return this.stream.stream.name;
258+
}
259+
260+
get subscribedToByDefault(): boolean {
261+
return this.stream.stream.isSubscribedByDefault;
262+
}
263+
264+
get type(): BucketSourceType {
265+
return BucketSourceType.SYNC_STREAM;
266+
}
267+
268+
debugRepresentation() {
269+
throw new Error('debugRepresentation not implemented.');
270+
}
271+
272+
hydrate(params: CreateSourceParams): HydratedBucketSource {
273+
const queriers = this.stream.queriers.map((q) => new PreparedQuerier(this.stream.stream, q, this.input));
274+
275+
return {
276+
definition: this,
277+
pushBucketParameterQueriers: (result, options) => {
278+
const subscriptions = options.streams[this.name] ?? [];
279+
if (!this.subscribedToByDefault && !subscriptions.length) {
280+
// The client is not subscribing to this stream, so don't query buckets related to it.
281+
return;
282+
}
283+
284+
let hasExplicitDefaultSubscription = false;
285+
const activeQueriers: { querier: PreparedQuerier; partialInstantiation: RequestParameterEvaluators }[] = [];
286+
for (const querier of queriers) {
287+
const partialInstantiation = querier.partialInstantiationForGlobalRequestdata(result, options);
288+
if (partialInstantiation == null) {
289+
// Nothing in this request can fullfil the querier, continue
290+
continue;
291+
}
292+
293+
activeQueriers.push({ querier, partialInstantiation });
294+
}
295+
296+
for (const subscription of subscriptions) {
297+
let subscriptionParams = options.globalParameters;
298+
if (subscription.parameters != null) {
299+
subscriptionParams = subscriptionParams.withAddedStreamParameters(subscription.parameters);
300+
} else {
301+
hasExplicitDefaultSubscription = true;
302+
}
303+
304+
for (const { querier, partialInstantiation } of activeQueriers) {
305+
querier.querierForSubscription(params, result, subscriptionParams, subscription, partialInstantiation);
306+
}
307+
}
308+
309+
// If the stream is subscribed to by default and there is no explicit subscription that would match the default
310+
// subscription, also include the default querier.
311+
if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) {
312+
for (const { querier, partialInstantiation } of activeQueriers) {
313+
querier.querierForSubscription(params, result, options.globalParameters, null, partialInstantiation);
314+
}
315+
}
316+
}
317+
};
318+
}
319+
}

0 commit comments

Comments
 (0)