Skip to content

Commit 14c3404

Browse files
committed
Add test for partial syncs
1 parent 53d0d47 commit 14c3404

File tree

14 files changed

+194
-122
lines changed

14 files changed

+194
-122
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,30 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
260260
}
261261

262262
/**
263+
* Wait for the first sync operation to complete.
264+
*
265+
* @argument request Either an abort signal (after which the promise will complete regardless of
266+
* whether a full sync was completed) or an object providing an abort signal and a priority target.
267+
* When a priority target is set, the promise may complete when all buckets with the given (or higher)
268+
* priorities have been synchronized. This can be earlier than a complete sync.
263269
* @returns A promise which will resolve once the first full sync has completed.
264270
*/
265-
async waitForFirstSync(signal?: AbortSignal): Promise<void> {
266-
if (this.currentStatus.hasSynced) {
271+
async waitForFirstSync(request?: AbortSignal | { signal?: AbortSignal; priority?: number }): Promise<void> {
272+
const signal = request instanceof AbortSignal ? request : request?.signal;
273+
const priority = request && 'priority' in request ? request.priority : undefined;
274+
275+
const statusMatches =
276+
priority === undefined
277+
? (status: SyncStatus) => status.hasSynced
278+
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;
279+
280+
if (statusMatches(this.currentStatus)) {
267281
return;
268282
}
269283
return new Promise((resolve) => {
270284
const dispose = this.registerListener({
271285
statusChanged: (status) => {
272-
if (status.hasSynced) {
286+
if (statusMatches(status)) {
273287
dispose();
274288
resolve();
275289
}
@@ -379,7 +393,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
379393
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
380394
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
381395
return {
382-
retryDelayMs: options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
396+
retryDelayMs:
397+
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
383398
crudUploadThrottleMs:
384399
options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs ?? DEFAULT_CRUD_UPLOAD_THROTTLE_MS
385400
};
@@ -401,7 +416,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
401416

402417
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
403418
retryDelayMs,
404-
crudUploadThrottleMs,
419+
crudUploadThrottleMs
405420
});
406421
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
407422
statusChanged: (status) => {

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
6666

6767
getBucketStates(): Promise<BucketState[]>;
6868

69-
syncLocalDatabase(checkpoint: Checkpoint, priority?: number): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
69+
syncLocalDatabase(
70+
checkpoint: Checkpoint,
71+
priority?: number
72+
): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
7073

7174
nextCrudItem(): Promise<CrudEntry | undefined>;
7275
hasCrud(): Promise<boolean>;

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,20 +186,22 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
186186
}
187187
}
188188

189-
arg = JSON.stringify({priority, buckets: affectedBuckets});
189+
arg = JSON.stringify({ priority, buckets: affectedBuckets });
190190
}
191191

192192
return this.writeTransaction(async (tx) => {
193193
const { insertId: result } = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
194194
'sync_local',
195-
arg,
195+
arg
196196
]);
197197
return result == 1;
198198
});
199199
}
200200

201201
async validateChecksums(checkpoint: Checkpoint, priority: number | undefined): Promise<SyncLocalDatabaseResult> {
202-
const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [JSON.stringify({...checkpoint, priority})]);
202+
const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [
203+
JSON.stringify({ ...checkpoint, priority })
204+
]);
203205

204206
const resultItem = rs.rows?.item(0);
205207
this.logger.debug('validateChecksums result item', resultItem);

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncI
126126
triggerCrudUpload: () => void;
127127
waitForReady(): Promise<void>;
128128
waitForStatus(status: SyncStatusOptions): Promise<void>;
129+
waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise<void>;
129130
}
130131

131132
export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;
@@ -184,24 +185,35 @@ export abstract class AbstractStreamingSyncImplementation
184185
async waitForReady() {}
185186

