diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index b7ef76b..7df4ac8 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -286,12 +286,22 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, Object? txError; void maybeFireUpdates() { - if (updatedTables.isNotEmpty) { + // We keep buffering the set of updated tables until we are not + // in a transaction. Firing transactions inside a transaction + // has multiple issues: + // 1. Watched queries would detect changes to the underlying tables, + // but the data would not be visible to queries yet. + // 2. It would trigger many more notifications than required. + // + // This still includes updates for transactions that are rolled back. + // We could handle those better at a later stage. + + if (updatedTables.isNotEmpty && db.autocommit) { client.fire(UpdateNotification(updatedTables)); updatedTables.clear(); - updateDebouncer?.cancel(); - updateDebouncer = null; } + updateDebouncer?.cancel(); + updateDebouncer = null; } db.updates.listen((event) { @@ -301,11 +311,12 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). // 2. Long-running _SqliteIsolateClosure that should fire updates while running. updateDebouncer ??= - Timer(const Duration(milliseconds: 10), maybeFireUpdates); + Timer(const Duration(milliseconds: 1), maybeFireUpdates); }); server.open((data) async { if (data is _SqliteIsolateClose) { + // This is a transaction close message if (txId != null) { if (!db.autocommit) { db.execute('ROLLBACK'); diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart new file mode 100644 index 0000000..ea73bd6 --- /dev/null +++ b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart @@ -0,0 +1,175 @@ +import 'dart:async'; + +import 'package:sqlite_async/sqlite3_common.dart'; + +/// Wrap a CommonDatabase to throttle its updates stream. +/// This is so that we can throttle the updates _within_ +/// the worker process, avoiding mass notifications over +/// the MessagePort. +class ThrottledCommonDatabase extends CommonDatabase { + final CommonDatabase _db; + final StreamController _transactionController = + StreamController.broadcast(); + + ThrottledCommonDatabase(this._db); + + @override + int get userVersion => _db.userVersion; + + @override + set userVersion(int userVersion) { + _db.userVersion = userVersion; + } + + @override + bool get autocommit => _db.autocommit; + + @override + DatabaseConfig get config => _db.config; + + @override + void createAggregateFunction( + {required String functionName, + required AggregateFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true}) { + _db.createAggregateFunction(functionName: functionName, function: function); + } + + @override + void createCollation( + {required String name, required CollatingFunction function}) { + _db.createCollation(name: name, function: function); + } + + @override + void createFunction( + {required String functionName, + required ScalarFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true}) { + _db.createFunction(functionName: functionName, function: function); + } + + @override + void dispose() { + _db.dispose(); + } + + @override + void execute(String sql, [List parameters = const []]) { + _db.execute(sql, parameters); + } + + @override + int getUpdatedRows() { + // ignore: deprecated_member_use + return _db.getUpdatedRows(); + } + + @override + int get lastInsertRowId => _db.lastInsertRowId; + + @override + CommonPreparedStatement prepare(String sql, + {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { + return _db.prepare(sql, + persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); + } + + @override + List prepareMultiple(String sql, + {bool persistent = false, bool vtab = true}) { + return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); + } + + @override + ResultSet select(String sql, [List parameters = const []]) { + bool preAutocommit = _db.autocommit; + final result = _db.select(sql, parameters); + bool postAutocommit = _db.autocommit; + if (!preAutocommit && postAutocommit) { + _transactionController.add(true); + } + return result; + } + + @override + int get updatedRows => _db.updatedRows; + + @override + Stream get updates { + return throttledUpdates(_db, _transactionController.stream); + } +} + +/// This throttles the database update stream to: +/// 1. Trigger max once every 1ms. +/// 2. Only trigger _after_ transactions. +Stream throttledUpdates( + CommonDatabase source, Stream transactionStream) { + StreamController? controller; + Set pendingUpdates = {}; + var paused = false; + + Timer? updateDebouncer; + + void maybeFireUpdates() { + updateDebouncer?.cancel(); + updateDebouncer = null; + + if (paused) { + // Continue collecting updates, but don't fire any + return; + } + + if (!source.autocommit) { + // Inside a transaction - do not fire updates + return; + } + + if (pendingUpdates.isNotEmpty) { + for (var update in pendingUpdates) { + controller!.add(update); + } + + pendingUpdates.clear(); + } + } + + void collectUpdate(SqliteUpdate event) { + // We merge updates with the same kind and tableName. + // rowId is never used in sqlite_async. + pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0)); + + updateDebouncer ??= + Timer(const Duration(milliseconds: 1), maybeFireUpdates); + } + + StreamSubscription? txSubscription; + StreamSubscription? sourceSubscription; + + controller = StreamController(onListen: () { + txSubscription = transactionStream.listen((event) { + maybeFireUpdates(); + }, onError: (error) { + controller?.addError(error); + }); + + sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { + controller?.addError(error); + }); + }, onPause: () { + paused = true; + }, onResume: () { + paused = false; + maybeFireUpdates(); + }, onCancel: () { + txSubscription?.cancel(); + sourceSubscription?.cancel(); + }); + + return controller.stream; +} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index b4657dd..1d8fb5c 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -4,6 +4,7 @@ import 'dart:js_util' as js_util; import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; +import 'throttled_common_database.dart'; import '../protocol.dart'; @@ -18,7 +19,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - return AsyncSqliteDatabase(database: db); + final throttled = ThrottledCommonDatabase(db); + + return AsyncSqliteDatabase(database: throttled); } @override diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index e0ef765..08a80cb 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -253,5 +253,53 @@ void main() { done = true; } }); + + test('watch with transaction', () async { + final db = await testUtils.setupDatabase(path: path); + await createTables(db); + + const baseTime = 10; + + const throttleDuration = Duration(milliseconds: baseTime); + // delay must be bigger than throttleDuration, and bigger + // than any internal throttles. + const delay = Duration(milliseconds: baseTime * 3); + + final stream = db.watch('SELECT count() AS count FROM assets', + throttle: throttleDuration); + + List counts = []; + + final subscription = stream.listen((e) { + counts.add(e.first['count']); + }); + await Future.delayed(delay); + + await db.writeTransaction((tx) async { + await tx.execute('INSERT INTO assets(make) VALUES (?)', ['test1']); + await Future.delayed(delay); + await tx.execute('INSERT INTO assets(make) VALUES (?)', ['test2']); + await Future.delayed(delay); + }); + await Future.delayed(delay); + + subscription.cancel(); + + expect( + counts, + equals([ + // one event when starting the subscription + 0, + // one event after the transaction + 2 + ])); + + // Other observed results (failure scenarios): + // [0, 0, 0]: The watch is triggered during the transaction + // and executes concurrently with the transaction. + // [0, 2, 2]: The watch is triggered during the transaction, + // but executes after the transaction (single connection). + // [0]: No updates triggered. + }); }); }