Skip to content

Commit 911b3fd

Browse files
committed
fix case sensitivity
1 parent d8f4d44 commit 911b3fd

2 files changed

Lines changed: 129 additions & 1 deletion

File tree

packages/core/src/streaming/query-stream-manager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ export class QueryStreamManager {
3232
* Notify all streams that depend on a table that data has changed
3333
*/
3434
notifyTableChanged(tableName: string): void {
35+
const tableNameLower = tableName.toLowerCase();
3536
for (const stream of this.streams.values()) {
3637
const dependencies = stream.getDependencies();
37-
if (dependencies.includes(tableName)) {
38+
if (dependencies.some(dep => dep.toLowerCase() === tableNameLower)) {
3839
stream.refresh();
3940
}
4041
}

packages/core/src/streaming/streaming.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,4 +413,131 @@ describe('Streaming Queries', () => {
413413
expect(emissions[1]).toHaveLength(1);
414414
expect(emissions[1][0].name).toBe('Bob');
415415
});
416+
417+
it('automatically pushes updates on bulkLoad operations', async () => {
418+
// Create stream first
419+
const stream = db.stream('users');
420+
421+
const emissions: any[][] = [];
422+
stream.subscribe(data => {
423+
emissions.push(data);
424+
});
425+
426+
// Wait for initial empty emission
427+
await new Promise(resolve => setTimeout(resolve, 50));
428+
expect(emissions).toHaveLength(1);
429+
expect(emissions[0]).toEqual([]);
430+
431+
// Bulk load data - should auto-notify stream
432+
await db.bulkLoad('users', [
433+
{ system_id: 'u1', id: 'u1', name: 'Alice', age: 30, system_version: '1-0', system_created_at: '1-0', system_is_local_origin: 0 },
434+
{ system_id: 'u2', id: 'u2', name: 'Bob', age: 25, system_version: '2-0', system_created_at: '2-0', system_is_local_origin: 0 },
435+
{ system_id: 'u3', id: 'u3', name: 'Charlie', age: 35, system_version: '3-0', system_created_at: '3-0', system_is_local_origin: 0 }
436+
]);
437+
438+
// Wait for automatic refresh
439+
await new Promise(resolve => setTimeout(resolve, 50));
440+
441+
// Should have automatically received update with bulk loaded data
442+
expect(emissions).toHaveLength(2);
443+
expect(emissions[1]).toHaveLength(3);
444+
expect(emissions[1][0].name).toBe('Alice');
445+
expect(emissions[1][1].name).toBe('Bob');
446+
expect(emissions[1][2].name).toBe('Charlie');
447+
});
448+
449+
it('automatically pushes updates on bulkLoad updates', async () => {
450+
// Insert initial data
451+
await db.insert('users', { id: 'u1', name: 'Alice', age: 30 });
452+
453+
// Create stream
454+
const stream = db.stream('users');
455+
456+
const emissions: any[][] = [];
457+
stream.subscribe(data => {
458+
emissions.push(data);
459+
});
460+
461+
// Wait for initial emission
462+
await new Promise(resolve => setTimeout(resolve, 50));
463+
expect(emissions).toHaveLength(1);
464+
expect(emissions[0]).toHaveLength(1);
465+
expect(emissions[0][0].name).toBe('Alice');
466+
467+
// Get the system_id and version from the inserted record
468+
const existing = await db.queryOne('users', { where: 'id = ?', whereArgs: ['u1'] });
469+
const systemId = existing!.system_id;
470+
const existingVersion = existing!.system_version;
471+
472+
// Create a newer HLC by using db.hlc
473+
const hlc = db.hlc.now();
474+
const newerVersion = db.hlc.constructor.toString(hlc);
475+
476+
// Bulk load with updated data - should auto-notify stream
477+
await db.bulkLoad('users', [
478+
{ system_id: systemId, id: 'u1', name: 'Alice Updated', age: 31, system_version: newerVersion, system_created_at: existingVersion, system_is_local_origin: 0 }
479+
]);
480+
481+
// Wait for automatic refresh
482+
await new Promise(resolve => setTimeout(resolve, 50));
483+
484+
// Should have automatically received update
485+
expect(emissions).toHaveLength(2);
486+
expect(emissions[1]).toHaveLength(1);
487+
expect(emissions[1][0].name).toBe('Alice Updated');
488+
expect(emissions[1][0].age).toBe(31);
489+
});
490+
491+
it('handles case-insensitive table names for stream notifications', async () => {
492+
// Create stream with lowercase table name
493+
const stream = db.stream('users');
494+
495+
const emissions: any[][] = [];
496+
stream.subscribe(data => {
497+
emissions.push(data);
498+
});
499+
500+
// Wait for initial empty emission
501+
await new Promise(resolve => setTimeout(resolve, 50));
502+
expect(emissions).toHaveLength(1);
503+
expect(emissions[0]).toEqual([]);
504+
505+
// Insert with UPPERCASE table name - should still notify lowercase stream
506+
await db.insert('USERS', { id: 'u1', name: 'Alice', age: 30 });
507+
508+
// Wait for automatic refresh
509+
await new Promise(resolve => setTimeout(resolve, 50));
510+
511+
// Should have automatically received update despite case mismatch
512+
expect(emissions).toHaveLength(2);
513+
expect(emissions[1]).toHaveLength(1);
514+
expect(emissions[1][0].name).toBe('Alice');
515+
});
516+
517+
it('handles mixed case in bulkLoad notifications', async () => {
518+
// Create stream with UPPERCASE
519+
const stream = db.stream('USERS');
520+
521+
const emissions: any[][] = [];
522+
stream.subscribe(data => {
523+
emissions.push(data);
524+
});
525+
526+
// Wait for initial empty emission
527+
await new Promise(resolve => setTimeout(resolve, 50));
528+
expect(emissions).toHaveLength(1);
529+
530+
// BulkLoad with lowercase - should notify UPPERCASE stream
531+
await db.bulkLoad('users', [
532+
{ system_id: 'u1', id: 'u1', name: 'Bob', age: 25, system_version: '1-0', system_created_at: '1-0', system_is_local_origin: 0 }
533+
]);
534+
535+
// Wait for automatic refresh
536+
await new Promise(resolve => setTimeout(resolve, 50));
537+
538+
// Should have received update
539+
expect(emissions).toHaveLength(2);
540+
expect(emissions[1]).toHaveLength(1);
541+
expect(emissions[1][0].name).toBe('Bob');
542+
});
416543
});

0 commit comments

Comments
 (0)