From 20b99d20112dc0b126491fedd0df0a80b8ed4e44 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 14 Aug 2025 17:55:48 +0200 Subject: [PATCH 1/5] Add nextCrudTransactions() stream --- .../lib/src/database/powersync_db_mixin.dart | 128 ++++++++++-------- packages/powersync_core/test/crud_test.dart | 33 +++++ 2 files changed, 106 insertions(+), 55 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index dc4b2ddb..a9ba96ff 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:async/async.dart'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/sqlite3_common.dart'; @@ -508,23 +509,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { } final last = all[all.length - 1]; return CrudBatch( - crud: all, - haveMore: haveMore, - complete: ({String? writeCheckpoint}) async { - await writeTransaction((db) async { - await db - .execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); - if (writeCheckpoint != null && - await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == null) { - await db.execute( - 'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'', - [writeCheckpoint]); - } else { - await db.execute( - 'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\''); - } - }); - }); + crud: all, + haveMore: haveMore, + complete: _crudCompletionCallback(last.clientId), + ); } /// Get the next recorded transaction to upload. @@ -538,46 +526,76 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// /// Unlike [getCrudBatch], this only returns data from a single transaction at a time. /// All data for the transaction is loaded into memory. - Future getNextCrudTransaction() async { - return await readTransaction((tx) async { - final first = await tx.getOptional( - 'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1'); - if (first == null) { - return null; - } - final txId = first['tx_id'] as int?; - List all; - if (txId == null) { - all = [CrudEntry.fromRow(first)]; - } else { - final rows = await tx.getAll( - 'SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', - [txId]); - all = [for (var row in rows) CrudEntry.fromRow(row)]; + Future getNextCrudTransaction() { + return nextCrudTransactions().firstOrNull; + } + + /// Returns a stream of completed transactions with local writes against the + /// database. + /// + /// This is typically used from the [PowerSyncBackendConnector.uploadData] + /// method. Each entry emitted by the stream is a full transaction containing + /// all local writes made while that transaction was active. + /// + /// Unlike [getNextCrudTransaction], which awalys returns the oldest + /// transaction that hasn't been [CrudTransaction.complete]d yet, this stream + /// can be used to receive multiple transactions. Calling + /// [CrudTransaction.complete] will mark _all_ transactions emitted by the + /// stream until that point as completed. + /// + /// This can be used to upload multiple transactions in a single batch, e.g. + /// with:AbortController + /// + /// If there is no local data to upload, the stream emits a single `onDone` + /// event. + Stream nextCrudTransactions() async* { + var lastCrudItemId = -1; + const sql = ''' +WITH RECURSIVE crud_entries AS ( + SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?) + UNION ALL + SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud + INNER JOIN crud_entries ON crud_entries.id + 1 = rowid + WHERE crud_entries.tx_id = ps_crud.tx_id +) +SELECT * FROM crud_entries; +'''; + + while (true) { + final nextTransaction = await getAll(sql, [lastCrudItemId]); + if (nextTransaction.isEmpty) { + break; } - final last = all[all.length - 1]; - - return CrudTransaction( - transactionId: txId, - crud: all, - complete: ({String? writeCheckpoint}) async { - await writeTransaction((db) async { - await db.execute( - 'DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); - if (writeCheckpoint != null && - await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == - null) { - await db.execute( - 'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'', - [writeCheckpoint]); - } else { - await db.execute( - 'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\''); - } - }); - }); - }); + final items = [for (var row in nextTransaction) CrudEntry.fromRow(row)]; + final last = items.last; + final txId = last.transactionId; + + yield CrudTransaction( + crud: items, + complete: _crudCompletionCallback(last.clientId), + transactionId: txId, + ); + lastCrudItemId = last.clientId; + } + } + + Future Function({String? writeCheckpoint}) _crudCompletionCallback( + int lastClientId) { + return ({String? writeCheckpoint}) async { + await writeTransaction((db) async { + await db.execute('DELETE FROM ps_crud WHERE id <= ?', [lastClientId]); + if (writeCheckpoint != null && + await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == null) { + await db.execute( + 'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'', + [writeCheckpoint]); + } else { + await db.execute( + 'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\''); + } + }); + }; } /// Takes a read lock, without starting a transaction. diff --git a/packages/powersync_core/test/crud_test.dart b/packages/powersync_core/test/crud_test.dart index 4c18a4b2..f08ae022 100644 --- a/packages/powersync_core/test/crud_test.dart +++ b/packages/powersync_core/test/crud_test.dart @@ -271,6 +271,39 @@ void main() { expect(await powersync.getNextCrudTransaction(), equals(null)); }); + test('nextCrudTransactions', () async { + Future createTransaction(int size) { + return powersync.writeTransaction((tx) async { + for (var i = 0; i < size; i++) { + await tx.execute('INSERT INTO assets (id) VALUES (uuid())'); + } + }); + } + + await expectLater(powersync.nextCrudTransactions(), emitsDone); + + await createTransaction(5); + await createTransaction(10); + await createTransaction(15); + + CrudTransaction? lastTransaction; + final batch = []; + await for (final transaction in powersync.nextCrudTransactions()) { + batch.addAll(transaction.crud); + lastTransaction = transaction; + + if (batch.length > 10) { + break; + } + } + + expect(batch, hasLength(15)); + await lastTransaction!.complete(); + + final remainingTransaction = await powersync.getNextCrudTransaction(); + expect(remainingTransaction?.crud, hasLength(15)); + }); + test('include metadata', () async { await powersync.updateSchema(Schema([ Table( From 72c495100a5fce133a5e8dafc94adde2c1022094 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 14 Aug 2025 17:59:01 +0200 Subject: [PATCH 2/5] Add example to docs --- .../lib/src/database/powersync_db_mixin.dart | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index a9ba96ff..9a595c65 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -544,7 +544,26 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// stream until that point as completed. /// /// This can be used to upload multiple transactions in a single batch, e.g. - /// with:AbortController + /// with: + /// + /// ```dart + /// CrudTransaction? lastTransaction; + /// final batch = []; + /// + /// await for (final transaction in powersync.nextCrudTransactions()) { + /// batch.addAll(transaction.crud); + /// lastTransaction = transaction; + /// + /// if (batch.length > 100) { + /// break; + /// } + /// } + /// + /// if (batch.isNotEmpty) { + /// await uploadBatch(batch); + /// lastTransaction!.complete(); + /// } + /// ``` /// /// If there is no local data to upload, the stream emits a single `onDone` /// event. From 1e5b8bd31c616cc9b3d220ae6b5522248c9d6668 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 14 Aug 2025 23:10:56 +0200 Subject: [PATCH 3/5] Rename to getCrudTransactions --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 4 ++-- packages/powersync_core/test/crud_test.dart | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 9a595c65..78059962 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -527,7 +527,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Unlike [getCrudBatch], this only returns data from a single transaction at a time. /// All data for the transaction is loaded into memory. Future getNextCrudTransaction() { - return nextCrudTransactions().firstOrNull; + return getCrudTransactions().firstOrNull; } /// Returns a stream of completed transactions with local writes against the @@ -567,7 +567,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// /// If there is no local data to upload, the stream emits a single `onDone` /// event. - Stream nextCrudTransactions() async* { + Stream getCrudTransactions() async* { var lastCrudItemId = -1; const sql = ''' WITH RECURSIVE crud_entries AS ( diff --git a/packages/powersync_core/test/crud_test.dart b/packages/powersync_core/test/crud_test.dart index f08ae022..0e188075 100644 --- a/packages/powersync_core/test/crud_test.dart +++ b/packages/powersync_core/test/crud_test.dart @@ -280,7 +280,7 @@ void main() { }); } - await expectLater(powersync.nextCrudTransactions(), emitsDone); + await expectLater(powersync.getCrudTransactions(), emitsDone); await createTransaction(5); await createTransaction(10); @@ -288,7 +288,7 @@ void main() { CrudTransaction? lastTransaction; final batch = []; - await for (final transaction in powersync.nextCrudTransactions()) { + await for (final transaction in powersync.getCrudTransactions()) { batch.addAll(transaction.crud); lastTransaction = transaction; From 3ff760c605929efe0678c92f3c2fcc624b12ba64 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 18 Aug 2025 11:58:48 +0200 Subject: [PATCH 4/5] Clarify docs --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 78059962..f8a36858 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -540,8 +540,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Unlike [getNextCrudTransaction], which awalys returns the oldest /// transaction that hasn't been [CrudTransaction.complete]d yet, this stream /// can be used to receive multiple transactions. Calling - /// [CrudTransaction.complete] will mark _all_ transactions emitted by the - /// stream until that point as completed. + /// [CrudTransaction.complete] will mark that transaction and all prior + /// transactions emitted by the stream as completed. /// /// This can be used to upload multiple transactions in a single batch, e.g. /// with: From e0cc79084b9aa8bc2f191727bc1606793fee6ae9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 18 Aug 2025 13:43:39 +0200 Subject: [PATCH 5/5] Update packages/powersync_core/lib/src/database/powersync_db_mixin.dart Co-authored-by: Ralf Kistner --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index f8a36858..a53b5049 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -537,7 +537,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// method. Each entry emitted by the stream is a full transaction containing /// all local writes made while that transaction was active. /// - /// Unlike [getNextCrudTransaction], which awalys returns the oldest + /// Unlike [getNextCrudTransaction], which always returns the oldest /// transaction that hasn't been [CrudTransaction.complete]d yet, this stream /// can be used to receive multiple transactions. Calling /// [CrudTransaction.complete] will mark that transaction and all prior