diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 08401374..1f8091b3 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -586,6 +586,7 @@ typedef BucketDescription = ({ final class _ActiveRustStreamingIteration { final StreamingSyncImplementation sync; var _isActive = true; + var _hadSyncLine = false; StreamSubscription? _completedUploads; final Completer _completedStream = Completer(); @@ -621,8 +622,10 @@ final class _ActiveRustStreamingIteration { switch (event) { case ReceivedLine(line: final Uint8List line): + _triggerCrudUploadOnFirstLine(); await _control('line_binary', line); case ReceivedLine(line: final line as String): + _triggerCrudUploadOnFirstLine(); await _control('line_text', line); case UploadCompleted(): await _control('completed_upload'); @@ -634,6 +637,17 @@ final class _ActiveRustStreamingIteration { } } + /// Triggers a local CRUD upload when the first sync line has been received. + /// + /// This allows uploading local changes that have been made while offline or + /// disconnected. + void _triggerCrudUploadOnFirstLine() { + if (!_hadSyncLine) { + sync._internalCrudTriggerController.add(null); + _hadSyncLine = true; + } + } + Future _stop() { return _control('stop'); } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 8bce8130..b6432014 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -104,8 +104,8 @@ void _declareTests(String name, SyncOptions options) { addTearDown(status.cancel); syncService.addKeepAlive(); - await expectLater( - status, emits(isSyncStatus(connected: true, hasSynced: false))); + await expectLater(status, + emitsThrough(isSyncStatus(connected: true, hasSynced: false))); return status; } @@ -774,6 +774,58 @@ void _declareTests(String name, SyncOptions options) { await Future.delayed(const Duration(milliseconds: 500)); expect(syncService.controller.hasListener, isTrue); }); + + test('uploads writes made while offline', () async { + // Local write while not connected + await database.execute( + 'insert into customers (id, name) values (uuid(), ?)', + ['local customer']); + uploadData = (db) async { + final batch = await db.getNextCrudTransaction(); + if (batch != null) { + await batch.complete(); + } + }; + syncService.writeCheckpoint = () => { + 'data': {'write_checkpoint': '1'} + }; + + final query = StreamQueue(database + .watch('SELECT name FROM customers') + .map((e) => e.single['name'])); + expect(await query.next, 'local customer'); + + await waitForConnection(); + + syncService + ..addLine({ + 'checkpoint': Checkpoint( + lastOpId: '1', + writeCheckpoint: '1', + checksums: [BucketChecksum(bucket: 'a', priority: 3, checksum: 0)], + ) + }) + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': >[ + { + 'op_id': '1', + 'op': 'PUT', + 'object_type': 'customers', + 'object_id': '1', + 'checksum': 0, + 'data': json.encode({'name': 'from server'}), + } + ], + } + }) + ..addLine({ + 'checkpoint_complete': {'last_op_id': '1'} + }); + + expect(await query.next, 'from server'); + }); }); }