Skip to content

Commit 17680a5

Browse files
fix multiple instance unit tests
1 parent 1f9ffad commit 17680a5

File tree

6 files changed

+62
-34
lines changed

6 files changed

+62
-34
lines changed

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
1+
import { BaseListener, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js';
22
import { CrudBatch } from './CrudBatch.js';
33
import { CrudEntry, OpId } from './CrudEntry.js';
44
import { SyncDataBatch } from './SyncDataBatch.js';
@@ -72,7 +72,7 @@ export interface BucketStorageListener extends BaseListener {
7272
crudUpdate: () => void;
7373
}
7474

75-
export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable {
75+
export interface BucketStorageAdapter extends BaseObserverInterface<BucketStorageListener>, Disposable {
7676
init(): Promise<void>;
7777
saveSyncData(batch: SyncDataBatch, fixedKeyFormat?: boolean): Promise<void>;
7878
removeBuckets(buckets: string[]): Promise<void>;

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import Logger, { ILogger } from 'js-logger';
22

3-
import { DataStream } from '../../../utils/DataStream.js';
4-
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
53
import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/crud/SyncProgress.js';
64
import * as sync_status from '../../../db/crud/SyncStatus.js';
5+
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
76
import { AbortOperation } from '../../../utils/AbortOperation.js';
8-
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
7+
import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js';
8+
import { DataStream } from '../../../utils/DataStream.js';
99
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
1010
import {
1111
BucketChecksum,
@@ -17,6 +17,7 @@ import {
1717
import { CrudEntry } from '../bucket/CrudEntry.js';
1818
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
1919
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
20+
import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js';
2021
import {
2122
BucketRequest,
2223
StreamingSyncLine,
@@ -28,7 +29,6 @@ import {
2829
isStreamingSyncCheckpointPartiallyComplete,
2930
isStreamingSyncData
3031
} from './streaming-sync-types.js';
31-
import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js';
3232

3333
export enum LockType {
3434
CRUD = 'crud',
@@ -170,7 +170,9 @@ export interface AdditionalConnectionOptions {
170170
/** @internal */
171171
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;
172172

173-
export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
173+
export interface StreamingSyncImplementation
174+
extends BaseObserverInterface<StreamingSyncImplementationListener>,
175+
Disposable {
174176
/**
175177
* Connects to the sync service
176178
*/
@@ -222,6 +224,9 @@ export abstract class AbstractStreamingSyncImplementation
222224
protected _lastSyncedAt: Date | null;
223225
protected options: AbstractStreamingSyncImplementationOptions;
224226
protected abortController: AbortController | null;
227+
// In rare cases, mostly for tests, uploads can be triggered without being properly connected.
228+
// This allows ensuring that all upload processes can be aborted.
229+
protected uploadAbortController: AbortController | null;
225230
protected crudUpdateListener?: () => void;
226231
protected streamingSyncPromise?: Promise<void>;
227232

@@ -315,8 +320,10 @@ export abstract class AbstractStreamingSyncImplementation
315320
}
316321

317322
async dispose() {
323+
super.dispose();
318324
this.crudUpdateListener?.();
319325
this.crudUpdateListener = undefined;
326+
this.uploadAbortController?.abort();
320327
}
321328

322329
abstract obtainLock<T>(lockOptions: LockOptions<T>): Promise<T>;
@@ -341,7 +348,17 @@ export abstract class AbstractStreamingSyncImplementation
341348
*/
342349
let checkedCrudItem: CrudEntry | undefined;
343350

344-
while (true) {
351+
const controller = new AbortController();
352+
this.uploadAbortController = controller;
353+
this.abortController?.signal.addEventListener(
354+
'abort',
355+
() => {
356+
controller.abort();
357+
},
358+
{ once: true }
359+
);
360+
361+
while (!controller.signal.aborted) {
345362
this.updateSyncStatus({
346363
dataFlow: {
347364
uploading: true
@@ -381,7 +398,7 @@ The next upload iteration will be delayed.`);
381398
uploadError: ex
382399
}
383400
});
384-
await this.delayRetry();
401+
await this.delayRetry(controller.signal);
385402
if (!this.isConnected) {
386403
// Exit the upload loop if the sync stream is no longer connected
387404
break;
@@ -397,6 +414,7 @@ The next upload iteration will be delayed.`);
397414
});
398415
}
399416
}
417+
this.uploadAbortController = null;
400418
}
401419
});
402420
}

packages/common/src/utils/BaseObserver.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
export interface Disposable {
2-
dispose: () => Promise<void>;
2+
dispose: () => Promise<void> | void;
33
}
44

55
export type BaseListener = Record<string, ((...event: any) => any) | undefined>;
@@ -13,6 +13,10 @@ export class BaseObserver<T extends BaseListener = BaseListener> implements Base
1313

1414
constructor() {}
1515

16+
dispose(): void {
17+
this.listeners.clear();
18+
}
19+
1620
/**
1721
* Register a listener for updates to the PowerSync client.
1822
*/

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
207207
async dispose(): Promise<void> {
208208
await this.waitForReady();
209209

210+
await super.dispose();
211+
210212
await new Promise<void>((resolve) => {
211213
// Listen for the close acknowledgment from the worker
212214
this.messagePort.addEventListener('message', (event) => {

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ export class SharedSyncImplementation
297297
});
298298

299299
const shouldReconnect = !!this.connectionManager.syncStreamImplementation && this.ports.length > 0;
300-
301300
return {
302301
shouldReconnect,
303302
trackedPort
@@ -473,10 +472,8 @@ export class SharedSyncImplementation
473472
*/
474473
private async _testUpdateAllStatuses(status: SyncStatusOptions) {
475474
if (!this.connectionManager.syncStreamImplementation) {
476-
// This is just for testing purposes
477-
this.connectionManager.syncStreamImplementation = this.generateStreamingImplementation();
475+
throw new Error('Cannot update status without a sync stream implementation');
478476
}
479-
480477
// Only assigning, don't call listeners for this test
481478
this.connectionManager.syncStreamImplementation!.syncStatus = new SyncStatus(status);
482479
this.updateAllStatuses(status);

packages/web/tests/multiple_instances.test.ts

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,12 @@ describe('Multiple Instances', { sequential: true }, () => {
132132
await connector1.uploadData(db);
133133
},
134134
identifier,
135+
retryDelayMs: 90_000, // Large delay to allow for testing
135136
db: db.database as WebDBAdapter
136137
};
137138

138139
const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1);
139-
140+
await stream1.connect();
140141
// Generate the second streaming sync implementation
141142
const connector2 = new TestConnector();
142143
const syncOptions2: SharedWebStreamingSyncImplementationOptions = {
@@ -188,20 +189,25 @@ describe('Multiple Instances', { sequential: true }, () => {
188189
triggerUpload1 = resolve;
189190
});
190191

191-
// Create the first streaming client
192-
const stream1 = new SharedWebStreamingSyncImplementation({
192+
const sharedSyncOptions = {
193193
adapter: new SqliteBucketStorage(db.database, new Mutex()),
194194
remote: new WebRemote(connector1),
195-
uploadCrud: async () => {
196-
triggerUpload1();
197-
connector1.uploadData(db);
198-
},
199195
db: db.database as WebDBAdapter,
200196
identifier,
201-
retryDelayMs: 100,
197+
// The large delay here allows us to test between connection retries
198+
retryDelayMs: 90_000,
202199
flags: {
203200
broadcastLogs: true
204201
}
202+
};
203+
204+
// Create the first streaming client
205+
const stream1 = new SharedWebStreamingSyncImplementation({
206+
...sharedSyncOptions,
207+
uploadCrud: async () => {
208+
triggerUpload1();
209+
connector1.uploadData(db);
210+
}
205211
});
206212

207213
// Generate the second streaming sync implementation
@@ -216,18 +222,11 @@ describe('Multiple Instances', { sequential: true }, () => {
216222
});
217223

218224
const stream2 = new SharedWebStreamingSyncImplementation({
219-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
220-
remote: new WebRemote(connector1),
225+
...sharedSyncOptions,
221226
uploadCrud: async () => {
222227
triggerUpload2();
223228
connector2.uploadData(db);
224-
},
225-
identifier,
226-
retryDelayMs: 100,
227-
flags: {
228-
broadcastLogs: true
229-
},
230-
db: db.database as WebDBAdapter
229+
}
231230
});
232231

233232
// Waits for the stream to be marked as connected
@@ -243,7 +242,9 @@ describe('Multiple Instances', { sequential: true }, () => {
243242
});
244243

245244
// hack to set the status to connected for tests
246-
(stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true }));
245+
await stream1.connect();
246+
// Hack, set the status to connected in order to trigger the upload
247+
await (stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true }));
247248

248249
// The status in the second stream client should be updated
249250
await stream2UpdatedPromise;
@@ -256,7 +257,6 @@ describe('Multiple Instances', { sequential: true }, () => {
256257
257258
]);
258259

259-
// Manual trigger since tests don't entirely configure watches for ps_crud
260260
stream1.triggerCrudUpload();
261261
// The second connector should be called to upload
262262
await upload2TriggeredPromise;
@@ -265,14 +265,21 @@ describe('Multiple Instances', { sequential: true }, () => {
265265
expect(spy2).toHaveBeenCalledOnce();
266266

267267
// Close the second client, leaving only the first one
268+
/**
269+
* This test is a bit hacky. If we dispose the second client, the shared sync worker
270+
* will try and reconnect, but we don't actually want it to do that since that connection attempt
271+
* will fail and it will report as `connected:false`.
272+
* We can hack as disconnected for now.
273+
*/
268274
await stream2.dispose();
269275

276+
// Hack, set the status to connected in order to trigger the upload
277+
await (stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true }));
270278
stream1.triggerCrudUpload();
271279
// It should now upload from the first client
272280
await upload1TriggeredPromise;
273281

274282
expect(spy1).toHaveBeenCalledOnce();
275-
276283
await stream1.dispose();
277284
});
278285
});

0 commit comments

Comments
 (0)