Skip to content

Commit a3df9c5

Browse files
committed
Rust: Crud upload on reconnect
1 parent 2688aea commit a3df9c5

File tree

2 files changed

+68
-2
lines changed

2 files changed

+68
-2
lines changed

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ typedef BucketDescription = ({
586586
final class _ActiveRustStreamingIteration {
587587
final StreamingSyncImplementation sync;
588588
var _isActive = true;
589+
var _hadSyncLine = false;
589590

590591
StreamSubscription<void>? _completedUploads;
591592
final Completer<void> _completedStream = Completer();
@@ -621,8 +622,10 @@ final class _ActiveRustStreamingIteration {
621622

622623
switch (event) {
623624
case ReceivedLine(line: final Uint8List line):
625+
_triggerCrudUploadOnFirstLine();
624626
await _control('line_binary', line);
625627
case ReceivedLine(line: final line as String):
628+
_triggerCrudUploadOnFirstLine();
626629
await _control('line_text', line);
627630
case UploadCompleted():
628631
await _control('completed_upload');
@@ -634,6 +637,17 @@ final class _ActiveRustStreamingIteration {
634637
}
635638
}
636639

640+
/// Triggers a local CRUD upload when the first sync line has been received.
641+
///
642+
/// This allows uploading local changes that have been made while offline or
643+
/// disconnected.
644+
void _triggerCrudUploadOnFirstLine() {
645+
if (!_hadSyncLine) {
646+
sync._internalCrudTriggerController.add(null);
647+
_hadSyncLine = true;
648+
}
649+
}
650+
637651
Future<void> _stop() {
638652
return _control('stop');
639653
}

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ void _declareTests(String name, SyncOptions options) {
104104
addTearDown(status.cancel);
105105

106106
syncService.addKeepAlive();
107-
await expectLater(
108-
status, emits(isSyncStatus(connected: true, hasSynced: false)));
107+
await expectLater(status,
108+
emitsThrough(isSyncStatus(connected: true, hasSynced: false)));
109109
return status;
110110
}
111111

@@ -774,6 +774,58 @@ void _declareTests(String name, SyncOptions options) {
774774
await Future<void>.delayed(const Duration(milliseconds: 500));
775775
expect(syncService.controller.hasListener, isTrue);
776776
});
777+
778+
test('uploads writes made while offline', () async {
779+
// Local write while not connected
780+
await database.execute(
781+
'insert into customers (id, name) values (uuid(), ?)',
782+
['local customer']);
783+
uploadData = (db) async {
784+
final batch = await db.getNextCrudTransaction();
785+
if (batch != null) {
786+
await batch.complete();
787+
}
788+
};
789+
syncService.writeCheckpoint = () => {
790+
'data': {'write_checkpoint': '1'}
791+
};
792+
793+
final query = StreamQueue(database
794+
.watch('SELECT name FROM customers')
795+
.map((e) => e.single['name']));
796+
expect(await query.next, 'local customer');
797+
798+
await waitForConnection();
799+
800+
syncService
801+
..addLine({
802+
'checkpoint': Checkpoint(
803+
lastOpId: '1',
804+
writeCheckpoint: '1',
805+
checksums: [BucketChecksum(bucket: 'a', priority: 3, checksum: 0)],
806+
)
807+
})
808+
..addLine({
809+
'data': {
810+
'bucket': 'a',
811+
'data': <Map<String, Object?>>[
812+
{
813+
'op_id': '1',
814+
'op': 'PUT',
815+
'object_type': 'customers',
816+
'object_id': '1',
817+
'checksum': 0,
818+
'data': json.encode({'name': 'from server'}),
819+
}
820+
],
821+
}
822+
})
823+
..addLine({
824+
'checkpoint_complete': {'last_op_id': '1'}
825+
});
826+
827+
expect(await query.next, 'from server');
828+
});
777829
});
778830
}
779831

0 commit comments

Comments
 (0)