@@ -9,6 +9,7 @@ import 'package:sqlite_async/src/update_notification.dart';
99
1010import 'common/connection/sync_sqlite_connection.dart' ;
1111import 'common/mutex.dart' ;
12+ import 'utils/shared_utils.dart' ;
1213
1314/// Abstract class representing calls available in a read-only or read-write context.
1415abstract interface class SqliteReadContext {
@@ -98,11 +99,35 @@ abstract interface class SqliteWriteContext extends SqliteReadContext {
9899 Future <T > Function (SqliteWriteContext tx) callback);
99100}
100101
102+ /// Interface representing a SQLite connection or pool from which inner
103+ /// connection contexts can be obtained.
104+ abstract interface class SqliteLocks {
105+ /// Takes a read lock, without starting a transaction.
106+ ///
107+ /// The lock only applies to a single [SqliteConnection] , and multiple
108+ /// connections may hold read locks at the same time.
109+ ///
110+ /// In most cases, [SqliteConnection.readTransaction] should be used instead.
111+ Future <T > readLock <T >(Future <T > Function (SqliteReadContext tx) callback,
112+ {Duration ? lockTimeout, String ? debugContext});
113+
114+ /// Takes a global lock, without starting a transaction.
115+ ///
116+ /// In most cases, [SqliteConnection.writeTransaction] should be used instead.
117+ ///
118+ /// The lock applies to all [SqliteConnection] instances for a [SqliteDatabase] .
119+ /// Locks for separate [SqliteDatabase] instances on the same database file
120+ /// may be held concurrently.
121+ Future <T > writeLock <T >(Future <T > Function (SqliteWriteContext tx) callback,
122+ {Duration ? lockTimeout, String ? debugContext});
123+ }
124+
101125/// Abstract class representing a connection to the SQLite database.
102126///
103127/// This package typically pools multiple [SqliteConnection] instances into a
104128/// managed [SqliteDatabase] automatically.
105- abstract interface class SqliteConnection extends SqliteWriteContext {
129+ abstract base class SqliteConnection
130+ implements SqliteWriteContext , SqliteLocks {
106131 /// Default constructor for subclasses.
107132 SqliteConnection ();
108133
@@ -124,7 +149,7 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
124149 }
125150
126151 /// Reports table change update notifications
127- Stream <UpdateNotification >? get updates;
152+ Stream <UpdateNotification > get updates;
128153
129154 /// Open a read-only transaction.
130155 ///
@@ -133,7 +158,11 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
133158 /// instance will error.
134159 Future <T > readTransaction <T >(
135160 Future <T > Function (SqliteReadContext tx) callback,
136- {Duration ? lockTimeout});
161+ {Duration ? lockTimeout}) {
162+ return readLock ((ctx) async {
163+ return await internalReadTransaction (ctx, callback);
164+ }, lockTimeout: lockTimeout, debugContext: 'readTransaction()' );
165+ }
137166
138167 /// Open a read-write transaction.
139168 ///
@@ -147,43 +176,138 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
147176 @override
148177 Future <T > writeTransaction <T >(
149178 Future <T > Function (SqliteWriteContext tx) callback,
150- {Duration ? lockTimeout});
179+ {Duration ? lockTimeout}) {
180+ return writeLock ((ctx) async {
181+ return ctx.writeTransaction (callback);
182+ }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()' );
183+ }
151184
152- /// Execute a read query every time the source tables are modified .
185+ /// Create a Stream of changes to any of the specified tables .
153186 ///
154- /// Use [throttle] to specify the minimum interval between queries.
155- ///
156- /// Source tables are automatically detected using `EXPLAIN QUERY PLAN` .
157- Stream <sqlite.ResultSet > watch (String sql,
158- {List <Object ?> parameters = const [],
159- Duration throttle = const Duration (milliseconds: 30 )});
160-
161- /// Takes a read lock, without starting a transaction.
187+ /// This is preferred over [watch] when multiple queries need to be performed
188+ /// together when data is changed, e.g. like this:
162189 ///
163- /// The lock only applies to a single [SqliteConnection] , and multiple
164- /// connections may hold read locks at the same time.
190+ /// ```dart
191+ /// var subscription = db.onChange({'users', 'groups'}).asyncMap((event) async {
192+ /// await db.readTransaction((tx) async {
193+ /// var data = await tx.getAll('SELECT * ROM users');
194+ /// var moreData = await tx.getAll('SELECT * ROM groups');
165195 ///
166- /// In most cases, [readTransaction] should be used instead.
167- Future <T > readLock <T >(Future <T > Function (SqliteReadContext tx) callback,
168- {Duration ? lockTimeout, String ? debugContext});
196+ /// // Handle data here...
197+ /// });
198+ /// });
199+ /// ```
200+ Stream <UpdateNotification > onChange (Iterable <String >? tables,
201+ {Duration throttle = const Duration (milliseconds: 30 ),
202+ bool triggerImmediately = true }) {
203+ final filteredStream = tables != null
204+ ? updates.transform (UpdateNotification .filterTablesTransformer (tables))
205+ : updates;
206+ final throttledStream = UpdateNotification .throttleStream (
207+ filteredStream, throttle,
208+ addOne: triggerImmediately ? UpdateNotification .empty () : null );
209+ return throttledStream;
210+ }
169211
170- /// Takes a global lock, without starting a transaction .
212+ /// Execute a read query every time the source tables are modified .
171213 ///
172- /// In most cases, [writeTransaction] should be used instead .
214+ /// Use [throttle] to specify the minimum interval between queries .
173215 ///
174- /// The lock applies to all [SqliteConnection] instances for a [SqliteDatabase] .
175- /// Locks for separate [SqliteDatabase] instances on the same database file
176- /// may be held concurrently.
177- Future <T > writeLock <T >(Future <T > Function (SqliteWriteContext tx) callback,
178- {Duration ? lockTimeout, String ? debugContext});
216+ /// Source tables are automatically detected using `EXPLAIN QUERY PLAN` .
217+ Stream <sqlite.ResultSet > watch (
218+ String sql, {
219+ List <Object ?> parameters = const [],
220+ Duration throttle = const Duration (milliseconds: 30 ),
221+ Iterable <String >? triggerOnTables,
222+ }) {
223+ Stream <sqlite.ResultSet > watchInner (Iterable <String > trigger) {
224+ return onChange (
225+ trigger,
226+ throttle: throttle,
227+ triggerImmediately: true ,
228+ ).asyncMap ((_) => getAll (sql, parameters));
229+ }
230+
231+ if (triggerOnTables case final knownTrigger? ) {
232+ return watchInner (knownTrigger);
233+ } else {
234+ return Stream .fromFuture (getSourceTables (this , sql, parameters))
235+ .asyncExpand (watchInner);
236+ }
237+ }
179238
180239 Future <void > close ();
181240
182241 /// Ensures that all connections are aware of the latest schema changes applied (if any).
183242 /// Queries and watch calls can potentially use outdated schema information after a schema update.
184- Future <void > refreshSchema ();
243+ Future <void > refreshSchema () {
244+ return getAll ("PRAGMA table_info('sqlite_master')" );
245+ }
185246
186247 /// Returns true if the connection is closed
187248 @override
188249 bool get closed;
250+
251+ @override
252+ Future <sqlite.ResultSet > execute (String sql,
253+ [List <Object ?> parameters = const []]) async {
254+ return writeLock ((ctx) async {
255+ return ctx.execute (sql, parameters);
256+ }, debugContext: 'execute()' );
257+ }
258+
259+ @override
260+ Future <sqlite.ResultSet > getAll (String sql,
261+ [List <Object ?> parameters = const []]) {
262+ return readLock ((ctx) async {
263+ return ctx.getAll (sql, parameters);
264+ }, debugContext: 'getAll()' );
265+ }
266+
267+ @override
268+ Future <sqlite.Row > get (String sql, [List <Object ?> parameters = const []]) {
269+ return readLock ((ctx) async {
270+ return ctx.get (sql, parameters);
271+ }, debugContext: 'get()' );
272+ }
273+
274+ @override
275+ Future <sqlite.Row ?> getOptional (String sql,
276+ [List <Object ?> parameters = const []]) {
277+ return readLock ((ctx) async {
278+ return ctx.getOptional (sql, parameters);
279+ }, debugContext: 'getOptional()' );
280+ }
281+
282+ /// See [SqliteReadContext.computeWithDatabase] .
283+ ///
284+ /// When called here directly on the connection, the call is wrapped in a
285+ /// write transaction.
286+ @override
287+ Future <T > computeWithDatabase <T >(
288+ Future <T > Function (sqlite.CommonDatabase db) compute) {
289+ return writeTransaction ((tx) async {
290+ return tx.computeWithDatabase (compute);
291+ });
292+ }
293+
294+ /// Execute a write query (INSERT, UPDATE, DELETE) multiple times with each
295+ /// parameter set. This is more faster than executing separately with each
296+ /// parameter set.
297+ ///
298+ /// When called here directly on the connection, the batch is wrapped in a
299+ /// write transaction.
300+ @override
301+ Future <void > executeBatch (String sql, List <List <Object ?>> parameterSets) {
302+ return writeTransaction ((tx) async {
303+ return tx.executeBatch (sql, parameterSets);
304+ });
305+ }
306+
307+ @override
308+ Future <void > executeMultiple (String sql) {
309+ return writeTransaction ((tx) async {
310+ return tx.executeMultiple (sql);
311+ });
312+ }
189313}
0 commit comments