186187
waitForStatus(status: SyncStatusOptions): Promise<void> {
188+
return this.waitUntilStatusMatches((currentStatus) => {
189+
/**
190+
* Match only the partial status options provided in the
191+
* matching status
192+
*/
193+
const matchPartialObject = (compA: object, compB: object) => {
194+
return Object.entries(compA).every(([key, value]) => {
195+
const comparisonBValue = compB[key];
196+
if (typeof value == 'object' && typeof comparisonBValue == 'object') {
197+
return matchPartialObject(value, comparisonBValue);
198+
}
199+
return value == comparisonBValue;
200+
});
201+
};
202+
203+
return matchPartialObject(status, currentStatus);
204+
});
205+
}
206+
207+
waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise<void> {
187208
return new Promise((resolve) => {
209+
if (predicate(this.syncStatus)) {
210+
resolve();
211+
return;
212+
}
213+
188214
const l = this.registerListener({
189215
statusChanged: (updatedStatus) => {
190-
/**
191-
* Match only the partial status options provided in the
192-
* matching status
193-
*/
194-
const matchPartialObject = (compA: object, compB: object) => {
195-
return Object.entries(compA).every(([key, value]) => {
196-
const comparisonBValue = compB[key];
197-
if (typeof value == 'object' && typeof comparisonBValue == 'object') {
198-
return matchPartialObject(value, comparisonBValue);
199-
}
200-
return value == comparisonBValue;
201-
});
202-
};
203-
204-
if (matchPartialObject(status, updatedStatus.toJSON())) {
216+
if (predicate(updatedStatus)) {
205217
resolve();
206218
l?.();
207219
}
@@ -524,6 +536,7 @@ The next upload iteration will be delayed.`);
524536
// The stream has closed while waiting
525537
return { retry: true };
526538
}
539+
527540
// A connection is active and messages are being received
528541
if (!this.syncStatus.connected) {
529542
// There is a connection now
@@ -538,7 +551,7 @@ The next upload iteration will be delayed.`);
538551
const bucketsToDelete = new Set<string>(bucketMap.keys());
539552
const newBuckets = new Map<string, BucketDescription>();
540553
for (const checksum of line.checkpoint.buckets) {
541-
newBuckets.set(checksum.bucket, { name: checksum.bucket, priority: checksum.priority })
554+
newBuckets.set(checksum.bucket, { name: checksum.bucket, priority: checksum.priority });
542555
bucketsToDelete.delete(checksum.bucket);
543556
}
544557
if (bucketsToDelete.size > 0) {
@@ -567,14 +580,14 @@ The next upload iteration will be delayed.`);
567580
lastSyncedAt: new Date(),
568581
dataFlow: {
569582
downloading: false
570-
},
583+
}
571584
});
572585
}
573586

574587
validatedCheckpoint = targetCheckpoint;
575588
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
576-
this.logger.debug('Partial checkpoint complete', targetCheckpoint);
577589
const priority = line.partial_checkpoint_complete.priority;
590+
this.logger.debug('Partial checkpoint complete', priority);
578591
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!, priority);
579592
if (!result.checkpointValid) {
580593
// This means checksums failed. Start again with a new checkpoint.
@@ -592,16 +605,12 @@ The next upload iteration will be delayed.`);
592605
priorityStates.push({
593606
priority,
594607
lastSyncedAt: new Date(),
595-
hasSynced: true,
608+
hasSynced: true
596609
});
597610

598611
this.updateSyncStatus({
599612
connected: true,
600-
lastSyncedAt: new Date(),
601-
statusInPriority: priorityStates,
602-
dataFlow: {
603-
downloading: false
604-
},
613+
statusInPriority: priorityStates
605614
});
606615
}
607616
} else if (isStreamingSyncCheckpointDiff(line)) {
@@ -628,10 +637,13 @@ The next upload iteration will be delayed.`);
628637
};
629638
targetCheckpoint = newCheckpoint;
630639

631-
bucketMap = new Map(newBuckets.entries().map(([name, checksum]) => [name, {
632-
name: checksum.bucket,
633-
priority: checksum.priority,
634-
}]));
640+
bucketMap = new Map();
641+
newBuckets.forEach((checksum, name) =>
642+
bucketMap.set(name, {
643+
name: checksum.bucket,
644+
priority: checksum.priority
645+
})
646+
);
635647

636648
const bucketsToDelete = diff.removed_buckets;
637649
if (bucketsToDelete.length > 0) {
@@ -667,7 +679,7 @@ The next upload iteration will be delayed.`);
667679
this.updateSyncStatus({
668680
connected: true,
669681
lastSyncedAt: new Date(),
670-
statusInPriority: [],
682+
statusInPriority: []
671683
});
672684
} else if (validatedCheckpoint === targetCheckpoint) {
673685
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
@@ -709,7 +721,7 @@ The next upload iteration will be delayed.`);
709721
...this.syncStatus.dataFlowStatus,
710722
...options.dataFlow
711723
},
712-
statusInPriority: options.statusInPriority,
724+
statusInPriority: options.statusInPriority ?? this.syncStatus.statusInPriority
713725
});
714726

715727
if (!this.syncStatus.isEqual(updatedStatus)) {

packages/common/src/client/sync/stream/streaming-sync-types.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ export function isStreamingSyncCheckpointComplete(line: StreamingSyncLine): line
161161
return (line as StreamingSyncCheckpointComplete).checkpoint_complete != null;
162162
}
163163

164-
export function isStreamingSyncCheckpointPartiallyComplete(line: StreamingSyncLine): line is StreamingSyncCheckpointPartiallyComplete {
164+
export function isStreamingSyncCheckpointPartiallyComplete(
165+
line: StreamingSyncLine
166+
): line is StreamingSyncCheckpointPartiallyComplete {
165167
return (line as StreamingSyncCheckpointPartiallyComplete).partial_checkpoint_complete != null;
166168
}
167169

packages/common/src/db/DBAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ export interface DBAdapter extends BaseObserverInterface<DBAdapterListener>, DBG
103103
writeTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;
104104
/**
105105
* This method refreshes the schema information across all connections. This is for advanced use cases, and should generally not be needed.
106-
*/
106+
*/
107107
refreshSchema: () => Promise<void>;
108108
}
109109

packages/common/src/db/crud/SyncStatus.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ export type SyncDataFlowStatus = Partial<{
55

66
export interface SyncPriorityStatus {
77
priority: number;
8-
lastSyncedAt?: Date,
9-
hasSynced?: boolean,
10-
};
8+
lastSyncedAt?: Date;
9+
hasSynced?: boolean;
10+
}
1111

1212
export type SyncStatusOptions = {
1313
connected?: boolean;
1414
connecting?: boolean;
1515
dataFlow?: SyncDataFlowStatus;
1616
lastSyncedAt?: Date;
1717
hasSynced?: boolean;
18-
statusInPriority?: SyncPriorityStatus[],
18+
statusInPriority?: SyncPriorityStatus[];
1919
};
2020

2121
export class SyncStatus {
@@ -77,7 +77,7 @@ export class SyncStatus {
7777
/**
7878
* Reports a pair of {@link SyncStatus#hasSynced} and {@link SyncStatus#lastSyncedAt} fields that apply
7979
* to a specific bucket priority instead of the entire sync operation.
80-
*
80+
*
8181
* When buckets with different priorities are declared, PowerSync may choose to synchronize higher-priority
8282
* buckets first. When a consistent view over all buckets for all priorities up until the given priority is
8383
* reached, PowerSync makes data from those buckets available before lower-priority buckets have finished
@@ -86,7 +86,7 @@ export class SyncStatus {
8686
* be consistent with that checkpoint. For this reason, this method may also return the status for lower priorities.
8787
* In a state where the PowerSync just finished synchronizing buckets in priority level 3, calling this method
8888
* with a priority of 1 may return information for priority level 3.
89-
*
89+
*
9090
* @param priority The bucket priority for which the status should be reported.
9191
*/
9292
statusForPriority(priority: number): SyncPriorityStatus {
@@ -102,7 +102,7 @@ export class SyncStatus {
102102
return {
103103
priority,
104104
lastSyncedAt: this.lastSyncedAt,
105-
hasSynced: this.hasSynced,
105+
hasSynced: this.hasSynced
106106
};
107107
}
108108

@@ -122,7 +122,7 @@ export class SyncStatus {
122122
dataFlow: this.dataFlowStatus,
123123
lastSyncedAt: this.lastSyncedAt,
124124
hasSynced: this.hasSynced,
125-
statusInPriority: this.options.statusInPriority,
125+
statusInPriority: this.statusInPriority
126126
};
127127
}
128128

packages/common/src/utils/DataStream.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
9999
* @returns a Data payload or Null if the stream closed.
100100
*/
101101
async read(): Promise<Data | null> {
102-
if (this.dataQueue.length <= this.lowWatermark) {
103-
await this.iterateAsyncErrored(async (l) => l.lowWater?.());
104-
}
105-
106102
if (this.closed) {
107103
return null;
108104
}
@@ -181,14 +177,15 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
181177
}
182178

183179
protected async _processQueue() {
184-
if (!this.dataQueue.length || this.isClosed || !this.hasDataReader()) {
180+
if (this.isClosed || !this.hasDataReader()) {
185181
Promise.resolve().then(() => (this.processingPromise = null));
186182
return;
187183
}
188184

189-
const data = this.dataQueue.shift()!;
190-
191-
await this.iterateAsyncErrored(async (l) => l.data?.(data));
185+
if (this.dataQueue.length) {
186+
const data = this.dataQueue.shift()!;
187+
await this.iterateAsyncErrored(async (l) => l.data?.(data));
188+
}
192189

193190
if (this.dataQueue.length <= this.lowWatermark) {
194191
await this.iterateAsyncErrored(async (l) => l.lowWater?.());

packages/common/tests/db/schema/Schema.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ describe('Schema', () => {
3535
}),
3636
posts: new Table({
3737
title: column.text,
38-
content: column.text,
38+
content: column.text
3939
})
4040
};
4141
const schema = new Schema(schemaDefinition);
@@ -62,7 +62,7 @@ describe('Schema', () => {
6262

6363
const invalidSchema = new Schema({
6464
invalidTable: new Table({
65-
'invalid name': column.text,
65+
'invalid name': column.text
6666
})
6767
});
6868

@@ -77,7 +77,7 @@ describe('Schema', () => {
7777
}),
7878
posts: new Table({
7979
title: column.text,
80-
content: column.text,
80+
content: column.text
8181
})
8282
});
8383

packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,14 @@ export class SSRStreamingSyncImplementation extends BaseObserver implements Stre
5252
* This will never resolve in SSR Mode.
5353
*/
5454
async waitForStatus(status: SyncStatusOptions) {
55-
return new Promise<void>((r) => {});
55+
return this.waitUntilStatusMatches(() => false);
56+
}
57+
58+
/**
59+
* This will never resolve in SSR Mode.
60+
*/
61+
waitUntilStatusMatches(_predicate: (status: SyncStatus) => boolean): Promise<void> {
62+
return new Promise<void>(() => {});
5663
}
5764

5865
/**

0 commit comments

Comments
 (0)