11import 'dart:async' ;
22
3+ import 'package:async/async.dart' ;
34import 'package:logging/logging.dart' ;
45import 'package:meta/meta.dart' ;
56import 'package:powersync_core/sqlite3_common.dart' ;
@@ -508,23 +509,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
508509 }
509510 final last = all[all.length - 1 ];
510511 return CrudBatch (
511- crud: all,
512- haveMore: haveMore,
513- complete: ({String ? writeCheckpoint}) async {
514- await writeTransaction ((db) async {
515- await db
516- .execute ('DELETE FROM ps_crud WHERE id <= ?' , [last.clientId]);
517- if (writeCheckpoint != null &&
518- await db.getOptional ('SELECT 1 FROM ps_crud LIMIT 1' ) == null ) {
519- await db.execute (
520- 'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$ local\' ' ,
521- [writeCheckpoint]);
522- } else {
523- await db.execute (
524- 'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$ local\' ' );
525- }
526- });
527- });
512+ crud: all,
513+ haveMore: haveMore,
514+ complete: _crudCompletionCallback (last.clientId),
515+ );
528516 }
529517
530518 /// Get the next recorded transaction to upload.
@@ -538,46 +526,76 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
538526 ///
539527 /// Unlike [getCrudBatch] , this only returns data from a single transaction at a time.
540528 /// All data for the transaction is loaded into memory.
541- Future <CrudTransaction ?> getNextCrudTransaction () async {
542- return await readTransaction ((tx) async {
543- final first = await tx.getOptional (
544- 'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1' );
545- if (first == null ) {
546- return null ;
547- }
548- final txId = first['tx_id' ] as int ? ;
549- List <CrudEntry > all;
550- if (txId == null ) {
551- all = [CrudEntry .fromRow (first)];
552- } else {
553- final rows = await tx.getAll (
554- 'SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC' ,
555- [txId]);
556- all = [for (var row in rows) CrudEntry .fromRow (row)];
529+ Future <CrudTransaction ?> getNextCrudTransaction () {
530+ return nextCrudTransactions ().firstOrNull;
531+ }
532+
533+ /// Returns a stream of completed transactions with local writes against the
534+ /// database.
535+ ///
536+ /// This is typically used from the [PowerSyncBackendConnector.uploadData]
537+ /// method. Each entry emitted by the stream is a full transaction containing
538+ /// all local writes made while that transaction was active.
539+ ///
540+ /// Unlike [getNextCrudTransaction] , which awalys returns the oldest
541+ /// transaction that hasn't been [CrudTransaction.complete] d yet, this stream
542+ /// can be used to receive multiple transactions. Calling
543+ /// [CrudTransaction.complete] will mark _all_ transactions emitted by the
544+ /// stream until that point as completed.
545+ ///
546+ /// This can be used to upload multiple transactions in a single batch, e.g.
547+ /// with:AbortController
548+ ///
549+ /// If there is no local data to upload, the stream emits a single `onDone`
550+ /// event.
551+ Stream <CrudTransaction > nextCrudTransactions () async * {
552+ var lastCrudItemId = - 1 ;
553+ const sql = '''
554+ WITH RECURSIVE crud_entries AS (
555+ SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
556+ UNION ALL
557+ SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
558+ INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
559+ WHERE crud_entries.tx_id = ps_crud.tx_id
560+ )
561+ SELECT * FROM crud_entries;
562+ ''' ;
563+
564+ while (true ) {
565+ final nextTransaction = await getAll (sql, [lastCrudItemId]);
566+ if (nextTransaction.isEmpty) {
567+ break ;
557568 }
558569
559- final last = all[all.length - 1 ];
560-
561- return CrudTransaction (
562- transactionId: txId,
563- crud: all,
564- complete: ({String ? writeCheckpoint}) async {
565- await writeTransaction ((db) async {
566- await db.execute (
567- 'DELETE FROM ps_crud WHERE id <= ?' , [last.clientId]);
568- if (writeCheckpoint != null &&
569- await db.getOptional ('SELECT 1 FROM ps_crud LIMIT 1' ) ==
570- null ) {
571- await db.execute (
572- 'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$ local\' ' ,
573- [writeCheckpoint]);
574- } else {
575- await db.execute (
576- 'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$ local\' ' );
577- }
578- });
579- });
580- });
570+ final items = [for (var row in nextTransaction) CrudEntry .fromRow (row)];
571+ final last = items.last;
572+ final txId = last.transactionId;
573+
574+ yield CrudTransaction (
575+ crud: items,
576+ complete: _crudCompletionCallback (last.clientId),
577+ transactionId: txId,
578+ );
579+ lastCrudItemId = last.clientId;
580+ }
581+ }
582+
583+ Future <void > Function ({String ? writeCheckpoint}) _crudCompletionCallback (
584+ int lastClientId) {
585+ return ({String ? writeCheckpoint}) async {
586+ await writeTransaction ((db) async {
587+ await db.execute ('DELETE FROM ps_crud WHERE id <= ?' , [lastClientId]);
588+ if (writeCheckpoint != null &&
589+ await db.getOptional ('SELECT 1 FROM ps_crud LIMIT 1' ) == null ) {
590+ await db.execute (
591+ 'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$ local\' ' ,
592+ [writeCheckpoint]);
593+ } else {
594+ await db.execute (
595+ 'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$ local\' ' );
596+ }
597+ });
598+ };
581599 }
582600
583601 /// Takes a read lock, without starting a transaction.
0 commit comments