Skip to content

Commit 3a28499

Browse files
committed
Fix tracking sync progress
1 parent 3123bad commit 3a28499

File tree

6 files changed

+234
-9
lines changed

6 files changed

+234
-9
lines changed

crates/core/src/json_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ impl JsonWriter {
2222
}
2323
self.buffer.push('"');
2424
format_escaped_str_contents(&mut self.buffer, key);
25-
self.buffer.push('"');
25+
self.buffer.push_str("\": ");
2626
}
2727

2828
pub fn write_str(&mut self, key: &str, value: &str) {

crates/core/src/sync/storage_adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl StorageAdapter {
9191
if stmt.step()? == ResultCode::ROW {
9292
let bucket = stmt.column_text(0)?;
9393
let count_at_last = stmt.column_int64(1);
94-
let count_since_last = stmt.column_int64(1);
94+
let count_since_last = stmt.column_int64(2);
9595

9696
return Ok(Some(PersistedBucketProgress {
9797
bucket,

crates/core/src/sync/streaming_sync.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,11 @@ impl StreamingSyncIteration {
222222
SyncEvent::Initialize { .. } => {
223223
panic!("Initialize should only be emited once")
224224
}
225-
SyncEvent::TearDown => break,
225+
SyncEvent::TearDown => {
226+
self.status
227+
.update(|s| s.disconnect(), &mut event.instructions);
228+
break;
229+
}
226230
SyncEvent::TextLine { data } => serde_json::from_str(data)?,
227231
SyncEvent::BinaryLine { data } => bson::from_bytes(data)?,
228232
SyncEvent::UploadFinished => {
@@ -257,8 +261,7 @@ impl StreamingSyncIteration {
257261
}
258262
};
259263

260-
self.status
261-
.update(|s| s.mark_connected(), &mut event.instructions);
264+
self.status.update_only(|s| s.mark_connected());
262265

263266
match line {
264267
SyncLine::Checkpoint(checkpoint) => {
@@ -396,6 +399,8 @@ impl StreamingSyncIteration {
396399
}
397400
}
398401
}
402+
403+
self.status.emit_changes(&mut event.instructions);
399404
}
400405

401406
Ok(())

crates/core/src/sync/sync_status.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ impl DownloadSyncStatus {
2727
.is_sorted_by(|a, b| a.priority >= b.priority))
2828
}
2929

30+
pub fn disconnect(&mut self) {
31+
self.connected = false;
32+
self.connecting = false;
33+
self.downloading = None;
34+
}
35+
3036
pub fn start_connecting(&mut self) {
3137
self.connected = false;
3238
self.downloading = None;
@@ -112,11 +118,20 @@ impl SyncStatusContainer {
112118
apply: F,
113119
instructions: &mut Vec<Instruction>,
114120
) {
121+
self.update_only(apply);
122+
self.emit_changes(instructions);
123+
}
124+
125+
/// Invokes a function to update the sync status without emitting a status event.
126+
pub fn update_only<F: FnOnce(&mut DownloadSyncStatus) -> ()>(&self, apply: F) {
115127
let mut status = self.status.borrow_mut();
116128
apply(&mut *status);
129+
}
117130

118-
// If apply() actually changed something (we compare hash codes to avoid copying), emit an
119-
// instructions for clients to update the public sync status.
131+
/// If the status has been changed since the last time an [Instruction::UpdateSyncStatus] event
132+
/// was emitted, emit such an event now.
133+
pub fn emit_changes(&mut self, instructions: &mut Vec<Instruction>) {
134+
let status = self.status.borrow();
120135
let hash = FxBuildHasher.hash_one(&*status);
121136
if hash != self.last_published_hash {
122137
self.last_published_hash = hash;

dart/test/goldens/simple_iteration.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,20 @@
120120
{
121121
"LogLine": {
122122
"severity": "DEBUG",
123-
"line": "Validated checkpoint"
123+
"line": "Validated and applied checkpoint"
124124
}
125125
},
126+
{
127+
"DidCompleteSync": {}
128+
},
126129
{
127130
"UpdateSyncStatus": {
128131
"status": {
129132
"connected": true,
130133
"connecting": false,
131134
"priority_status": [
132135
{
133-
"priority": 3,
136+
"priority": 2147483647,
134137
"last_synced_at": 1740819600,
135138
"has_synced": true
136139
}

dart/test/sync_test.dart

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,188 @@ void _syncTests<T>({
355355
]);
356356
});
357357

358+
group('progress', () {
359+
Map<String, BucketProgress>? progress = null;
360+
var lastOpId = 0;
361+
362+
setUp(() {
363+
lastOpId = 0;
364+
return progress = null;
365+
});
366+
367+
(int, int) totalProgress() {
368+
return progress!.values.downloadAndTargetCount();
369+
}
370+
371+
(int, int) priorityProgress(int priority) {
372+
return progress!.values
373+
.where((e) => e.priority <= priority)
374+
.downloadAndTargetCount();
375+
}
376+
377+
void applyInstructions(List<Object?> instructions) {
378+
for (final instruction in instructions.cast<Map>()) {
379+
if (instruction['UpdateSyncStatus'] case final updateStatus?) {
380+
final downloading = updateStatus['status']['downloading'];
381+
if (downloading == null) {
382+
progress = null;
383+
} else {
384+
progress = {
385+
for (final MapEntry(:key, :value)
386+
in downloading['buckets'].entries)
387+
key: (
388+
atLast: value['at_last'] as int,
389+
sinceLast: value['since_last'] as int,
390+
targetCount: value['target_count'] as int,
391+
priority: value['priority'] as int,
392+
),
393+
};
394+
}
395+
}
396+
}
397+
}
398+
399+
void pushSyncData(String bucket, int amount) {
400+
final instructions = syncLine({
401+
'data': {
402+
'bucket': bucket,
403+
'has_more': false,
404+
'after': null,
405+
'next_after': null,
406+
'data': [
407+
for (var i = 0; i < amount; i++)
408+
{
409+
'op_id': (++lastOpId).toString(),
410+
'op': 'PUT',
411+
'object_type': 'items',
412+
'object_id': '$lastOpId',
413+
'checksum': 0,
414+
'data': '{}',
415+
}
416+
],
417+
},
418+
});
419+
420+
applyInstructions(instructions);
421+
}
422+
423+
void addCheckpointComplete({int? priority}) {
424+
applyInstructions(
425+
pushCheckpointComplete(priority: priority, lastOpId: '$lastOpId'));
426+
}
427+
428+
test('without priorities', () {
429+
applyInstructions(invokeControl('start', null));
430+
expect(progress, isNull);
431+
432+
applyInstructions(pushCheckpoint(
433+
buckets: [bucketDescription('a', count: 10)], lastOpId: 10));
434+
expect(totalProgress(), (0, 10));
435+
436+
pushSyncData('a', 10);
437+
expect(totalProgress(), (10, 10));
438+
439+
addCheckpointComplete();
440+
expect(progress, isNull);
441+
442+
// Emit new data, progress should be 0/2 instead of 10/12
443+
applyInstructions(pushCheckpoint(
444+
lastOpId: 12, buckets: [bucketDescription('a', count: 12)]));
445+
expect(totalProgress(), (0, 2));
446+
447+
pushSyncData('a', 2);
448+
expect(totalProgress(), (2, 2));
449+
450+
addCheckpointComplete();
451+
expect(progress, isNull);
452+
});
453+
454+
test('interrupted sync', () {
455+
applyInstructions(invokeControl('start', null));
456+
applyInstructions(pushCheckpoint(
457+
buckets: [bucketDescription('a', count: 10)], lastOpId: 10));
458+
expect(totalProgress(), (0, 10));
459+
460+
pushSyncData('a', 5);
461+
expect(totalProgress(), (5, 10));
462+
463+
// Emulate stream closing
464+
applyInstructions(invokeControl('stop', null));
465+
expect(progress, isNull);
466+
467+
applyInstructions(invokeControl('start', null));
468+
applyInstructions(pushCheckpoint(
469+
buckets: [bucketDescription('a', count: 10)], lastOpId: 10));
470+
expect(totalProgress(), (5, 10));
471+
472+
pushSyncData('a', 5);
473+
expect(totalProgress(), (10, 10));
474+
addCheckpointComplete();
475+
expect(progress, isNull);
476+
});
477+
478+
test('interrupted sync with new checkpoint', () {
479+
applyInstructions(invokeControl('start', null));
480+
applyInstructions(pushCheckpoint(
481+
buckets: [bucketDescription('a', count: 10)], lastOpId: 10));
482+
expect(totalProgress(), (0, 10));
483+
484+
pushSyncData('a', 5);
485+
expect(totalProgress(), (5, 10));
486+
487+
// Emulate stream closing
488+
applyInstructions(invokeControl('stop', null));
489+
expect(progress, isNull);
490+
491+
applyInstructions(invokeControl('start', null));
492+
applyInstructions(pushCheckpoint(
493+
buckets: [bucketDescription('a', count: 12)], lastOpId: 12));
494+
expect(totalProgress(), (5, 12));
495+
496+
pushSyncData('a', 7);
497+
expect(totalProgress(), (12, 12));
498+
addCheckpointComplete();
499+
expect(progress, isNull);
500+
});
501+
502+
test('different priorities', () {
503+
void expectProgress((int, int) prio0, (int, int) prio2) {
504+
expect(priorityProgress(0), prio0);
505+
expect(priorityProgress(1), prio0);
506+
expect(priorityProgress(2), prio2);
507+
expect(totalProgress(), prio2);
508+
}
509+
510+
applyInstructions(invokeControl('start', null));
511+
applyInstructions(pushCheckpoint(buckets: [
512+
bucketDescription('a', count: 5, priority: 0),
513+
bucketDescription('b', count: 5, priority: 2),
514+
], lastOpId: 10));
515+
expectProgress((0, 5), (0, 10));
516+
517+
pushSyncData('a', 5);
518+
expectProgress((5, 5), (5, 10));
519+
520+
pushSyncData('b', 2);
521+
expectProgress((5, 5), (7, 10));
522+
523+
// Before syncing b fully, send a new checkpoint
524+
applyInstructions(pushCheckpoint(buckets: [
525+
bucketDescription('a', count: 8, priority: 0),
526+
bucketDescription('b', count: 6, priority: 2),
527+
], lastOpId: 14));
528+
expectProgress((5, 8), (7, 14));
529+
530+
pushSyncData('a', 3);
531+
expectProgress((8, 8), (10, 14));
532+
pushSyncData('b', 4);
533+
expectProgress((8, 8), (14, 14));
534+
535+
addCheckpointComplete();
536+
expect(progress, isNull);
537+
});
538+
});
539+
358540
group('errors', () {
359541
syncTest('diff without prior checkpoint', (_) {
360542
invokeControl('start', null);
@@ -445,6 +627,26 @@ final priorityBuckets = [
445627
for (var i = 0; i < 4; i++) bucketDescription('prio$i', priority: i)
446628
];
447629

630+
typedef BucketProgress = ({
631+
int priority,
632+
int atLast,
633+
int sinceLast,
634+
int targetCount
635+
});
636+
637+
extension on Iterable<BucketProgress> {
638+
(int, int) downloadAndTargetCount() {
639+
return fold((0, 0), (counters, entry) {
640+
final (downloaded, total) = counters;
641+
642+
return (
643+
downloaded + entry.sinceLast,
644+
total + entry.targetCount - entry.atLast
645+
);
646+
});
647+
}
648+
}
649+
448650
extension on Uint8List {
449651
// ignore: unused_element
450652
String get asRustByteString {

0 commit comments

Comments
 (0)