Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/powersync/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Deprecate re-exports of other packages (`package:powersync/sqlite_async.dart`, `package:powersync/sqlite3_common.dart`,
`package:powersync/sqlite3.dart`). Instead, add a dependency on the respective package and import it directly.
- Remove `powersync_sync.worker.js`. `powersync_db.worker.js` now covers both database access and sync.
- Fix sync isolate retry loop ([#397](https://github.com/powersync-ja/powersync.dart/issues/397)).

## 1.18.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ final class NativePowerSyncDatabase extends BasePowerSyncDatabase {
required PowerSyncBackendConnector connector,
required ResolvedSyncOptions options,
required List<SubscribedStream> initiallyActiveStreams,
required Stream<List<({String name, String parameters})>> activeStreams,
required Stream<List<SubscribedStream>> activeStreams,
required AbortController abort,
required Zone asyncWorkZone,
}) async {
Expand Down
12 changes: 9 additions & 3 deletions packages/powersync/lib/src/sync/connection_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,18 @@ final class ConnectionManager {

late void Function() retryHandler;

final subscriptionsChanged = StreamController<void>();

Future<void> connectWithSyncLock() async {
// Ensure there has not been a subsequent connect() call installing a new
// sync client.
assert(identical(_abortActiveSync, thisConnectAborter));
assert(!thisConnectAborter.aborted);

// This needs to be a single-subscription controller to ensure we won't
// miss any events between the current snapshot and when the
// implementation actually starts listening. That also means that we'll
// need a new controller per internal connect attempt.
final subscriptionsChanged = _subscriptionsChanged = StreamController();

// ignore: invalid_use_of_protected_member
await db.connectInternal(
connector: connector,
Expand All @@ -144,6 +148,9 @@ final class ConnectionManager {
// If the sync encounters a failure without being aborted, retry
retryHandler = Zone.current.bindCallback(() async {
_activeGroup.syncConnectMutex.lock(() async {
_subscriptionsChanged?.close();
_subscriptionsChanged = null;

// Is this still supposed to be active? (abort is only called within
// mutex)
if (!thisConnectAborter.aborted) {
Expand All @@ -164,7 +171,6 @@ final class ConnectionManager {
// Disconnect a previous sync client, if one is active.
await _abortCurrentSync();
assert(_abortActiveSync == null);
_subscriptionsChanged = subscriptionsChanged;

// Install the abort controller for this particular connect call, allowing
// it to be disconnected.
Expand Down