Skip to content

Commit 2480604

Browse files
committed
Port sync streams API
1 parent afca6f5 commit 2480604

File tree

9 files changed

+435
-65
lines changed

9 files changed

+435
-65
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ import {
99
UpdateNotification,
1010
isBatchedUpdateNotification
1111
} from '../db/DBAdapter.js';
12-
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
13-
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
12+
import { SyncStatus } from '../db/crud/SyncStatus.js';
1413
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1514
import { Schema } from '../db/schema/Schema.js';
1615
import { BaseObserver } from '../utils/BaseObserver.js';
1716
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1817
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
19-
import { ConnectionManager } from './ConnectionManager.js';
18+
import { ConnectionManager, CreateSyncImplementationOptions } from './ConnectionManager.js';
2019
import { CustomQuery } from './CustomQuery.js';
2120
import { ArrayQueryDefinition, Query } from './Query.js';
2221
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
@@ -40,6 +39,7 @@ import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';
4039
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
4140
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
4241
import { WatchedQueryComparator } from './watched/processors/comparators.js';
42+
import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js';
4343

4444
export interface DisconnectAndClearOptions {
4545
/** When set to false, data in local-only tables is preserved. */
@@ -237,9 +237,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
237237

238238
// Start async init
239239
this.connectionManager = new ConnectionManager({
240+
firstStatusMatching: (predicate) => this.waitForStatus(predicate),
241+
resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(),
242+
rustSubscriptionsCommand: async (payload) => {
243+
await this.writeTransaction((tx) => {
244+
return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]);
245+
});
246+
},
240247
createSyncImplementation: async (connector, options) => {
241248
await this.waitForReady();
242-
243249
return this.runExclusive(async () => {
244250
const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options));
245251
const onDispose = sync.registerListener({
@@ -304,7 +310,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
304310

305311
protected abstract generateSyncStreamImplementation(
306312
connector: PowerSyncBackendConnector,
307-
options: RequiredAdditionalConnectionOptions
313+
options: CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions
308314
): StreamingSyncImplementation;
309315

310316
protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;
@@ -338,13 +344,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
338344
? (status: SyncStatus) => status.hasSynced
339345
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;
340346

341-
if (statusMatches(this.currentStatus)) {
347+
return this.waitForStatus(statusMatches, signal);
348+
}
349+
350+
private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
351+
if (predicate(this.currentStatus)) {
342352
return;
343353
}
354+
344355
return new Promise((resolve) => {
345356
const dispose = this.registerListener({
346357
statusChanged: (status) => {
347-
if (statusMatches(status)) {
358+
if (predicate(status)) {
348359
dispose();
349360
resolve();
350361
}
@@ -373,7 +384,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
373384
await this.bucketStorageAdapter.init();
374385
await this._loadVersion();
375386
await this.updateSchema(this.options.schema);
376-
await this.updateHasSynced();
387+
await this.resolveOfflineSyncStatus();
377388
await this.database.execute('PRAGMA RECURSIVE_TRIGGERS=TRUE');
378389
this.ready = true;
379390
this.iterateListeners((cb) => cb.initialized?.());
@@ -403,30 +414,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
403414
}
404415
}
405416

406-
protected async updateHasSynced() {
407-
const result = await this.database.getAll<{ priority: number; last_synced_at: string }>(
408-
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC'
409-
);
410-
let lastCompleteSync: Date | undefined;
411-
const priorityStatusEntries: SyncPriorityStatus[] = [];
417+
protected async resolveOfflineSyncStatus() {
418+
const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r');
419+
const parsed = JSON.parse(result.r) as CoreSyncStatus;
412420

413-
for (const { priority, last_synced_at } of result) {
414-
const parsedDate = new Date(last_synced_at + 'Z');
415-
416-
if (priority == FULL_SYNC_PRIORITY) {
417-
// This lowest-possible priority represents a complete sync.
418-
lastCompleteSync = parsedDate;
419-
} else {
420-
priorityStatusEntries.push({ priority, hasSynced: true, lastSyncedAt: parsedDate });
421-
}
422-
}
423-
424-
const hasSynced = lastCompleteSync != null;
425421
const updatedStatus = new SyncStatus({
426422
...this.currentStatus.toJSON(),
427-
hasSynced,
428-
priorityStatusEntries,
429-
lastSyncedAt: lastCompleteSync
423+
...coreStatusToJs(parsed)
430424
});
431425

432426
if (!updatedStatus.isEqual(this.currentStatus)) {
@@ -471,7 +465,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
471465
}
472466

473467
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
474-
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
468+
protected resolvedConnectionOptions(
469+
options: CreateSyncImplementationOptions
470+
): CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions {
475471
return {
476472
...options,
477473
retryDelayMs:
@@ -540,6 +536,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
540536
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
541537
}
542538

539+
syncStream(name: string, params?: Record<string, any>) {}
540+
543541
/**
544542
* Close the database, releasing resources.
545543
*

packages/common/src/client/ConnectionManager.ts

Lines changed: 141 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,18 @@ import { ILogger } from 'js-logger';
22
import { BaseListener, BaseObserver } from '../utils/BaseObserver.js';
33
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
44
import {
5+
AdditionalConnectionOptions,
56
InternalConnectionOptions,
6-
StreamingSyncImplementation
7+
StreamingSyncImplementation,
8+
SubscribedStream
79
} from './sync/stream/AbstractStreamingSyncImplementation.js';
10+
import {
11+
SyncStream,
12+
SyncStreamDescription,
13+
SyncStreamSubscribeOptions,
14+
SyncStreamSubscription
15+
} from './sync/sync-streams.js';
16+
import { SyncStatus } from '../db/crud/SyncStatus.js';
817

918
/**
1019
* @internal
@@ -18,14 +27,26 @@ export interface ConnectionManagerSyncImplementationResult {
1827
onDispose: () => Promise<void> | void;
1928
}
2029

30+
/**
31+
* The subset of {@link AbstractStreamingSyncImplementationOptions} managed by the connection manager.
32+
*
33+
* @internal
34+
*/
35+
export interface CreateSyncImplementationOptions extends AdditionalConnectionOptions {
36+
subscriptions: SubscribedStream[];
37+
}
38+
2139
/**
2240
* @internal
2341
*/
2442
export interface ConnectionManagerOptions {
2543
createSyncImplementation(
2644
connector: PowerSyncBackendConnector,
27-
options: InternalConnectionOptions
45+
options: CreateSyncImplementationOptions
2846
): Promise<ConnectionManagerSyncImplementationResult>;
47+
firstStatusMatching(predicate: (status: SyncStatus) => any): Promise<void>;
48+
resolveOfflineSyncStatus(): Promise<void>;
49+
rustSubscriptionsCommand(payload: any): Promise<void>;
2950
logger: ILogger;
3051
}
3152

@@ -76,6 +97,13 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
7697
*/
7798
protected syncDisposer: (() => Promise<void> | void) | null;
7899

100+
/**
101+
* Subscriptions managed in this connection manager.
102+
*
103+
* On the web, these local subscriptions are merged across tabs by a shared worker.
104+
*/
105+
private locallyActiveSubscriptions = new Map<string, ActiveSubscription>();
106+
79107
constructor(protected options: ConnectionManagerOptions) {
80108
super();
81109
this.connectingPromise = null;
@@ -102,7 +130,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
102130
// Update pending options to the latest values
103131
this.pendingConnectionOptions = {
104132
connector,
105-
options: options ?? {}
133+
options
106134
};
107135

108136
// Disconnecting here provides aborting in progress connection attempts.
@@ -169,7 +197,11 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
169197
appliedOptions = options;
170198

171199
this.pendingConnectionOptions = null;
172-
const { sync, onDispose } = await this.options.createSyncImplementation(connector, options);
200+
201+
const { sync, onDispose } = await this.options.createSyncImplementation(connector, {
202+
subscriptions: this.activeStreams,
203+
...options
204+
});
173205
this.iterateListeners((l) => l.syncStreamCreated?.(sync));
174206
this.syncStreamImplementation = sync;
175207
this.syncDisposer = onDispose;
@@ -236,4 +268,109 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
236268
await sync?.dispose();
237269
await disposer?.();
238270
}
271+
272+
stream(name: string, parameters: Record<string, any> | null): SyncStream {
273+
const desc = { name, parameters } satisfies SyncStreamDescription;
274+
275+
const waitForFirstSync = () => {
276+
return this.options.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced ?? false);
277+
};
278+
279+
return {
280+
...desc,
281+
subscribe: async (options?: SyncStreamSubscribeOptions) => {
282+
// NOTE: We also run this command if a subscription already exists, because this increases the expiry date
283+
// (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl).
284+
await this.options.rustSubscriptionsCommand({
285+
subscribe: {
286+
stream: {
287+
name,
288+
params: parameters
289+
},
290+
ttl: options?.ttl,
291+
priority: options?.priority
292+
}
293+
});
294+
295+
if (!this.pendingConnectionOptions) {
296+
// We're not connected. So, update the offline sync status to reflect the new subscription.
297+
// (With an active iteration, the sync client would include it in its state).
298+
await this.options.resolveOfflineSyncStatus();
299+
}
300+
301+
const key = `${name}|${JSON.stringify(parameters)}`;
302+
let subscription = this.locallyActiveSubscriptions.get(key);
303+
if (subscription == null) {
304+
const clearSubscription = () => {
305+
this.locallyActiveSubscriptions.delete(key);
306+
this.subscriptionsMayHaveChanged();
307+
};
308+
309+
subscription = new ActiveSubscription(name, parameters, waitForFirstSync, clearSubscription);
310+
this.locallyActiveSubscriptions.set(key, subscription);
311+
this.subscriptionsMayHaveChanged();
312+
}
313+
314+
return new SyncStreamSubscriptionHandle(subscription);
315+
},
316+
unsubscribeAll: async () => {
317+
await this.options.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } });
318+
this.subscriptionsMayHaveChanged();
319+
}
320+
};
321+
}
322+
323+
private get activeStreams() {
324+
return [...this.locallyActiveSubscriptions.values().map((a) => ({ name: a.name, params: a.parameters }))];
325+
}
326+
327+
private subscriptionsMayHaveChanged() {
328+
if (this.syncStreamImplementation) {
329+
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
330+
}
331+
}
332+
}
333+
334+
class ActiveSubscription {
335+
refcount: number = 0;
336+
337+
constructor(
338+
readonly name: string,
339+
readonly parameters: Record<string, any> | null,
340+
readonly waitForFirstSync: () => Promise<void>,
341+
private clearSubscription: () => void
342+
) {}
343+
344+
decrementRefCount() {
345+
this.refcount--;
346+
if (this.refcount == 0) {
347+
this.clearSubscription();
348+
}
349+
}
239350
}
351+
352+
class SyncStreamSubscriptionHandle implements SyncStreamSubscription {
353+
constructor(readonly subscription: ActiveSubscription) {
354+
_finalizer?.register(this, subscription);
355+
}
356+
357+
get name() {
358+
return this.subscription.name;
359+
}
360+
361+
get parameters() {
362+
return this.subscription.parameters;
363+
}
364+
365+
waitForFirstSync(): Promise<void> {
366+
return this.subscription.waitForFirstSync();
367+
}
368+
369+
unsubscribe(): void {
370+
_finalizer?.unregister(this);
371+
this.subscription.decrementRefCount();
372+
}
373+
}
374+
375+
const _finalizer =
376+
FinalizationRegistry != null ? new FinalizationRegistry<ActiveSubscription>((sub) => sub.decrementRefCount()) : null;

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ export enum PowerSyncControlCommand {
6565
STOP = 'stop',
6666
START = 'start',
6767
NOTIFY_TOKEN_REFRESHED = 'refreshed_token',
68-
NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload'
68+
NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload',
69+
UPDATE_SUBSCRIPTIONS = 'update_subscriptions'
6970
}
7071

7172
export interface BucketStorageListener extends BaseListener {

0 commit comments

Comments
 (0)