From e086339be1b12644c7ff1703c92c83fdb108359c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 13 Apr 2026 11:15:26 +0200 Subject: [PATCH] Fix retry loop due to single-subscription stream --- packages/powersync/CHANGELOG.md | 1 + .../database/native/native_powersync_database.dart | 2 +- .../powersync/lib/src/sync/connection_manager.dart | 12 +++++++++--- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/powersync/CHANGELOG.md b/packages/powersync/CHANGELOG.md index 4c4e4b42..d11200e6 100644 --- a/packages/powersync/CHANGELOG.md +++ b/packages/powersync/CHANGELOG.md @@ -25,6 +25,7 @@ - Deprecate re-exports of other packages (`package:powersync/sqlite_async.dart`, `package:powersync/sqlite3_common.dart`, `package:powersync/sqlite3.dart`). Instead, add a dependency on the respective package and import it directly. - Remove `powersync_sync.worker.js`. `powersync_db.worker.js` now covers both database access and sync. +- Fix sync isolate retry loop ([#397](https://github.com/powersync-ja/powersync.dart/issues/397)). ## 1.18.0 diff --git a/packages/powersync/lib/src/database/native/native_powersync_database.dart b/packages/powersync/lib/src/database/native/native_powersync_database.dart index eace6e08..699c390f 100644 --- a/packages/powersync/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync/lib/src/database/native/native_powersync_database.dart @@ -42,7 +42,7 @@ final class NativePowerSyncDatabase extends BasePowerSyncDatabase { required PowerSyncBackendConnector connector, required ResolvedSyncOptions options, required List initiallyActiveStreams, - required Stream> activeStreams, + required Stream> activeStreams, required AbortController abort, required Zone asyncWorkZone, }) async { diff --git a/packages/powersync/lib/src/sync/connection_manager.dart b/packages/powersync/lib/src/sync/connection_manager.dart index a016d530..0a107728 100644 --- a/packages/powersync/lib/src/sync/connection_manager.dart +++ b/packages/powersync/lib/src/sync/connection_manager.dart @@ -116,14 +116,18 @@ final class ConnectionManager { late void Function() retryHandler; - final subscriptionsChanged = StreamController(); - Future connectWithSyncLock() async { // Ensure there has not been a subsequent connect() call installing a new // sync client. assert(identical(_abortActiveSync, thisConnectAborter)); assert(!thisConnectAborter.aborted); + // This needs to be a single-subscription controller to ensure we won't + // miss any events between the current snapshot and when the + // implementation actually starts listening. That also means that we'll + // need a new controller per internal connect attempt. + final subscriptionsChanged = _subscriptionsChanged = StreamController(); + // ignore: invalid_use_of_protected_member await db.connectInternal( connector: connector, @@ -144,6 +148,9 @@ final class ConnectionManager { // If the sync encounters a failure without being aborted, retry retryHandler = Zone.current.bindCallback(() async { _activeGroup.syncConnectMutex.lock(() async { + _subscriptionsChanged?.close(); + _subscriptionsChanged = null; + // Is this still supposed to be active? (abort is only called within // mutex) if (!thisConnectAborter.aborted) { @@ -164,7 +171,6 @@ final class ConnectionManager { // Disconnect a previous sync client, if one is active. await _abortCurrentSync(); assert(_abortActiveSync == null); - _subscriptionsChanged = subscriptionsChanged; // Install the abort controller for this particular connect call, allowing // it to be disconnected.