Skip to content

Commit 51541c9

Browse files
committed
Track progress for sync lines
1 parent c4181c6 commit 51541c9

File tree

5 files changed

+76
-7
lines changed

5 files changed

+76
-7
lines changed

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ export interface SyncLocalDatabaseResult {
3030
checkpointFailures?: string[];
3131
}
3232

33+
export type BucketOperationProgress = Record<string, {
34+
atLast: number;
35+
sinceLast: number;
36+
}>;
37+
3338
export interface BucketChecksum {
3439
bucket: string;
3540
priority?: number;
@@ -65,6 +70,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
6570
startSession(): void;
6671

6772
getBucketStates(): Promise<BucketState[]>;
73+
getBucketOperationProgress(): Promise<BucketOperationProgress>;
6874

6975
syncLocalDatabase(
7076
checkpoint: Checkpoint,

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { BaseObserver } from '../../../utils/BaseObserver.js';
55
import { MAX_OP_ID } from '../../constants.js';
66
import {
77
BucketChecksum,
8+
BucketOperationProgress,
89
BucketState,
910
BucketStorageAdapter,
1011
BucketStorageListener,
@@ -91,6 +92,11 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
9192
return result;
9293
}
9394

95+
async getBucketOperationProgress(): Promise<BucketOperationProgress> {
96+
const rows = await this.db.getAll<{name: string, count_at_last: number, count_since_last: number}>("SELECT name, count_at_last, count_since_last FROM ps_buckets");
97+
return Object.fromEntries(rows.map((r) => [r.name, {atLast: r.count_at_last, sinceLast: r.count_since_last}]));
98+
}
99+
94100
async saveSyncData(batch: SyncDataBatch) {
95101
await this.writeTransaction(async (tx) => {
96102
let count = 0;
@@ -199,7 +205,16 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
199205
'sync_local',
200206
arg
201207
]);
202-
return result == 1;
208+
if (result == 1) {
209+
if (priority == null) {
210+
const bucketToCount = Object.fromEntries(checkpoint.buckets.map((b) => [b.bucket, b.count]));
211+
await tx.execute('UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name WHERE ?1->name IS NOT NULL', [JSON.stringify(bucketToCount)]);
212+
}
213+
214+
return true;
215+
} else {
216+
return false;
217+
}
203218
});
204219
}
205220

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
isStreamingSyncData
2121
} from './streaming-sync-types.js';
2222
import { DataStream } from 'src/utils/DataStream.js';
23+
import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';
2324

2425
export enum LockType {
2526
CRUD = 'crud',
@@ -163,21 +164,23 @@ export abstract class AbstractStreamingSyncImplementation
163164
protected streamingSyncPromise?: Promise<void>;
164165

165166
syncStatus: SyncStatus;
167+
private syncStatusOptions: SyncStatusOptions;
166168
triggerCrudUpload: () => void;
167169

168170
constructor(options: AbstractStreamingSyncImplementationOptions) {
169171
super();
170172
this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options };
171173

172-
this.syncStatus = new SyncStatus({
174+
this.syncStatusOptions = {
173175
connected: false,
174176
connecting: false,
175177
lastSyncedAt: undefined,
176178
dataFlow: {
177179
uploading: false,
178180
downloading: false
179181
}
180-
});
182+
};
183+
this.syncStatus = new SyncStatus(this.syncStatusOptions);
181184
this.abortController = null;
182185

183186
this.triggerCrudUpload = throttleLeadingTrailing(() => {
@@ -417,7 +420,8 @@ The next upload iteration will be delayed.`);
417420
connected: false,
418421
connecting: false,
419422
dataFlow: {
420-
downloading: false
423+
downloading: false,
424+
downloadProgress: null,
421425
}
422426
});
423427
});
@@ -581,6 +585,7 @@ The next upload iteration will be delayed.`);
581585
bucketMap = newBuckets;
582586
await this.options.adapter.removeBuckets([...bucketsToDelete]);
583587
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
588+
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
584589
} else if (isStreamingSyncCheckpointComplete(line)) {
585590
this.logger.debug('Checkpoint complete', targetCheckpoint);
586591
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
@@ -601,6 +606,7 @@ The next upload iteration will be delayed.`);
601606
lastSyncedAt: new Date(),
602607
dataFlow: {
603608
downloading: false,
609+
downloadProgress: null,
604610
downloadError: undefined
605611
}
606612
});
@@ -658,6 +664,7 @@ The next upload iteration will be delayed.`);
658664
write_checkpoint: diff.write_checkpoint
659665
};
660666
targetCheckpoint = newCheckpoint;
667+
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
661668

