diff --git a/packages/powersync_core/lib/src/streaming_sync.dart b/packages/powersync_core/lib/src/streaming_sync.dart index 716b38bf..4153ac33 100644 --- a/packages/powersync_core/lib/src/streaming_sync.dart +++ b/packages/powersync_core/lib/src/streaming_sync.dart @@ -68,6 +68,7 @@ class StreamingSyncImplementation implements StreamingSync { bool _safeToClose = true; final Mutex syncMutex, crudMutex; + Completer? _activeCrudUpload; final Map _userAgentHeaders; @@ -135,7 +136,7 @@ class StreamingSyncImplementation implements StreamingSync { try { _abort = AbortController(); clientId = await adapter.getClientId(); - crudLoop(); + _crudLoop(); var invalidCredentials = false; while (!aborted) { _updateStatus(connecting: true); @@ -176,8 +177,8 @@ class StreamingSyncImplementation implements StreamingSync { } } - Future crudLoop() async { - await uploadAllCrud(); + Future _crudLoop() async { + await _uploadAllCrud(); // Trigger a CRUD upload whenever the upstream trigger fires // as-well-as whenever the sync stream reconnects. @@ -187,11 +188,13 @@ class StreamingSyncImplementation implements StreamingSync { // The stream here is closed on abort. await for (var _ in mergeStreams( [crudUpdateTriggerStream, _internalCrudTriggerController.stream])) { - await uploadAllCrud(); + await _uploadAllCrud(); } } - Future uploadAllCrud() async { + Future _uploadAllCrud() { + assert(_activeCrudUpload == null); + final completer = _activeCrudUpload = Completer(); return crudMutex.lock(() async { // Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration. CrudEntry? checkedCrudItem; @@ -244,7 +247,11 @@ class StreamingSyncImplementation implements StreamingSync { _updateStatus(uploading: false); } } - }, timeout: retryDelay); + }, timeout: retryDelay).whenComplete(() { + assert(identical(_activeCrudUpload, completer)); + _activeCrudUpload = null; + completer.complete(); + }); } Future getWriteCheckpoint() async { @@ -336,7 +343,7 @@ class StreamingSyncImplementation implements StreamingSync { return (initialRequests, localDescriptions); } - Future streamingSyncIteration( + Future streamingSyncIteration( {AbortController? abortController}) async { adapter.startSession(); @@ -379,40 +386,15 @@ class StreamingSyncImplementation implements StreamingSync { await adapter.removeBuckets([...bucketsToDelete]); _updateStatus(downloading: true); case StreamingSyncCheckpointComplete(): - final result = await adapter.syncLocalDatabase(targetCheckpoint!); - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - // await new Promise((resolve) => setTimeout(resolve, 50)); - return false; - } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } else { - appliedCheckpoint = targetCheckpoint; - - final now = DateTime.now(); - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: now, - priorityStatusEntries: [ - if (appliedCheckpoint.checksums.isNotEmpty) - ( - hasSynced: true, - lastSyncedAt: now, - priority: maxBy( - appliedCheckpoint.checksums - .map((cs) => BucketPriority(cs.priority)), - (priority) => priority, - compare: BucketPriority.comparator, - )!, - ) - ], - ); + final result = + await _applyCheckpoint(targetCheckpoint!, abortController); + if (result.abort) { + return; } - validatedCheckpoint = targetCheckpoint; + if (result.didApply) { + appliedCheckpoint = targetCheckpoint; + } case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority): final result = await adapter.syncLocalDatabase(targetCheckpoint!, forPriority: bucketPriority); @@ -420,10 +402,11 @@ class StreamingSyncImplementation implements StreamingSync { // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off // await new Promise((resolve) => setTimeout(resolve, 50)); - return false; + return; } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. + // If we have pending uploads, we can't complete new checkpoints + // outside of priority 0. We'll resolve this for a complete + // checkpoint later. } else { _updateStatusForPriority(( priority: BucketPriority(bucketPriority), @@ -494,22 +477,13 @@ class StreamingSyncImplementation implements StreamingSync { downloadError: _noError, lastSyncedAt: DateTime.now()); } else if (validatedCheckpoint == targetCheckpoint) { - final result = await adapter.syncLocalDatabase(targetCheckpoint!); - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - // await new Promise((resolve) => setTimeout(resolve, 50)); - return false; - } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } else { + final result = + await _applyCheckpoint(targetCheckpoint!, abortController); + if (result.abort) { + return; + } + if (result.didApply) { appliedCheckpoint = targetCheckpoint; - - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); } } } @@ -519,7 +493,65 @@ class StreamingSyncImplementation implements StreamingSync { break; } } - return true; + } + + Future<({bool abort, bool didApply})> _applyCheckpoint( + Checkpoint targetCheckpoint, AbortController? abortController) async { + var result = await adapter.syncLocalDatabase(targetCheckpoint); + final pendingUpload = _activeCrudUpload; + + if (!result.checkpointValid) { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + // await new Promise((resolve) => setTimeout(resolve, 50)); + return const (abort: true, didApply: false); + } else if (!result.ready && pendingUpload != null) { + // We have pending entries in the local upload queue or are waiting to + // confirm a write checkpoint, which prevented this checkpoint from + // applying. Wait for that to complete and try again. + isolateLogger.fine('Could not apply checkpoint due to local data. ' + 'Waiting for in-progress upload before retrying...'); + await Future.any([ + pendingUpload.future, + if (abortController case final controller?) controller.onAbort, + ]); + + if (abortController?.aborted == true) { + return const (abort: true, didApply: false); + } + + // Try again now that uploads have completed. + result = await adapter.syncLocalDatabase(targetCheckpoint); + } + + if (result.checkpointValid && result.ready) { + isolateLogger.fine('validated checkpoint: $targetCheckpoint'); + final now = DateTime.now(); + _updateStatus( + downloading: false, + downloadError: _noError, + lastSyncedAt: now, + priorityStatusEntries: [ + if (targetCheckpoint.checksums.isNotEmpty) + ( + hasSynced: true, + lastSyncedAt: now, + priority: maxBy( + targetCheckpoint.checksums + .map((cs) => BucketPriority(cs.priority)), + (priority) => priority, + compare: BucketPriority.comparator, + )!, + ) + ], + ); + + return const (abort: false, didApply: true); + } else { + isolateLogger.fine( + 'Could not apply checkpoint. Waiting for next sync complete line'); + return const (abort: false, didApply: false); + } } Stream streamingSyncRequest( diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 359f68e7..95d81ea0 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:async/async.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; @@ -22,6 +24,7 @@ void main() { late MockSyncService syncService; late StreamingSync syncClient; var credentialsCallbackCount = 0; + Future Function(PowerSyncDatabase) uploadData = (db) async {}; setUp(() async { credentialsCallbackCount = 0; @@ -42,7 +45,7 @@ void main() { token: 'token$credentialsCallbackCount', expiresAt: DateTime.now(), ); - }), + }, uploadData: (db) => uploadData(db)), ); }); @@ -329,6 +332,94 @@ void main() { expect(nextRequest.headers['Authorization'], 'Token token2'); expect(credentialsCallbackCount, 2); }); + + test('handles checkpoints during the upload process', () async { + final status = await waitForConnection(); + + Future expectCustomerRows(dynamic matcher) async { + final rows = await database.getAll('SELECT * FROM customers'); + expect(rows, matcher); + } + + final uploadStarted = Completer(); + final uploadFinished = Completer(); + + uploadData = (db) async { + if (await db.getCrudBatch() case final batch?) { + uploadStarted.complete(); + await uploadFinished.future; + batch.complete(); + } + }; + + // Trigger an upload + await database.execute( + 'INSERT INTO customers (id, name, email) VALUES (uuid(), ?, ?)', + ['local', 'local@example.org']); + await expectCustomerRows(hasLength(1)); + await uploadStarted.future; + + // Pretend that the connector takes forever in uploadData, but the data + // gets uploaded before the method returns. + syncService.addLine({ + 'checkpoint': Checkpoint( + writeCheckpoint: '1', + lastOpId: '2', + checksums: [BucketChecksum(bucket: 'a', priority: 3, checksum: 0)], + ) + }); + await expectLater(status, emitsThrough(isSyncStatus(downloading: true))); + + syncService + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': [ + { + 'checksum': 0, + 'data': {'name': 'from local', 'email': 'local@example.org'}, + 'op': 'PUT', + 'op_id': '1', + 'object_id': '1', + 'object_type': 'customers' + }, + { + 'checksum': 0, + 'data': {'name': 'additional', 'email': ''}, + 'op': 'PUT', + 'op_id': '2', + 'object_id': '2', + 'object_type': 'customers' + } + ] + } + }) + ..addLine({ + 'checkpoint_complete': {'last_op_id': '2'} + }); + + // Despite receiving a valid checkpoint with two rows, it should not be + // visible because we have local data pending. + await expectCustomerRows(hasLength(1)); + + // Mark the upload as completed, this should trigger a write_checkpoint + // request. + final sentCheckpoint = Completer(); + syncService.writeCheckpoint = () { + sentCheckpoint.complete(); + return { + 'data': {'write_checkpoint': '1'} + }; + }; + uploadFinished.complete(); + await sentCheckpoint.future; + + // This should apply the checkpoint. + await expectLater(status, emitsThrough(isSyncStatus(downloading: false))); + + // Meaning that the two rows are now visible. + await expectCustomerRows(hasLength(2)); + }); }); } diff --git a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart index dd4db237..a38863eb 100644 --- a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart +++ b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart @@ -10,6 +10,11 @@ final class MockSyncService { Completer _listener = Completer(); final router = Router(); + Object? Function() writeCheckpoint = () { + return { + 'data': {'write_checkpoint': '10'} + }; + }; MockSyncService() { router @@ -27,7 +32,7 @@ final class MockSyncService { }); }) ..get('/write-checkpoint2.json', (request) { - return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: { + return Response.ok(json.encode(writeCheckpoint()), headers: { 'Content-Type': 'application/json', }); });