662669
bucketMap = new Map();
663670
newBuckets.forEach((checksum, name) =>
@@ -675,9 +682,23 @@ The next upload iteration will be delayed.`);
675682
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
676683
} else if (isStreamingSyncData(line)) {
677684
const { data } = line;
685+
const previousProgress = this.syncStatusOptions.dataFlow?.downloadProgress;
686+
let updatedProgress: InternalProgressInformation | null = null;
687+
if (previousProgress) {
688+
updatedProgress = {...previousProgress};
689+
const progressForBucket = updatedProgress[data.bucket];
690+
if (progressForBucket) {
691+
updatedProgress[data.bucket] = {
692+
...progressForBucket,
693+
sinceLast: progressForBucket.sinceLast + data.data.length,
694+
};
695+
}
696+
}
697+
678698
this.updateSyncStatus({
679699
dataFlow: {
680-
downloading: true
700+
downloading: true,
701+
downloadProgress: updatedProgress,
681702
}
682703
});
683704
await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] });
@@ -724,6 +745,7 @@ The next upload iteration will be delayed.`);
724745
priorityStatusEntries: [],
725746
dataFlow: {
726747
downloading: false,
748+
downloadProgress: null,
727749
downloadError: undefined
728750
}
729751
});
@@ -738,6 +760,30 @@ The next upload iteration will be delayed.`);
738760
});
739761
}
740762

763+
private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) {
764+
const localProgress = await this.options.adapter.getBucketOperationProgress();
765+
const progress: InternalProgressInformation = {};
766+
767+
for (const bucket of checkpoint.buckets) {
768+
const savedProgress = localProgress[bucket.bucket];
769+
progress[bucket.bucket] = {
770+
// The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service
771+
// will use by default.
772+
priority: bucket.priority ?? 3,
773+
atLast: savedProgress?.atLast ?? 0,
774+
sinceLast: savedProgress.sinceLast ?? 0,
775+
targetCount: bucket.count ?? 0,
776+
};
777+
}
778+
779+
this.updateSyncStatus({
780+
dataFlow: {
781+
downloading: true,
782+
downloadProgress: progress,
783+
}
784+
});
785+
}
786+
741787
protected updateSyncStatus(options: SyncStatusOptions) {
742788
const updatedStatus = new SyncStatus({
743789
connected: options.connected ?? this.syncStatus.connected,
@@ -751,6 +797,7 @@ The next upload iteration will be delayed.`);
751797
});
752798

753799
if (!this.syncStatus.isEqual(updatedStatus)) {
800+
this.syncStatusOptions = options;
754801
this.syncStatus = updatedStatus;
755802
// Only trigger this is there was a change
756803
this.iterateListeners((cb) => cb.statusChanged?.(updatedStatus));

packages/common/src/db/crud/SyncProgress.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { SyncStatus } from "./SyncStatus.js";
22

33
// (bucket, progress) pairs
4+
/** @internal */
45
export type InternalProgressInformation = Record<string, {
56
priority: number, // Priority of the associated buckets
67
atLast: number, // Total ops at last completed sync, or 0
@@ -9,7 +10,7 @@ export type InternalProgressInformation = Record<string, {
910
}>;
1011

1112
/**
12-
* The priority used by the core extension to indicate that a full sync was completed.
13+
* @internal The priority used by the core extension to indicate that a full sync was completed.
1314
*/
1415
export const FULL_SYNC_PRIORITY = 2147483647;
1516

packages/common/src/db/crud/SyncStatus.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export type SyncDataFlowStatus = Partial<{
1919
*
2020
* Please use the {@link SyncStatus#downloadProgress} property to track sync progress.
2121
*/
22-
downloadProgress: InternalProgressInformation,
22+
downloadProgress: InternalProgressInformation | null,
2323
}>;
2424

2525
export interface SyncPriorityStatus {

0 commit comments

Comments
 (0)