From 92d2c379707cc78fbf7ee1802b3088ae9df3c814 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 3 Sep 2025 16:29:30 +0200 Subject: [PATCH 01/21] Port sync streams API --- .../src/client/AbstractPowerSyncDatabase.ts | 58 ++++--- .../common/src/client/ConnectionManager.ts | 145 +++++++++++++++++- .../sync/bucket/BucketStorageAdapter.ts | 3 +- .../AbstractStreamingSyncImplementation.ts | 67 ++++---- .../client/sync/stream/core-instruction.ts | 43 +++++- .../common/src/client/sync/sync-streams.ts | 107 +++++++++++++ packages/common/src/db/crud/SyncStatus.ts | 72 ++++++++- packages/common/src/index.ts | 1 + packages/node/src/db/PowerSyncDatabase.ts | 4 +- 9 files changed, 435 insertions(+), 65 deletions(-) create mode 100644 packages/common/src/client/sync/sync-streams.ts diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 43ad99ec4..03a4fc17e 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -9,14 +9,13 @@ import { UpdateNotification, isBatchedUpdateNotification } from '../db/DBAdapter.js'; -import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; -import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js'; +import { SyncStatus } from '../db/crud/SyncStatus.js'; import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js'; import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js'; -import { ConnectionManager } from './ConnectionManager.js'; +import { ConnectionManager, CreateSyncImplementationOptions } from './ConnectionManager.js'; import { CustomQuery } from './CustomQuery.js'; import { ArrayQueryDefinition, Query } from './Query.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; @@ -40,6 +39,7 @@ import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js'; import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js'; import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js'; import { WatchedQueryComparator } from './watched/processors/comparators.js'; +import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js'; export interface DisconnectAndClearOptions { /** When set to false, data in local-only tables is preserved. */ @@ -237,9 +237,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver this.waitForStatus(predicate), + resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(), + rustSubscriptionsCommand: async (payload) => { + await this.writeTransaction((tx) => { + return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]); + }); + }, createSyncImplementation: async (connector, options) => { await this.waitForReady(); - return this.runExclusive(async () => { const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options)); const onDispose = sync.registerListener({ @@ -304,7 +310,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver status.hasSynced : (status: SyncStatus) => status.statusForPriority(priority).hasSynced; - if (statusMatches(this.currentStatus)) { + return this.waitForStatus(statusMatches, signal); + } + + private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise { + if (predicate(this.currentStatus)) { return; } + return new Promise((resolve) => { const dispose = this.registerListener({ statusChanged: (status) => { - if (statusMatches(status)) { + if (predicate(status)) { dispose(); resolve(); } @@ -373,7 +384,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver cb.initialized?.()); @@ -403,30 +414,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver( - 'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC' - ); - let lastCompleteSync: Date | undefined; - const priorityStatusEntries: SyncPriorityStatus[] = []; + protected async resolveOfflineSyncStatus() { + const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r'); + const parsed = JSON.parse(result.r) as CoreSyncStatus; - for (const { priority, last_synced_at } of result) { - const parsedDate = new Date(last_synced_at + 'Z'); - - if (priority == FULL_SYNC_PRIORITY) { - // This lowest-possible priority represents a complete sync. - lastCompleteSync = parsedDate; - } else { - priorityStatusEntries.push({ priority, hasSynced: true, lastSyncedAt: parsedDate }); - } - } - - const hasSynced = lastCompleteSync != null; const updatedStatus = new SyncStatus({ ...this.currentStatus.toJSON(), - hasSynced, - priorityStatusEntries, - lastSyncedAt: lastCompleteSync + ...coreStatusToJs(parsed) }); if (!updatedStatus.isEqual(this.currentStatus)) { @@ -471,7 +465,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver l.statusChanged?.(this.currentStatus)); } + syncStream(name: string, params?: Record) {} + /** * Close the database, releasing resources. * diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index b322d4c33..6fa3a0f43 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -2,9 +2,18 @@ import { ILogger } from 'js-logger'; import { BaseListener, BaseObserver } from '../utils/BaseObserver.js'; import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js'; import { + AdditionalConnectionOptions, InternalConnectionOptions, - StreamingSyncImplementation + StreamingSyncImplementation, + SubscribedStream } from './sync/stream/AbstractStreamingSyncImplementation.js'; +import { + SyncStream, + SyncStreamDescription, + SyncStreamSubscribeOptions, + SyncStreamSubscription +} from './sync/sync-streams.js'; +import { SyncStatus } from '../db/crud/SyncStatus.js'; /** * @internal @@ -18,14 +27,26 @@ export interface ConnectionManagerSyncImplementationResult { onDispose: () => Promise | void; } +/** + * The subset of {@link AbstractStreamingSyncImplementationOptions} managed by the connection manager. + * + * @internal + */ +export interface CreateSyncImplementationOptions extends AdditionalConnectionOptions { + subscriptions: SubscribedStream[]; +} + /** * @internal */ export interface ConnectionManagerOptions { createSyncImplementation( connector: PowerSyncBackendConnector, - options: InternalConnectionOptions + options: CreateSyncImplementationOptions ): Promise; + firstStatusMatching(predicate: (status: SyncStatus) => any): Promise; + resolveOfflineSyncStatus(): Promise; + rustSubscriptionsCommand(payload: any): Promise; logger: ILogger; } @@ -76,6 +97,13 @@ export class ConnectionManager extends BaseObserver { */ protected syncDisposer: (() => Promise | void) | null; + /** + * Subscriptions managed in this connection manager. + * + * On the web, these local subscriptions are merged across tabs by a shared worker. + */ + private locallyActiveSubscriptions = new Map(); + constructor(protected options: ConnectionManagerOptions) { super(); this.connectingPromise = null; @@ -102,7 +130,7 @@ export class ConnectionManager extends BaseObserver { // Update pending options to the latest values this.pendingConnectionOptions = { connector, - options: options ?? {} + options }; // Disconnecting here provides aborting in progress connection attempts. @@ -169,7 +197,11 @@ export class ConnectionManager extends BaseObserver { appliedOptions = options; this.pendingConnectionOptions = null; - const { sync, onDispose } = await this.options.createSyncImplementation(connector, options); + + const { sync, onDispose } = await this.options.createSyncImplementation(connector, { + subscriptions: this.activeStreams, + ...options + }); this.iterateListeners((l) => l.syncStreamCreated?.(sync)); this.syncStreamImplementation = sync; this.syncDisposer = onDispose; @@ -236,4 +268,109 @@ export class ConnectionManager extends BaseObserver { await sync?.dispose(); await disposer?.(); } + + stream(name: string, parameters: Record | null): SyncStream { + const desc = { name, parameters } satisfies SyncStreamDescription; + + const waitForFirstSync = () => { + return this.options.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced ?? false); + }; + + return { + ...desc, + subscribe: async (options?: SyncStreamSubscribeOptions) => { + // NOTE: We also run this command if a subscription already exists, because this increases the expiry date + // (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl). + await this.options.rustSubscriptionsCommand({ + subscribe: { + stream: { + name, + params: parameters + }, + ttl: options?.ttl, + priority: options?.priority + } + }); + + if (!this.pendingConnectionOptions) { + // We're not connected. So, update the offline sync status to reflect the new subscription. + // (With an active iteration, the sync client would include it in its state). + await this.options.resolveOfflineSyncStatus(); + } + + const key = `${name}|${JSON.stringify(parameters)}`; + let subscription = this.locallyActiveSubscriptions.get(key); + if (subscription == null) { + const clearSubscription = () => { + this.locallyActiveSubscriptions.delete(key); + this.subscriptionsMayHaveChanged(); + }; + + subscription = new ActiveSubscription(name, parameters, waitForFirstSync, clearSubscription); + this.locallyActiveSubscriptions.set(key, subscription); + this.subscriptionsMayHaveChanged(); + } + + return new SyncStreamSubscriptionHandle(subscription); + }, + unsubscribeAll: async () => { + await this.options.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } }); + this.subscriptionsMayHaveChanged(); + } + }; + } + + private get activeStreams() { + return [...this.locallyActiveSubscriptions.values().map((a) => ({ name: a.name, params: a.parameters }))]; + } + + private subscriptionsMayHaveChanged() { + if (this.syncStreamImplementation) { + this.syncStreamImplementation.updateSubscriptions(this.activeStreams); + } + } +} + +class ActiveSubscription { + refcount: number = 0; + + constructor( + readonly name: string, + readonly parameters: Record | null, + readonly waitForFirstSync: () => Promise, + private clearSubscription: () => void + ) {} + + decrementRefCount() { + this.refcount--; + if (this.refcount == 0) { + this.clearSubscription(); + } + } } + +class SyncStreamSubscriptionHandle implements SyncStreamSubscription { + constructor(readonly subscription: ActiveSubscription) { + _finalizer?.register(this, subscription); + } + + get name() { + return this.subscription.name; + } + + get parameters() { + return this.subscription.parameters; + } + + waitForFirstSync(): Promise { + return this.subscription.waitForFirstSync(); + } + + unsubscribe(): void { + _finalizer?.unregister(this); + this.subscription.decrementRefCount(); + } +} + +const _finalizer = + FinalizationRegistry != null ? new FinalizationRegistry((sub) => sub.decrementRefCount()) : null; diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index 3c24c059c..e4f9b2495 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -65,7 +65,8 @@ export enum PowerSyncControlCommand { STOP = 'stop', START = 'start', NOTIFY_TOKEN_REFRESHED = 'refreshed_token', - NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload' + NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload', + UPDATE_SUBSCRIPTIONS = 'update_subscriptions' } export interface BucketStorageListener extends BaseListener { diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index ef282030b..c3a8f6ba5 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -17,7 +17,7 @@ import { import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js'; -import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js'; +import { coreStatusToJs, EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js'; import { BucketRequest, CrudUploadNotification, @@ -97,6 +97,7 @@ export interface LockOptions { export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions { adapter: BucketStorageAdapter; + subscriptions: SubscribedStream[]; uploadCrud: () => Promise; /** * An identifier for which PowerSync DB this sync implementation is @@ -155,6 +156,13 @@ export interface BaseConnectionOptions { */ params?: Record; + /** + * Whether to include streams that have `auto_subscribe: true` in their definition. + * + * This defaults to `true`. + */ + includeDefaultStreams?: boolean; + /** * The serialized schema - mainly used to forward information about raw tables to the sync client. */ @@ -200,6 +208,7 @@ export interface StreamingSyncImplementation waitForReady(): Promise; waitForStatus(status: SyncStatusOptions): Promise; waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise; + updateSubscriptions(subscriptions: SubscribedStream[]): void; } export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000; @@ -217,7 +226,13 @@ export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptio clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION, fetchStrategy: FetchStrategy.Buffered, params: {}, - serializedSchema: undefined + serializedSchema: undefined, + includeDefaultStreams: true +}; + +export type SubscribedStream = { + name: string; + params: Record | null; }; // The priority we assume when we receive checkpoint lines where no priority is set. @@ -239,9 +254,11 @@ export abstract class AbstractStreamingSyncImplementation protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; protected logger: ILogger; + private activeStreams: SubscribedStream[]; private isUploadingCrud: boolean = false; private notifyCompletedUploads?: () => void; + private handleActiveStreamsChange?: () => void; syncStatus: SyncStatus; triggerCrudUpload: () => void; @@ -249,6 +266,7 @@ export abstract class AbstractStreamingSyncImplementation constructor(options: AbstractStreamingSyncImplementationOptions) { super(); this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options }; + this.activeStreams = options.subscriptions; this.logger = options.logger ?? Logger.get('PowerSyncStream'); this.syncStatus = new SyncStatus({ @@ -1002,29 +1020,7 @@ The next upload iteration will be delayed.`); break; } } else if ('UpdateSyncStatus' in instruction) { - function coreStatusToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatus { - return { - priority: status.priority, - hasSynced: status.has_synced ?? undefined, - lastSyncedAt: status?.last_synced_at != null ? new Date(status!.last_synced_at! * 1000) : undefined - }; - } - - const info = instruction.UpdateSyncStatus.status; - const coreCompleteSync = info.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); - const completeSync = coreCompleteSync != null ? coreStatusToJs(coreCompleteSync) : null; - - syncImplementation.updateSyncStatus({ - connected: info.connected, - connecting: info.connecting, - dataFlow: { - downloading: info.downloading != null, - downloadProgress: info.downloading?.buckets - }, - lastSyncedAt: completeSync?.lastSyncedAt, - hasSynced: completeSync?.hasSynced, - priorityStatusEntries: info.priority_status.map(coreStatusToJs) - }); + syncImplementation.updateSyncStatus(coreStatusToJs(instruction.UpdateSyncStatus.status)); } else if ('EstablishSyncStream' in instruction) { if (receivingLines != null) { // Already connected, this shouldn't happen during a single iteration. @@ -1068,7 +1064,11 @@ The next upload iteration will be delayed.`); } try { - const options: any = { parameters: resolvedOptions.params }; + const options: any = { + parameters: resolvedOptions.params, + active_streams: this.activeStreams, + include_defaults: resolvedOptions.includeDefaultStreams + }; if (resolvedOptions.serializedSchema) { options.schema = resolvedOptions.serializedSchema; } @@ -1080,9 +1080,17 @@ The next upload iteration will be delayed.`); controlInvocations.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); } }; + this.handleActiveStreamsChange = () => { + if (controlInvocations && !controlInvocations?.closed) { + controlInvocations.enqueueData({ + command: PowerSyncControlCommand.UPDATE_SUBSCRIPTIONS, + payload: JSON.stringify({ active_streams: this.activeStreams }) + }); + } + }; await receivingLines; } finally { - this.notifyCompletedUploads = undefined; + this.notifyCompletedUploads = this.handleActiveStreamsChange = undefined; await stop(); } } @@ -1209,6 +1217,11 @@ The next upload iteration will be delayed.`); timeoutId = setTimeout(endDelay, retryDelayMs); }); } + + updateSubscriptions(subscriptions: SubscribedStream[]): void { + this.activeStreams = subscriptions; + this.handleActiveStreamsChange?.(); + } } interface EnqueuedCommand { diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts index db1f56e81..2da92ac5b 100644 --- a/packages/common/src/client/sync/stream/core-instruction.ts +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -1,4 +1,6 @@ import { StreamingSyncRequest } from './streaming-sync-types.js'; +import * as sync_status from '../../../db/crud/SyncStatus.js'; +import { FULL_SYNC_PRIORITY } from '../../../db/crud/SyncProgress.js'; /** * An internal instruction emitted by the sync client in the core extension in response to the JS @@ -9,7 +11,7 @@ export type Instruction = | { UpdateSyncStatus: UpdateSyncStatus } | { EstablishSyncStream: EstablishSyncStream } | { FetchCredentials: FetchCredentials } - | { CloseSyncStream: any } + | { CloseSyncStream: { hide_disconnect: boolean } } | { FlushFileSystem: any } | { DidCompleteSync: any }; @@ -31,6 +33,20 @@ export interface CoreSyncStatus { connecting: boolean; priority_status: SyncPriorityStatus[]; downloading: DownloadProgress | null; + streams: CoreStreamSubscription[]; +} + +/// An `ActiveStreamSubscription` from the core extension + serialized progress information. +export interface CoreStreamSubscription { + progress: { total: number; downloaded: number }; + name: string; + parameters: any; + priority: number | null; + active: boolean; + is_default: boolean; + has_explicit_subscription: boolean; + expires_at: number | null; + last_synced_at: number | null; } export interface SyncPriorityStatus { @@ -53,3 +69,28 @@ export interface BucketProgress { export interface FetchCredentials { did_expire: boolean; } + +function priorityToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatus { + return { + priority: status.priority, + hasSynced: status.has_synced ?? undefined, + lastSyncedAt: status?.last_synced_at != null ? new Date(status!.last_synced_at! * 1000) : undefined + }; +} + +export function coreStatusToJs(status: CoreSyncStatus): sync_status.SyncStatusOptions { + const coreCompleteSync = status.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); + const completeSync = coreCompleteSync != null ? priorityToJs(coreCompleteSync) : null; + + return { + connected: status.connected, + connecting: status.connecting, + dataFlow: { + downloading: status.downloading != null, + downloadProgress: status.downloading?.buckets + }, + lastSyncedAt: completeSync?.lastSyncedAt, + hasSynced: completeSync?.hasSynced, + priorityStatusEntries: status.priority_status.map(priorityToJs) + }; +} diff --git a/packages/common/src/client/sync/sync-streams.ts b/packages/common/src/client/sync/sync-streams.ts new file mode 100644 index 000000000..ad413b41e --- /dev/null +++ b/packages/common/src/client/sync/sync-streams.ts @@ -0,0 +1,107 @@ +import { AbstractPowerSyncDatabase } from '../AbstractPowerSyncDatabase.js'; + +/** + * A description of a sync stream, consisting of its {@link name} and the {@link parameters} used when subscribing. + */ +export interface SyncStreamDescription { + /** + * The name of the stream as it appears in the stream definition for the PowerSync service. + */ + name: string; + + /** + * The parameters used to subscribe to the stream, if any. + * + * The same stream can be subscribed to multiple times with different parameters. + */ + parameters: Record | null; +} + +/** + * Information about a subscribed sync stream. + * + * This includes the {@link SyncStreamDescription}, along with information about the current sync status. + */ +export interface SyncSubscriptionDescription extends SyncStreamDescription { + active: boolean; + /** + * Whether this stream subscription is included by default, regardless of whether the stream has explicitly been + * subscribed to or not. + * + * It's possible for both {@link isDefault} and {@link hasExplicitSubscription} to be true at the same time - this + * happens when a default stream was subscribed explicitly. + */ + isDefault: boolean; + /** + * Whether this stream has been subscribed to explicitly. + * + * It's possible for both {@link isDefault} and {@link hasExplicitSubscription} to be true at the same time - this + * happens when a default stream was subscribed explicitly. + */ + hasExplicitSubscription: boolean; + /** + * For sync streams that have a time-to-live, the current time at which the stream would expire if not subscribed to + * again. + */ + expiresAt: Date | null; + /** + * Whether this stream subscription has been synced at least once. + */ + hasSynced: boolean; + /** + * If {@link hasSynced} is true, the last time data from this stream has been synced. + */ + lastSyncedAt: Date | null; +} + +export interface SyncStreamSubscribeOptions { + /** + * A "time to live" for this stream subscription, in seconds. + * + * The TTL control when a stream gets evicted after not having an active {@link SyncStreamSubscription} object + * attached to it. + */ + ttl?: number; + /** + * A priority to assign to this subscription. This overrides the default priority that may have been set on streams. + * + * For details on priorities, see [priotized sync](https://docs.powersync.com/usage/use-case-examples/prioritized-sync). + */ + priority?: 0 | 1 | 2 | 3; +} + +/** + * A handle to a {@link SyncStreamDescription} that allows subscribing to the stream. + * + * To obtain an instance of {@link SyncStream}, call {@link AbstractPowerSyncDatabase.syncStream}. + */ +export interface SyncStream extends SyncStreamDescription { + /** + * Adds a subscription to this stream, requesting it to be included when connecting to the sync service. + * + * You should keep a reference to the returned {@link SyncStreamSubscription} object along as you need data for that + * stream. As soon as {@link SyncStreamSubscription.unsubscribe} is called for all subscriptions on this stream + * (including subscriptions created on other tabs), the {@link SyncStreamSubscribeOptions.ttl} starts ticking and will + * eventually evict the stream (unless {@link subscribe} is called again). + */ + subscribe(options?: SyncStreamSubscribeOptions): Promise; + + /** + * Clears all subscriptions attached to this stream and resets the TTL for the stream. + * + * This is a potentially dangerous operations, as it interferes with other stream subscriptions. + */ + unsubscribeAll(): Promise; +} + +export interface SyncStreamSubscription extends SyncStreamDescription { + /** + * A promise that resolves once data from in this sync stream has been synced and applied. + */ + waitForFirstSync(): Promise; + + /** + * Removes this stream subscription. + */ + unsubscribe(): void; +} diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index 4c687e18d..dd6fcf505 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -1,5 +1,7 @@ +import { CoreStreamSubscription } from '../../client/sync/stream/core-instruction.js'; import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js'; -import { InternalProgressInformation, SyncProgress } from './SyncProgress.js'; +import { InternalProgressInformation, ProgressWithOperations, SyncProgress } from './SyncProgress.js'; +import { SyncStreamDescription, SyncSubscriptionDescription } from 'src/client/sync/sync-streams.js'; export type SyncDataFlowStatus = Partial<{ downloading: boolean; @@ -21,6 +23,7 @@ export type SyncDataFlowStatus = Partial<{ * Please use the {@link SyncStatus#downloadProgress} property to track sync progress. */ downloadProgress: InternalProgressInformation | null; + internalStreamSubscriptions: CoreStreamSubscription[] | null; }>; export interface SyncPriorityStatus { @@ -114,6 +117,28 @@ export class SyncStatus { ); } + /** + * All sync streams currently being tracked in teh database. + * + * This returns null when the database is currently being opened and we don't have reliable information about all + * included streams yet. + */ + get subscriptions(): SyncStreamStatus[] | undefined { + return this.options.dataFlow?.internalStreamSubscriptions?.map((core) => new SyncStreamStatusView(this, core)); + } + + /** + * If the `stream` appears in {@link subscriptions}, returns the current status for that stream. + */ + statusFor(stream: SyncStreamDescription): SyncStreamStatus | undefined { + const asJson = JSON.stringify(stream.parameters); + const raw = this.options.dataFlow?.internalStreamSubscriptions?.find( + (r) => r.name == stream.name && asJson == JSON.stringify(r.parameters) + ); + + return raw && new SyncStreamStatusView(this, raw); + } + /** * Provides sync status information for all bucket priorities, sorted by priority (highest first). * @@ -232,3 +257,48 @@ export class SyncStatus { return b.priority - a.priority; // Reverse because higher priorities have lower numbers } } + +/** + * Information about a sync stream subscription. + */ +export interface SyncStreamStatus { + progress: ProgressWithOperations | null; + subscription: SyncSubscriptionDescription; + priority: number | null; +} + +class SyncStreamStatusView implements SyncStreamStatus { + subscription: SyncSubscriptionDescription; + + constructor( + private status: SyncStatus, + private core: CoreStreamSubscription + ) { + this.subscription = { + name: core.name, + parameters: core.parameters, + active: core.active, + isDefault: core.is_default, + hasExplicitSubscription: core.has_explicit_subscription, + expiresAt: core.expires_at != null ? new Date(core.expires_at) : null, + hasSynced: core.last_synced_at != null, + lastSyncedAt: core.last_synced_at != null ? new Date(core.last_synced_at) : null + }; + } + + get progress() { + if (this.status.dataFlowStatus.downloadProgress == null) { + // Don't make download progress public if we're not currently downloading. + return null; + } + + const { total, downloaded } = this.core.progress; + const progress = total == 0 ? 0.0 : downloaded / total; + + return { totalOperations: total, downloadedOperations: downloaded, downloadedFraction: progress }; + } + + get priority() { + return this.core.priority; + } +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 2e886155d..7173a2ece 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -18,6 +18,7 @@ export * from './client/sync/bucket/SyncDataBucket.js'; export * from './client/sync/stream/AbstractRemote.js'; export * from './client/sync/stream/AbstractStreamingSyncImplementation.js'; export * from './client/sync/stream/streaming-sync-types.js'; +export * from './client/sync/sync-streams.js'; export * from './client/ConnectionManager.js'; export { ProgressWithOperations, SyncProgress } from './db/crud/SyncProgress.js'; diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index 4ac7969b6..7f10db214 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -4,6 +4,7 @@ import { AbstractStreamingSyncImplementation, AdditionalConnectionOptions, BucketStorageAdapter, + CreateSyncImplementationOptions, DBAdapter, PowerSyncBackendConnector, PowerSyncConnectionOptions, @@ -76,7 +77,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { protected generateSyncStreamImplementation( connector: PowerSyncBackendConnector, - options: NodeAdditionalConnectionOptions + options: CreateSyncImplementationOptions & NodeAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { const logger = this.logger; const remote = new NodeRemote(connector, logger, { @@ -85,6 +86,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { }); return new NodeStreamingSyncImplementation({ + subscriptions: options.subscriptions, adapter: this.bucketStorageAdapter, remote, uploadCrud: async () => { From 9f03c3b6cebc6d721cae0e5086ed3caecabfe4cb Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 3 Sep 2025 18:09:00 +0200 Subject: [PATCH 02/21] Add tests --- .../src/client/AbstractPowerSyncDatabase.ts | 5 +- .../common/src/client/ConnectionManager.ts | 2 +- .../sync/bucket/BucketStorageAdapter.ts | 2 + .../AbstractStreamingSyncImplementation.ts | 45 ++++-- .../client/sync/stream/core-instruction.ts | 3 +- packages/common/src/db/crud/SyncStatus.ts | 2 +- packages/node/tests/stream.test.ts | 153 ++++++++++++++++++ packages/node/tests/sync.test.ts | 11 +- packages/node/tests/utils.ts | 47 +++++- 9 files changed, 242 insertions(+), 28 deletions(-) create mode 100644 packages/node/tests/stream.test.ts diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 03a4fc17e..52909187f 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -40,6 +40,7 @@ import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/Watch import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js'; import { WatchedQueryComparator } from './watched/processors/comparators.js'; import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js'; +import { SyncStream } from './sync/sync-streams.js'; export interface DisconnectAndClearOptions { /** When set to false, data in local-only tables is preserved. */ @@ -536,7 +537,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver l.statusChanged?.(this.currentStatus)); } - syncStream(name: string, params?: Record) {} + syncStream(name: string, params?: Record): SyncStream { + return this.connectionManager.stream(name, params ?? null); + } /** * Close the database, releasing resources. diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 6fa3a0f43..6a76cbbf0 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -292,7 +292,7 @@ export class ConnectionManager extends BaseObserver { } }); - if (!this.pendingConnectionOptions) { + if (!this.syncStreamImplementation) { // We're not connected. So, update the offline sync status to reflect the new subscription. // (With an active iteration, the sync client would include it in its state). await this.options.resolveOfflineSyncStatus(); diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index e4f9b2495..3ecd8848c 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -12,6 +12,7 @@ export interface Checkpoint { last_op_id: OpId; buckets: BucketChecksum[]; write_checkpoint?: string; + streams?: any[]; } export interface BucketState { @@ -49,6 +50,7 @@ export interface BucketChecksum { * Count of operations - informational only. */ count?: number; + subscriptions?: any; } export enum PSInternalTable { diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index c3a8f6ba5..cf0e74252 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -548,11 +548,13 @@ The next upload iteration will be delayed.`); while (true) { this.updateSyncStatus({ connecting: true }); let shouldDelayRetry = true; + let result: RustIterationResult | null = null; + try { if (signal?.aborted) { break; } - await this.streamingSyncIteration(nestedAbortController.signal, options); + result = await this.streamingSyncIteration(nestedAbortController.signal, options); // Continue immediately, streamingSyncIteration will wait before completing if necessary. } catch (ex) { /** @@ -586,14 +588,16 @@ The next upload iteration will be delayed.`); nestedAbortController = new AbortController(); } - this.updateSyncStatus({ - connected: false, - connecting: true // May be unnecessary - }); + if (result?.immediateRestart != true) { + this.updateSyncStatus({ + connected: false, + connecting: true // May be unnecessary + }); - // On error, wait a little before retrying - if (shouldDelayRetry) { - await this.delayRetry(nestedAbortController.signal); + // On error, wait a little before retrying + if (shouldDelayRetry) { + await this.delayRetry(nestedAbortController.signal); + } } } } @@ -641,8 +645,11 @@ The next upload iteration will be delayed.`); } } - protected async streamingSyncIteration(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise { - await this.obtainLock({ + protected streamingSyncIteration( + signal: AbortSignal, + options?: PowerSyncConnectionOptions + ): Promise { + return this.obtainLock({ type: LockType.SYNC, signal, callback: async () => { @@ -655,9 +662,10 @@ The next upload iteration will be delayed.`); if (clientImplementation == SyncClientImplementation.JAVASCRIPT) { await this.legacyStreamingSyncIteration(signal, resolvedOptions); + return null; } else { await this.requireKeyFormat(true); - await this.rustSyncIteration(signal, resolvedOptions); + return await this.rustSyncIteration(signal, resolvedOptions); } } }); @@ -912,12 +920,16 @@ The next upload iteration will be delayed.`); return; } - private async rustSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) { + private async rustSyncIteration( + signal: AbortSignal, + resolvedOptions: RequiredPowerSyncConnectionOptions + ): Promise { const syncImplementation = this; const adapter = this.options.adapter; const remote = this.options.remote; let receivingLines: Promise | null = null; let hadSyncLine = false; + let hideDisconnectOnRestart = false; if (signal.aborted) { throw new AbortOperation('Connection request has been aborted'); @@ -1046,6 +1058,7 @@ The next upload iteration will be delayed.`); } } else if ('CloseSyncStream' in instruction) { abortController.abort(); + hideDisconnectOnRestart = instruction.CloseSyncStream.hide_disconnect; } else if ('FlushFileSystem' in instruction) { // Not necessary on JS platforms. } else if ('DidCompleteSync' in instruction) { @@ -1084,7 +1097,7 @@ The next upload iteration will be delayed.`); if (controlInvocations && !controlInvocations?.closed) { controlInvocations.enqueueData({ command: PowerSyncControlCommand.UPDATE_SUBSCRIPTIONS, - payload: JSON.stringify({ active_streams: this.activeStreams }) + payload: JSON.stringify(this.activeStreams) }); } }; @@ -1093,6 +1106,8 @@ The next upload iteration will be delayed.`); this.notifyCompletedUploads = this.handleActiveStreamsChange = undefined; await stop(); } + + return { immediateRestart: hideDisconnectOnRestart }; } private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) { @@ -1228,3 +1243,7 @@ interface EnqueuedCommand { command: PowerSyncControlCommand; payload?: Uint8Array | string; } + +interface RustIterationResult { + immediateRestart: boolean; +} diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts index 2da92ac5b..335b8da11 100644 --- a/packages/common/src/client/sync/stream/core-instruction.ts +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -87,7 +87,8 @@ export function coreStatusToJs(status: CoreSyncStatus): sync_status.SyncStatusOp connecting: status.connecting, dataFlow: { downloading: status.downloading != null, - downloadProgress: status.downloading?.buckets + downloadProgress: status.downloading?.buckets, + internalStreamSubscriptions: status.streams }, lastSyncedAt: completeSync?.lastSyncedAt, hasSynced: completeSync?.hasSynced, diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index dd6fcf505..613816909 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -1,7 +1,7 @@ import { CoreStreamSubscription } from '../../client/sync/stream/core-instruction.js'; import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js'; import { InternalProgressInformation, ProgressWithOperations, SyncProgress } from './SyncProgress.js'; -import { SyncStreamDescription, SyncSubscriptionDescription } from 'src/client/sync/sync-streams.js'; +import { SyncStreamDescription, SyncSubscriptionDescription } from '../../client/sync/sync-streams.js'; export type SyncDataFlowStatus = Partial<{ downloading: boolean; diff --git a/packages/node/tests/stream.test.ts b/packages/node/tests/stream.test.ts new file mode 100644 index 000000000..268b7251e --- /dev/null +++ b/packages/node/tests/stream.test.ts @@ -0,0 +1,153 @@ +import { describe, vi, expect, beforeEach } from 'vitest'; +import { SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common'; +import Logger from 'js-logger'; +import { bucket, checkpoint, mockSyncServiceTest, nextStatus, stream, TestConnector } from './utils'; + +Logger.useDefaults({ defaultLevel: Logger.WARN }); + +describe('Sync streams', () => { + const defaultOptions = { + clientImplementation: SyncClientImplementation.RUST, + connectionMethod: SyncStreamConnectionMethod.HTTP + }; + + mockSyncServiceTest('can disable default streams', async ({ syncService }) => { + const database = await syncService.createDatabase(); + database.connect(new TestConnector(), { + includeDefaultStreams: false, + ...defaultOptions + }); + + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: false, + subscriptions: [] + } + }); + }); + + mockSyncServiceTest('subscribes with streams', async ({ syncService }) => { + const database = await syncService.createDatabase(); + const a = await database.syncStream('stream', { foo: 'a' }).subscribe(); + const b = await database.syncStream('stream', { foo: 'b' }).subscribe({ priority: 1 }); + + database.connect(new TestConnector(), defaultOptions); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [ + { + stream: 'stream', + parameters: { foo: 'a' }, + override_priority: null + }, + { + stream: 'stream', + parameters: { foo: 'b' }, + override_priority: 1 + } + ] + } + }); + + let statusPromise = nextStatus(database); + syncService.pushLine( + checkpoint({ + last_op_id: 0, + buckets: [ + bucket('a', 0, { priority: 3, subscriptions: [{ sub: 0 }] }), + bucket('b', 0, { priority: 1, subscriptions: [{ sub: 1 }] }) + ], + streams: [stream('stream', false)] + }) + ); + let status = await statusPromise; + for (const subscription of [a, b]) { + expect(status.statusFor(subscription).subscription.active).toBeTruthy(); + expect(status.statusFor(subscription).subscription.lastSyncedAt).toBeNull(); + expect(status.statusFor(subscription).subscription.hasExplicitSubscription).toBeTruthy(); + } + + statusPromise = nextStatus(database); + syncService.pushLine({ partial_checkpoint_complete: { last_op_id: '0', priority: 1 } }); + status = await statusPromise; + expect(status.statusFor(a).subscription.lastSyncedAt).toBeNull(); + expect(status.statusFor(b).subscription.lastSyncedAt).not.toBeNull(); + await b.waitForFirstSync(); + + syncService.pushLine({ checkpoint_complete: { last_op_id: '0' } }); + await a.waitForFirstSync(); + }); + + mockSyncServiceTest('reports default streams', async ({ syncService }) => { + const database = await syncService.createDatabase(); + database.connect(new TestConnector(), defaultOptions); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + let statusPromise = nextStatus(database); + syncService.pushLine( + checkpoint({ + last_op_id: 0, + buckets: [], + streams: [stream('default_stream', true)] + }) + ); + let status = await statusPromise; + + expect(status.subscriptions).toHaveLength(1); + expect(status.subscriptions[0]).toMatchObject({ + subscription: { + name: 'default_stream', + parameters: null, + isDefault: true, + hasExplicitSubscription: false + } + }); + }); + + mockSyncServiceTest('changes subscriptions dynamically', async ({ syncService }) => { + const database = await syncService.createDatabase(); + database.connect(new TestConnector(), defaultOptions); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine( + checkpoint({ + last_op_id: 0, + buckets: [] + }) + ); + const subscription = await database.syncStream('a').subscribe(); + + await vi.waitFor(() => + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [ + { + stream: 'a', + parameters: null, + override_priority: null + } + ] + } + }) + ); + + // Given that the subscription has a TTL, dropping the handle should not re-subscribe. + subscription.unsubscribe(); + await new Promise((r) => setTimeout(r, 100)); + expect(syncService.connectedListeners[0].streams.subscriptions).toHaveLength(1); + }); + + mockSyncServiceTest('subscriptions update while offline', async ({ syncService }) => { + const database = await syncService.createDatabase(); + + let statusPromise = nextStatus(database); + const subscription = await database.syncStream('foo').subscribe(); + let status = await statusPromise; + expect(status.statusFor(subscription)).not.toBeNull(); + }); +}); diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index c63163a6f..6c47eebfa 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -1,7 +1,7 @@ import { describe, vi, expect, beforeEach } from 'vitest'; import util from 'node:util'; -import { MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils'; +import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils'; import { AbstractPowerSyncDatabase, BucketChecksum, @@ -925,15 +925,6 @@ function defineSyncTests(impl: SyncClientImplementation) { }); } -function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum { - return { - bucket: name, - count, - checksum: 0, - priority: options.priority - }; -} - async function waitForProgress( database: AbstractPowerSyncDatabase, total: [number, number], diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index ab10c9fa2..95a057874 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -6,12 +6,15 @@ import { ReadableStream, TransformStream } from 'node:stream/web'; import { onTestFinished, test } from 'vitest'; import { AbstractPowerSyncDatabase, + BucketChecksum, column, NodePowerSyncDatabaseOptions, PowerSyncBackendConnector, PowerSyncCredentials, PowerSyncDatabase, Schema, + StreamingSyncCheckpoint, + StreamingSyncCheckpointComplete, StreamingSyncLine, SyncStatus, Table @@ -50,7 +53,7 @@ export const tempDirectoryTest = test.extend<{ tmpdir: string }>({ } }); -async function createDatabase( +export async function createDatabase( tmpdir: string, options: Partial = {} ): Promise { @@ -196,3 +199,45 @@ export function waitForSyncStatus( }); }); } + +export function checkpoint(options: { last_op_id: number; buckets?: any[]; streams?: any[] }): StreamingSyncCheckpoint { + return { + checkpoint: { + last_op_id: `${options.last_op_id}`, + buckets: options.buckets ?? [], + write_checkpoint: null, + streams: options.streams ?? [] + } + }; +} + +export function bucket( + name: string, + count: number, + options: { priority: number; subscriptions?: any } = { priority: 3 } +): BucketChecksum { + return { + bucket: name, + count, + checksum: 0, + priority: options.priority, + subscriptions: options.subscriptions + }; +} + +export function stream(name: string, isDefault: boolean, errors = []) { + return { name, is_default: isDefault, errors }; +} + +export function nextStatus(db: PowerSyncDatabase): Promise { + return new Promise((resolve) => { + let l; + + l = db.registerListener({ + statusChanged(status) { + resolve(status); + l(); + } + }); + }); +} From 995c7e532427cec75e8fbb2f2e50fe6855add0f1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 4 Sep 2025 12:22:51 +0200 Subject: [PATCH 03/21] Fix more tests --- .../src/client/AbstractPowerSyncDatabase.ts | 17 ++-- .../common/src/client/ConnectionManager.ts | 28 ++++--- .../AbstractStreamingSyncImplementation.ts | 8 +- .../common/src/client/sync/sync-streams.ts | 2 +- packages/node/src/db/PowerSyncDatabase.ts | 8 +- packages/web/src/db/PowerSyncDatabase.ts | 3 +- .../sync/SSRWebStreamingSyncImplementation.ts | 6 ++ .../SharedWebStreamingSyncImplementation.ts | 41 +++++++--- .../worker/sync/SharedSyncImplementation.ts | 40 ++++++++-- .../sync/SharedSyncImplementation.worker.ts | 28 +------ packages/web/src/worker/sync/WorkerClient.ts | 80 +++++++++++++++++++ packages/web/tests/multiple_instances.test.ts | 13 ++- .../web/tests/utils/MockStreamOpenFactory.ts | 5 +- 13 files changed, 203 insertions(+), 76 deletions(-) create mode 100644 packages/web/src/worker/sync/WorkerClient.ts diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 52909187f..f916e5b5a 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -15,7 +15,11 @@ import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js'; -import { ConnectionManager, CreateSyncImplementationOptions } from './ConnectionManager.js'; +import { + ConnectionManager, + CreateSyncImplementationOptions, + InternalSubscriptionAdapter +} from './ConnectionManager.js'; import { CustomQuery } from './CustomQuery.js'; import { ArrayQueryDefinition, Query } from './Query.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; @@ -183,6 +187,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver; protected connectionManager: ConnectionManager; + private subscriptions: InternalSubscriptionAdapter; get syncStreamImplementation() { return this.connectionManager.syncStreamImplementation; @@ -237,14 +242,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver this.waitForStatus(predicate), + this.subscriptions = { + firstStatusMatching: (predicate, abort) => this.waitForStatus(predicate, abort), resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(), rustSubscriptionsCommand: async (payload) => { await this.writeTransaction((tx) => { return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]); }); - }, + } + }; + this.connectionManager = new ConnectionManager({ createSyncImplementation: async (connector, options) => { await this.waitForReady(); return this.runExclusive(async () => { @@ -538,7 +545,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver): SyncStream { - return this.connectionManager.stream(name, params ?? null); + return this.connectionManager.stream(this.subscriptions, name, params ?? null); } /** diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 6a76cbbf0..205f93f3b 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -36,6 +36,12 @@ export interface CreateSyncImplementationOptions extends AdditionalConnectionOpt subscriptions: SubscribedStream[]; } +export interface InternalSubscriptionAdapter { + firstStatusMatching(predicate: (status: SyncStatus) => any, abort?: AbortSignal): Promise; + resolveOfflineSyncStatus(): Promise; + rustSubscriptionsCommand(payload: any): Promise; +} + /** * @internal */ @@ -44,9 +50,7 @@ export interface ConnectionManagerOptions { connector: PowerSyncBackendConnector, options: CreateSyncImplementationOptions ): Promise; - firstStatusMatching(predicate: (status: SyncStatus) => any): Promise; - resolveOfflineSyncStatus(): Promise; - rustSubscriptionsCommand(payload: any): Promise; + logger: ILogger; } @@ -269,11 +273,11 @@ export class ConnectionManager extends BaseObserver { await disposer?.(); } - stream(name: string, parameters: Record | null): SyncStream { + stream(adapter: InternalSubscriptionAdapter, name: string, parameters: Record | null): SyncStream { const desc = { name, parameters } satisfies SyncStreamDescription; - const waitForFirstSync = () => { - return this.options.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced ?? false); + const waitForFirstSync = (abort?: AbortSignal) => { + return adapter.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced, abort); }; return { @@ -281,7 +285,7 @@ export class ConnectionManager extends BaseObserver { subscribe: async (options?: SyncStreamSubscribeOptions) => { // NOTE: We also run this command if a subscription already exists, because this increases the expiry date // (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl). - await this.options.rustSubscriptionsCommand({ + await adapter.rustSubscriptionsCommand({ subscribe: { stream: { name, @@ -295,7 +299,7 @@ export class ConnectionManager extends BaseObserver { if (!this.syncStreamImplementation) { // We're not connected. So, update the offline sync status to reflect the new subscription. // (With an active iteration, the sync client would include it in its state). - await this.options.resolveOfflineSyncStatus(); + await adapter.resolveOfflineSyncStatus(); } const key = `${name}|${JSON.stringify(parameters)}`; @@ -314,7 +318,7 @@ export class ConnectionManager extends BaseObserver { return new SyncStreamSubscriptionHandle(subscription); }, unsubscribeAll: async () => { - await this.options.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } }); + await adapter.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } }); this.subscriptionsMayHaveChanged(); } }; @@ -337,7 +341,7 @@ class ActiveSubscription { constructor( readonly name: string, readonly parameters: Record | null, - readonly waitForFirstSync: () => Promise, + readonly waitForFirstSync: (abort?: AbortSignal) => Promise, private clearSubscription: () => void ) {} @@ -362,8 +366,8 @@ class SyncStreamSubscriptionHandle implements SyncStreamSubscription { return this.subscription.parameters; } - waitForFirstSync(): Promise { - return this.subscription.waitForFirstSync(); + waitForFirstSync(abort?: AbortSignal): Promise { + return this.subscription.waitForFirstSync(abort); } unsubscribe(): void { diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index cf0e74252..852a808f5 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -95,7 +95,7 @@ export interface LockOptions { signal?: AbortSignal; } -export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions { +export interface AbstractStreamingSyncImplementationOptions extends RequiredAdditionalConnectionOptions { adapter: BucketStorageAdapter; subscriptions: SubscribedStream[]; uploadCrud: () => Promise; @@ -185,7 +185,9 @@ export interface AdditionalConnectionOptions { } /** @internal */ -export type RequiredAdditionalConnectionOptions = Required; +export interface RequiredAdditionalConnectionOptions extends Required { + subscriptions: SubscribedStream[]; +} export interface StreamingSyncImplementation extends BaseObserverInterface, @@ -265,7 +267,7 @@ export abstract class AbstractStreamingSyncImplementation constructor(options: AbstractStreamingSyncImplementationOptions) { super(); - this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options }; + this.options = options; this.activeStreams = options.subscriptions; this.logger = options.logger ?? Logger.get('PowerSyncStream'); diff --git a/packages/common/src/client/sync/sync-streams.ts b/packages/common/src/client/sync/sync-streams.ts index ad413b41e..29ba20226 100644 --- a/packages/common/src/client/sync/sync-streams.ts +++ b/packages/common/src/client/sync/sync-streams.ts @@ -98,7 +98,7 @@ export interface SyncStreamSubscription extends SyncStreamDescription { /** * A promise that resolves once data from in this sync stream has been synced and applied. */ - waitForFirstSync(): Promise; + waitForFirstSync(abort?: AbortSignal): Promise; /** * Removes this stream subscription. diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index 7f10db214..e2e2d06bf 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -4,12 +4,12 @@ import { AbstractStreamingSyncImplementation, AdditionalConnectionOptions, BucketStorageAdapter, - CreateSyncImplementationOptions, DBAdapter, PowerSyncBackendConnector, PowerSyncConnectionOptions, PowerSyncDatabaseOptions, PowerSyncDatabaseOptionsWithSettings, + RequiredAdditionalConnectionOptions, SqliteBucketStorage, SQLOpenFactory } from '@powersync/common'; @@ -77,7 +77,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { protected generateSyncStreamImplementation( connector: PowerSyncBackendConnector, - options: CreateSyncImplementationOptions & NodeAdditionalConnectionOptions + options: RequiredAdditionalConnectionOptions & NodeAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { const logger = this.logger; const remote = new NodeRemote(connector, logger, { @@ -86,15 +86,13 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { }); return new NodeStreamingSyncImplementation({ - subscriptions: options.subscriptions, adapter: this.bucketStorageAdapter, remote, uploadCrud: async () => { await this.waitForReady(); await connector.uploadData(this); }, - retryDelayMs: this.options.retryDelayMs, - crudUploadThrottleMs: this.options.crudUploadThrottleMs, + ...options, identifier: this.database.name, logger }); diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index f55346808..0e60de018 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -189,8 +189,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { const remote = new WebRemote(connector, this.logger); const syncOptions: WebStreamingSyncImplementationOptions = { ...(this.options as {}), - retryDelayMs: options.retryDelayMs, - crudUploadThrottleMs: options.crudUploadThrottleMs, + ...options, flags: this.resolvedFlags, adapter: this.bucketStorageAdapter, remote, diff --git a/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts index c09518f46..463a24308 100644 --- a/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts @@ -5,6 +5,7 @@ import { LockType, PowerSyncConnectionOptions, StreamingSyncImplementation, + SubscribedStream, SyncStatus, SyncStatusOptions } from '@powersync/common'; @@ -80,4 +81,9 @@ export class SSRStreamingSyncImplementation extends BaseObserver implements Stre * This is a no-op in SSR mode. */ triggerCrudUpload() {} + + /** + * No-op in SSR mode. + */ + updateSubscriptions(): void {} } diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 1fe0177a7..1ac8748f7 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -1,4 +1,10 @@ -import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common'; +import { + PowerSyncConnectionOptions, + PowerSyncCredentials, + SubscribedStream, + SyncStatus, + SyncStatusOptions +} from '@powersync/common'; import * as Comlink from 'comlink'; import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider'; import { @@ -12,6 +18,7 @@ import { WebStreamingSyncImplementation, WebStreamingSyncImplementationOptions } from './WebStreamingSyncImplementation'; +import { WorkerClient } from '../../worker/sync/WorkerClient'; /** * The shared worker will trigger methods on this side of the message port @@ -94,8 +101,11 @@ export interface SharedWebStreamingSyncImplementationOptions extends WebStreamin db: WebDBAdapter; } +/** + * The local part of the sync implementation on the web, which talks to a sync implementation hosted in a shared worker. + */ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplementation { - protected syncManager: Comlink.Remote; + protected syncManager: Comlink.Remote; protected clientProvider: SharedSyncClientProvider; protected messagePort: MessagePort; @@ -138,7 +148,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem ).port; } - this.syncManager = Comlink.wrap(this.messagePort); + this.syncManager = Comlink.wrap(this.messagePort); this.syncManager.setLogLevel(this.logger.getLevel()); this.triggerCrudUpload = this.syncManager.triggerCrudUpload; @@ -152,15 +162,18 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options; const flags = { ...this.webOptions.flags, workers: undefined }; - this.isInitialized = this.syncManager.setParams({ - dbParams: this.dbAdapter.getConfiguration(), - streamOptions: { - crudUploadThrottleMs, - identifier, - retryDelayMs, - flags: flags - } - }); + this.isInitialized = this.syncManager.setParams( + { + dbParams: this.dbAdapter.getConfiguration(), + streamOptions: { + crudUploadThrottleMs, + identifier, + retryDelayMs, + flags: flags + } + }, + options.subscriptions + ); /** * Pass along any sync status updates to this listener @@ -235,6 +248,10 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem return this.isInitialized; } + updateSubscriptions(subscriptions: SubscribedStream[]): void { + this.syncManager.updateSubscriptions(subscriptions); + } + /** * Used in tests to force a connection states */ diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index c85947439..2941a3ff9 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -12,6 +12,7 @@ import { DBAdapter, PowerSyncBackendConnector, SqliteBucketStorage, + SubscribedStream, SyncStatus } from '@powersync/common'; import { Mutex } from 'async-mutex'; @@ -55,7 +56,7 @@ export type ManualSharedSyncPayload = { * @internal */ export type SharedSyncInitOptions = { - streamOptions: Omit; + streamOptions: Omit; dbParams: ResolvedWebSQLOpenOptions; }; @@ -73,6 +74,7 @@ export type WrappedSyncPort = { port: MessagePort; clientProvider: Comlink.Remote; db?: DBAdapter; + currentSubscriptions: SubscribedStream[]; }; /** @@ -94,10 +96,7 @@ const CONNECTOR_PLACEHOLDER = {} as PowerSyncBackendConnector; * @internal * Shared sync implementation which runs inside a shared webworker */ -export class SharedSyncImplementation - extends BaseObserver - implements StreamingSyncImplementation -{ +export class SharedSyncImplementation extends BaseObserver { protected ports: WrappedSyncPort[]; protected isInitialized: Promise; @@ -111,6 +110,7 @@ export class SharedSyncImplementation protected logger: ILogger; protected lastConnectOptions: PowerSyncConnectionOptions | undefined; protected portMutex: Mutex; + private subscriptions: SubscribedStream[] = []; protected connectionManager: ConnectionManager; syncStatus: SyncStatus; @@ -186,6 +186,25 @@ export class SharedSyncImplementation return this.isInitialized; } + private collectActiveSubscriptions() { + this.logger.debug('Collecting active stream subscriptions across tabs'); + const active = new Map(); + for (const port of this.ports) { + for (const stream of port.currentSubscriptions) { + const serializedKey = JSON.stringify(stream); + active.set(serializedKey, stream); + } + } + this.subscriptions = [...active.values()]; + this.logger.debug('Collected stream subscriptions', this.subscriptions); + this.connectionManager.syncStreamImplementation?.updateSubscriptions(this.subscriptions); + } + + updateSubscriptions(port: WrappedSyncPort, subscriptions: SubscribedStream[]) { + port.currentSubscriptions = subscriptions; + this.collectActiveSubscriptions(); + } + setLogLevel(level: ILogLevel) { this.logger.setLevel(level); this.broadCastLogger.setLevel(level); @@ -196,6 +215,7 @@ export class SharedSyncImplementation */ async setParams(params: SharedSyncInitOptions) { await this.portMutex.runExclusive(async () => { + this.collectActiveSubscriptions(); if (this.syncParams) { // Cannot modify already existing sync implementation params // But we can ask for a DB adapter, if required, at this point. @@ -250,11 +270,12 @@ export class SharedSyncImplementation * Adds a new client tab's message port to the list of connected ports */ async addPort(port: MessagePort) { - await this.portMutex.runExclusive(() => { + return await this.portMutex.runExclusive(() => { const portProvider = { port, - clientProvider: Comlink.wrap(port) - }; + clientProvider: Comlink.wrap(port), + currentSubscriptions: [] + } satisfies WrappedSyncPort; this.ports.push(portProvider); // Give the newly connected client the latest status @@ -262,6 +283,8 @@ export class SharedSyncImplementation if (status) { portProvider.clientProvider.statusChanged(status.toJSON()); } + + return portProvider; }); } @@ -427,6 +450,7 @@ export class SharedSyncImplementation }); }, ...syncParams.streamOptions, + subscriptions: this.subscriptions, // Logger cannot be transferred just yet logger: this.logger }); diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts b/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts index 11a14fd9b..157d7c1cb 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts @@ -1,10 +1,6 @@ import { createBaseLogger } from '@powersync/common'; -import * as Comlink from 'comlink'; -import { - SharedSyncClientEvent, - SharedSyncImplementation, - type ManualSharedSyncPayload -} from './SharedSyncImplementation'; +import { SharedSyncImplementation } from './SharedSyncImplementation'; +import { WorkerClient } from './WorkerClient'; const _self: SharedWorkerGlobalScope = self as any; const logger = createBaseLogger(); @@ -14,23 +10,5 @@ const sharedSyncImplementation = new SharedSyncImplementation(); _self.onconnect = async function (event: MessageEvent) { const port = event.ports[0]; - - /** - * Adds an extra listener which can remove this port - * from the list of monitored ports. - */ - port.addEventListener('message', async (event) => { - const payload = event.data as ManualSharedSyncPayload; - if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { - const release = await sharedSyncImplementation.removePort(port); - port.postMessage({ - event: SharedSyncClientEvent.CLOSE_ACK, - data: {} - } satisfies ManualSharedSyncPayload); - release?.(); - } - }); - - await sharedSyncImplementation.addPort(port); - Comlink.expose(sharedSyncImplementation, port); + await new WorkerClient(sharedSyncImplementation, port).initialize(); }; diff --git a/packages/web/src/worker/sync/WorkerClient.ts b/packages/web/src/worker/sync/WorkerClient.ts new file mode 100644 index 000000000..ca9eb3b2e --- /dev/null +++ b/packages/web/src/worker/sync/WorkerClient.ts @@ -0,0 +1,80 @@ +import * as Comlink from 'comlink'; +import { + ManualSharedSyncPayload, + SharedSyncClientEvent, + SharedSyncImplementation, + SharedSyncInitOptions, + WrappedSyncPort +} from './SharedSyncImplementation'; +import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream } from '@powersync/common'; + +/** + * A client to the shared sync worker. + * + * The shared sync implementation needs a per-client view of subscriptions so that subscriptions of closed tabs can + * automatically be evicted later. + */ +export class WorkerClient { + private resolvedPort: WrappedSyncPort | null = null; + + constructor( + private readonly sync: SharedSyncImplementation, + private readonly port: MessagePort + ) {} + + async initialize() { + /** + * Adds an extra listener which can remove this port + * from the list of monitored ports. + */ + this.port.addEventListener('message', async (event) => { + const payload = event.data as ManualSharedSyncPayload; + if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { + const release = await this.sync.removePort(this.port); + this.port.postMessage({ + event: SharedSyncClientEvent.CLOSE_ACK, + data: {} + } satisfies ManualSharedSyncPayload); + release?.(); + } + }); + + this.resolvedPort = await this.sync.addPort(this.port); + Comlink.expose(this, this.port); + } + + setLogLevel(level: ILogLevel) { + this.sync.setLogLevel(level); + } + + triggerCrudUpload() { + return this.sync.triggerCrudUpload(); + } + + setParams(params: SharedSyncInitOptions, subscriptions: SubscribedStream[]) { + this.resolvedPort!.currentSubscriptions = subscriptions; + return this.sync.setParams(params); + } + + getWriteCheckpoint() { + return this.sync.getWriteCheckpoint(); + } + + hasCompletedSync() { + return this.sync.hasCompletedSync(); + } + + connect(options?: PowerSyncConnectionOptions) { + return this.sync.connect(options); + } + + updateSubscriptions(subscriptions: SubscribedStream[]) { + if (this.resolvedPort) { + this.sync.updateSubscriptions; + } + } + + disconnect() { + return this.sync.disconnect(); + } +} diff --git a/packages/web/tests/multiple_instances.test.ts b/packages/web/tests/multiple_instances.test.ts index 739aaf4ca..80b39e419 100644 --- a/packages/web/tests/multiple_instances.test.ts +++ b/packages/web/tests/multiple_instances.test.ts @@ -2,6 +2,8 @@ import { AbstractPowerSyncDatabase, createBaseLogger, createLogger, + DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + DEFAULT_STREAMING_SYNC_OPTIONS, SqliteBucketStorage, SyncStatus } from '@powersync/common'; @@ -131,8 +133,10 @@ describe('Multiple Instances', { sequential: true }, () => { await connector1.uploadData(db); }, identifier, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, retryDelayMs: 90_000, // Large delay to allow for testing - db: db.database as WebDBAdapter + db: db.database as WebDBAdapter, + subscriptions: [] }; const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1); @@ -146,7 +150,10 @@ describe('Multiple Instances', { sequential: true }, () => { await connector2.uploadData(db); }, identifier, - db: db.database as WebDBAdapter + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + retryDelayMs: 90_000, // Large delay to allow for testing + db: db.database as WebDBAdapter, + subscriptions: [] }; const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2); @@ -195,6 +202,8 @@ describe('Multiple Instances', { sequential: true }, () => { identifier, // The large delay here allows us to test between connection retries retryDelayMs: 90_000, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + subscriptions: [], flags: { broadcastLogs: true } diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index f12bd186c..c08c09302 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -4,6 +4,7 @@ import { AbstractStreamingSyncImplementation, BSONImplementation, DataStream, + DEFAULT_CRUD_UPLOAD_THROTTLE_MS, PowerSyncBackendConnector, PowerSyncCredentials, PowerSyncDatabaseOptions, @@ -169,7 +170,9 @@ export class MockedStreamPowerSync extends PowerSyncDatabase { await connector.uploadData(this); }, identifier: this.database.name, - retryDelayMs: this.options.crudUploadThrottleMs ?? 0 // The zero here makes tests faster + retryDelayMs: this.options.crudUploadThrottleMs ?? 0, // The zero here makes tests faster + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + subscriptions: [] }); } } From 2e196541a44ed5a0219ae6e13589a027e72e0a35 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 4 Sep 2025 12:35:49 +0200 Subject: [PATCH 04/21] Fix web tests --- .../db/sync/SharedWebStreamingSyncImplementation.ts | 2 +- .../web/src/worker/sync/SharedSyncImplementation.ts | 2 +- packages/web/src/worker/sync/WorkerClient.ts | 6 +++++- .../tests/src/db/AbstractPowerSyncDatabase.test.ts | 11 ++++++++++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 1ac8748f7..dc030bccb 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -257,6 +257,6 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem */ private async _testUpdateStatus(status: SyncStatus) { await this.isInitialized; - return (this.syncManager as any)['_testUpdateAllStatuses'](status.toJSON()); + return this.syncManager._testUpdateAllStatuses(status.toJSON()); } } diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 2941a3ff9..5b9f8a82e 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -494,7 +494,7 @@ export class SharedSyncImplementation extends BaseObserver Date: Thu, 4 Sep 2025 13:04:00 +0200 Subject: [PATCH 05/21] Fix RN build --- packages/react-native/src/db/PowerSyncDatabase.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/react-native/src/db/PowerSyncDatabase.ts b/packages/react-native/src/db/PowerSyncDatabase.ts index ddb513694..6b233c9a6 100644 --- a/packages/react-native/src/db/PowerSyncDatabase.ts +++ b/packages/react-native/src/db/PowerSyncDatabase.ts @@ -49,14 +49,13 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { const remote = new ReactNativeRemote(connector, this.logger); return new ReactNativeStreamingSyncImplementation({ + ...options, adapter: this.bucketStorageAdapter, remote, uploadCrud: async () => { await this.waitForReady(); await connector.uploadData(this); }, - retryDelayMs: options.retryDelayMs, - crudUploadThrottleMs: options.crudUploadThrottleMs, identifier: this.database.name, logger: this.logger }); From 2722fbd0642b473b6d9395543f8d5a733b733d40 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 4 Sep 2025 14:20:27 +0200 Subject: [PATCH 06/21] Node: Properly wait for connection --- .../{stream.test.ts => sync-stream.test.ts} | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) rename packages/node/tests/{stream.test.ts => sync-stream.test.ts} (86%) diff --git a/packages/node/tests/stream.test.ts b/packages/node/tests/sync-stream.test.ts similarity index 86% rename from packages/node/tests/stream.test.ts rename to packages/node/tests/sync-stream.test.ts index 268b7251e..5ea5b23a6 100644 --- a/packages/node/tests/stream.test.ts +++ b/packages/node/tests/sync-stream.test.ts @@ -1,5 +1,5 @@ -import { describe, vi, expect, beforeEach } from 'vitest'; -import { SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common'; +import { describe, vi, expect } from 'vitest'; +import { PowerSyncConnectionOptions, SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common'; import Logger from 'js-logger'; import { bucket, checkpoint, mockSyncServiceTest, nextStatus, stream, TestConnector } from './utils'; @@ -9,16 +9,15 @@ describe('Sync streams', () => { const defaultOptions = { clientImplementation: SyncClientImplementation.RUST, connectionMethod: SyncStreamConnectionMethod.HTTP - }; + } satisfies PowerSyncConnectionOptions; mockSyncServiceTest('can disable default streams', async ({ syncService }) => { const database = await syncService.createDatabase(); - database.connect(new TestConnector(), { + await database.connect(new TestConnector(), { includeDefaultStreams: false, ...defaultOptions }); - await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); expect(syncService.connectedListeners[0]).toMatchObject({ streams: { include_defaults: false, @@ -32,8 +31,7 @@ describe('Sync streams', () => { const a = await database.syncStream('stream', { foo: 'a' }).subscribe(); const b = await database.syncStream('stream', { foo: 'b' }).subscribe({ priority: 1 }); - database.connect(new TestConnector(), defaultOptions); - await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + await database.connect(new TestConnector(), defaultOptions); expect(syncService.connectedListeners[0]).toMatchObject({ streams: { @@ -84,8 +82,7 @@ describe('Sync streams', () => { mockSyncServiceTest('reports default streams', async ({ syncService }) => { const database = await syncService.createDatabase(); - database.connect(new TestConnector(), defaultOptions); - await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + await database.connect(new TestConnector(), defaultOptions); let statusPromise = nextStatus(database); syncService.pushLine( @@ -110,8 +107,7 @@ describe('Sync streams', () => { mockSyncServiceTest('changes subscriptions dynamically', async ({ syncService }) => { const database = await syncService.createDatabase(); - database.connect(new TestConnector(), defaultOptions); - await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + await database.connect(new TestConnector(), defaultOptions); syncService.pushLine( checkpoint({ From eb5491c0b1bd1404e69ea77ae8bdaa8aca8c587e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 4 Sep 2025 17:55:12 +0200 Subject: [PATCH 07/21] Add to diagnostics app --- .../common/src/client/ConnectionManager.ts | 2 +- packages/common/src/db/crud/SyncStatus.ts | 10 +- packages/node/tests/sync-stream.test.ts | 16 +-- .../diagnostics-app/src/app/views/layout.tsx | 19 +-- .../src/app/views/sync-diagnostics.tsx | 73 ++++++++++- .../widgets/NewStreamSubscription.tsx | 124 ++++++++++++++++++ .../library/powersync/ConnectionManager.ts | 28 +++- 7 files changed, 238 insertions(+), 34 deletions(-) create mode 100644 tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 205f93f3b..1bfa491b4 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -277,7 +277,7 @@ export class ConnectionManager extends BaseObserver { const desc = { name, parameters } satisfies SyncStreamDescription; const waitForFirstSync = (abort?: AbortSignal) => { - return adapter.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced, abort); + return adapter.firstStatusMatching((s) => s.forStream(desc)?.subscription.hasSynced, abort); }; return { diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index 613816909..150de8fb2 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -123,14 +123,14 @@ export class SyncStatus { * This returns null when the database is currently being opened and we don't have reliable information about all * included streams yet. */ - get subscriptions(): SyncStreamStatus[] | undefined { + get syncStreams(): SyncStreamStatus[] | undefined { return this.options.dataFlow?.internalStreamSubscriptions?.map((core) => new SyncStreamStatusView(this, core)); } /** - * If the `stream` appears in {@link subscriptions}, returns the current status for that stream. + * If the `stream` appears in {@link syncStreams}, returns the current status for that stream. */ - statusFor(stream: SyncStreamDescription): SyncStreamStatus | undefined { + forStream(stream: SyncStreamDescription): SyncStreamStatus | undefined { const asJson = JSON.stringify(stream.parameters); const raw = this.options.dataFlow?.internalStreamSubscriptions?.find( (r) => r.name == stream.name && asJson == JSON.stringify(r.parameters) @@ -280,9 +280,9 @@ class SyncStreamStatusView implements SyncStreamStatus { active: core.active, isDefault: core.is_default, hasExplicitSubscription: core.has_explicit_subscription, - expiresAt: core.expires_at != null ? new Date(core.expires_at) : null, + expiresAt: core.expires_at != null ? new Date(core.expires_at * 1000) : null, hasSynced: core.last_synced_at != null, - lastSyncedAt: core.last_synced_at != null ? new Date(core.last_synced_at) : null + lastSyncedAt: core.last_synced_at != null ? new Date(core.last_synced_at * 1000) : null }; } diff --git a/packages/node/tests/sync-stream.test.ts b/packages/node/tests/sync-stream.test.ts index 5ea5b23a6..c45234ad9 100644 --- a/packages/node/tests/sync-stream.test.ts +++ b/packages/node/tests/sync-stream.test.ts @@ -64,16 +64,16 @@ describe('Sync streams', () => { ); let status = await statusPromise; for (const subscription of [a, b]) { - expect(status.statusFor(subscription).subscription.active).toBeTruthy(); - expect(status.statusFor(subscription).subscription.lastSyncedAt).toBeNull(); - expect(status.statusFor(subscription).subscription.hasExplicitSubscription).toBeTruthy(); + expect(status.forStream(subscription).subscription.active).toBeTruthy(); + expect(status.forStream(subscription).subscription.lastSyncedAt).toBeNull(); + expect(status.forStream(subscription).subscription.hasExplicitSubscription).toBeTruthy(); } statusPromise = nextStatus(database); syncService.pushLine({ partial_checkpoint_complete: { last_op_id: '0', priority: 1 } }); status = await statusPromise; - expect(status.statusFor(a).subscription.lastSyncedAt).toBeNull(); - expect(status.statusFor(b).subscription.lastSyncedAt).not.toBeNull(); + expect(status.forStream(a).subscription.lastSyncedAt).toBeNull(); + expect(status.forStream(b).subscription.lastSyncedAt).not.toBeNull(); await b.waitForFirstSync(); syncService.pushLine({ checkpoint_complete: { last_op_id: '0' } }); @@ -94,8 +94,8 @@ describe('Sync streams', () => { ); let status = await statusPromise; - expect(status.subscriptions).toHaveLength(1); - expect(status.subscriptions[0]).toMatchObject({ + expect(status.syncStreams).toHaveLength(1); + expect(status.syncStreams[0]).toMatchObject({ subscription: { name: 'default_stream', parameters: null, @@ -144,6 +144,6 @@ describe('Sync streams', () => { let statusPromise = nextStatus(database); const subscription = await database.syncStream('foo').subscribe(); let status = await statusPromise; - expect(status.statusFor(subscription)).not.toBeNull(); + expect(status.forStream(subscription)).not.toBeNull(); }); }); diff --git a/tools/diagnostics-app/src/app/views/layout.tsx b/tools/diagnostics-app/src/app/views/layout.tsx index 51e0526cf..9ba207511 100644 --- a/tools/diagnostics-app/src/app/views/layout.tsx +++ b/tools/diagnostics-app/src/app/views/layout.tsx @@ -25,7 +25,7 @@ import { Typography, styled } from '@mui/material'; -import React from 'react'; +import React, { useMemo } from 'react'; import { CLIENT_PARAMETERS_ROUTE, @@ -35,7 +35,7 @@ import { SYNC_DIAGNOSTICS_ROUTE } from '@/app/router'; import { useNavigationPanel } from '@/components/navigation/NavigationPanelContext'; -import { signOut, sync } from '@/library/powersync/ConnectionManager'; +import { signOut, useSyncStatus } from '@/library/powersync/ConnectionManager'; import { usePowerSync } from '@powersync/react'; import { useNavigate } from 'react-router-dom'; @@ -43,8 +43,8 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) const powerSync = usePowerSync(); const navigate = useNavigate(); - const [syncStatus, setSyncStatus] = React.useState(sync?.syncStatus); - const [syncError, setSyncError] = React.useState(null); + const syncStatus = useSyncStatus(); + const syncError = useMemo(() => syncStatus?.dataFlowStatus?.downloadError, [syncStatus]); const { title } = useNavigationPanel(); const [mobileOpen, setMobileOpen] = React.useState(false); @@ -99,17 +99,6 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) [powerSync] ); - // Cannot use `useStatus()`, since we're not using the default sync implementation. - React.useEffect(() => { - const l = sync?.registerListener({ - statusChanged: (status) => { - setSyncStatus(status); - setSyncError(status.dataFlowStatus.downloadError ?? null); - } - }); - return () => l?.(); - }, []); - const drawerWidth = 320; const drawer = ( diff --git a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx index abe82d191..c9853f3db 100644 --- a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx +++ b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx @@ -1,5 +1,6 @@ import { NavigationPage } from '@/components/navigation/NavigationPage'; -import { clearData, db, sync } from '@/library/powersync/ConnectionManager'; +import { NewStreamSubscription } from '@/components/widgets/NewStreamSubscription'; +import { clearData, db, sync, useSyncStatus } from '@/library/powersync/ConnectionManager'; import { Box, Button, @@ -15,7 +16,8 @@ import { styled } from '@mui/material'; import { DataGrid, GridColDef } from '@mui/x-data-grid'; -import React from 'react'; +import { SyncStreamStatus } from '@powersync/web'; +import React, { useState } from 'react'; const BUCKETS_QUERY = ` WITH @@ -295,11 +297,78 @@ export default function SyncDiagnosticsPage() { {bucketRowsLoading ? : bucketsTable} + ); } +function StreamsState() { + const syncStreams = useSyncStatus()?.syncStreams; + const [dialogOpen, setDialogOpen] = useState(false); + const syncStreamsLoading = syncStreams == null; + + return ( + + + Sync stream subscriptions + + {syncStreamsLoading ? : } + setDialogOpen(false)} /> + + + ); +} + +function StreamsGrid(props: { streams: SyncStreamStatus[] }) { + const columns: GridColDef[] = [ + { field: 'name', headerName: 'Stream name', flex: 2 }, + { field: 'parameters', headerName: 'Parameters', flex: 3, type: 'text' }, + { field: 'default', headerName: 'Default', flex: 1, type: 'boolean' }, + { field: 'active', headerName: 'Active', flex: 1, type: 'boolean' }, + { field: 'has_explicit_subscription', headerName: 'Explicit', flex: 1, type: 'boolean' }, + { field: 'priority', headerName: 'Priority', flex: 1, type: 'number' }, + { field: 'last_synced_at', headerName: 'Last synced at', flex: 2, type: 'dateTime' }, + { field: 'expires', headerName: 'Eviction time', flex: 2, type: 'dateTime' } + ]; + + const rows = props.streams.map((stream) => { + const name = stream.subscription.name; + const parameters = JSON.stringify(stream.subscription.parameters); + + return { + id: `${name}-${parameters}`, + name, + parameters, + default: stream.subscription.isDefault, + has_explicit_subscription: stream.subscription.hasExplicitSubscription, + active: stream.subscription.active, + last_synced_at: stream.subscription.lastSyncedAt, + expires: stream.subscription.expiresAt, + priority: stream.priority + }; + }); + + return ( + + ); +} + namespace S { export const MainPaper = styled(Paper)` margin-bottom: 10px; diff --git a/tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx b/tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx new file mode 100644 index 000000000..3809f662c --- /dev/null +++ b/tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx @@ -0,0 +1,124 @@ +import { activeSubscriptions, db } from '@/library/powersync/ConnectionManager'; +import Button from '@mui/material/Button'; +import Dialog from '@mui/material/Dialog'; +import DialogActions from '@mui/material/DialogActions'; +import DialogContent from '@mui/material/DialogContent'; +import DialogContentText from '@mui/material/DialogContentText'; +import DialogTitle from '@mui/material/DialogTitle'; +import FormControl from '@mui/material/FormControl'; +import InputLabel from '@mui/material/InputLabel'; +import MenuItem from '@mui/material/MenuItem'; +import Select from '@mui/material/Select'; +import Stack from '@mui/material/Stack'; +import TextField from '@mui/material/TextField'; +import { Formik, FormikErrors } from 'formik'; + +interface NewStreamSubscriptionValues { + stream: string; + parameters: string; + override_priority: 0 | 1 | 2 | 3 | null; +} + +export function NewStreamSubscription(props: { open: boolean; close: () => void }) { + const { open, close } = props; + + const validate = (values: NewStreamSubscriptionValues) => { + const errors: FormikErrors = {}; + + if (values.stream.length == 0) { + errors.stream = 'Stream is required'; + } + + if (values.parameters.length) { + try { + JSON.parse(values.parameters); + } catch (e) { + errors.parameters = 'Must be empty or a JSON object'; + } + } + + return errors; + }; + + const addSubscription = async (values: NewStreamSubscriptionValues) => { + const parameters = values.parameters == '' ? null : JSON.parse(values.parameters); + + const subscription = await db + .syncStream(values.stream, parameters) + .subscribe({ priority: values.override_priority ?? undefined }); + + // We need to store subscriptions globally, because they have a finalizer set on them that would eventually clear + // them otherwise. + activeSubscriptions.push(subscription); + close(); + }; + + return ( + + initialValues={{ stream: '', parameters: '', override_priority: null }} + validateOnChange={true} + onSubmit={addSubscription} + validate={validate}> + {({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit }) => ( + + Subscribe to sync stream + +
+ + Enter stream name and parameters (as a JSON object or an empty string for null) to subscribe to a + stream. + + + + + + + Override priority + + + + + + +
+
+ + + + +
+ )} + + ); +} diff --git a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts index 661c4b09b..275ff757d 100644 --- a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts +++ b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts @@ -1,9 +1,11 @@ import { BaseListener, createBaseLogger, + DEFAULT_STREAMING_SYNC_OPTIONS, LogLevel, PowerSyncDatabase, SyncClientImplementation, + SyncStreamSubscription, TemporaryStorageOption, WASQLiteOpenFactory, WASQLiteVFS, @@ -16,6 +18,7 @@ import { DynamicSchemaManager } from './DynamicSchemaManager'; import { RecordingStorageAdapter } from './RecordingStorageAdapter'; import { TokenConnector } from './TokenConnector'; import { RustClientInterceptor } from './RustClientInterceptor'; +import React from 'react'; const baseLogger = createBaseLogger(); baseLogger.useDefaults(); @@ -45,8 +48,7 @@ export const db = new PowerSyncDatabase({ }); export const connector = new TokenConnector(); - -const adapter = new RecordingStorageAdapter(db.database, schemaManager); +export const activeSubscriptions: SyncStreamSubscription[] = []; export let sync: WebStreamingSyncImplementation | undefined; @@ -59,6 +61,7 @@ if (connector.hasCredentials()) { } export async function connect() { + activeSubscriptions.length = 0; const client = localStorage.getItem('preferred_client_implementation') == SyncClientImplementation.RUST ? SyncClientImplementation.RUST @@ -78,7 +81,9 @@ export async function connect() { uploadCrud: async () => { // No-op }, - identifier: 'diagnostics' + identifier: 'diagnostics', + ...DEFAULT_STREAMING_SYNC_OPTIONS, + subscriptions: [] }; sync = new WebStreamingSyncImplementation(syncOptions); await sync.connect({ params, clientImplementation: client }); @@ -118,4 +123,21 @@ export const setParams = (p: object) => { connect(); }; +/** + * The current sync status - we can't use `useStatus()` since we're not using the default sync implementation. + */ +export function useSyncStatus() { + const [current, setCurrent] = React.useState(sync?.syncStatus); + React.useEffect(() => { + const l = sync?.registerListener({ + statusChanged: (status) => { + setCurrent(status); + } + }); + return () => l?.(); + }, []); + + return current; +} + (window as any).db = db; From 74619794a967822fc8668d6b6a093488e26124b9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 8 Sep 2025 09:47:26 +0200 Subject: [PATCH 08/21] Just warn on leak --- packages/common/src/client/ConnectionManager.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 1bfa491b4..8609ceddb 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -310,7 +310,7 @@ export class ConnectionManager extends BaseObserver { this.subscriptionsMayHaveChanged(); }; - subscription = new ActiveSubscription(name, parameters, waitForFirstSync, clearSubscription); + subscription = new ActiveSubscription(name, parameters, this.logger, waitForFirstSync, clearSubscription); this.locallyActiveSubscriptions.set(key, subscription); this.subscriptionsMayHaveChanged(); } @@ -341,6 +341,7 @@ class ActiveSubscription { constructor( readonly name: string, readonly parameters: Record | null, + readonly logger: ILogger, readonly waitForFirstSync: (abort?: AbortSignal) => Promise, private clearSubscription: () => void ) {} @@ -377,4 +378,10 @@ class SyncStreamSubscriptionHandle implements SyncStreamSubscription { } const _finalizer = - FinalizationRegistry != null ? new FinalizationRegistry((sub) => sub.decrementRefCount()) : null; + FinalizationRegistry != null + ? new FinalizationRegistry((sub) => { + sub.logger.warn( + `A subscription to ${sub.name} with params ${JSON.stringify(sub.parameters)} leaked! Please ensure calling unsubscribe() when you don't need a subscription anymore. For global subscriptions, consider storing them in global fields to avoid this warning.` + ); + }) + : null; From b4ad9d3a60a55a64841a13e297f0f36fd7919509 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 8 Sep 2025 11:48:13 +0200 Subject: [PATCH 09/21] Remove subscriptions when tab is removed --- .../SharedWebStreamingSyncImplementation.ts | 22 ++++++++++--- .../worker/sync/SharedSyncImplementation.ts | 10 ++++-- packages/web/src/worker/sync/WorkerClient.ts | 31 +++++++++++++++---- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index dc030bccb..66d3b51ed 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -7,11 +7,7 @@ import { } from '@powersync/common'; import * as Comlink from 'comlink'; import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider'; -import { - ManualSharedSyncPayload, - SharedSyncClientEvent, - SharedSyncImplementation -} from '../../worker/sync/SharedSyncImplementation'; +import { ManualSharedSyncPayload, SharedSyncClientEvent } from '../../worker/sync/SharedSyncImplementation'; import { DEFAULT_CACHE_SIZE_KB, resolveWebSQLFlags, TemporaryStorageOption } from '../adapters/web-sql-flags'; import { WebDBAdapter } from '../adapters/WebDBAdapter'; import { @@ -19,6 +15,7 @@ import { WebStreamingSyncImplementationOptions } from './WebStreamingSyncImplementation'; import { WorkerClient } from '../../worker/sync/WorkerClient'; +import { getNavigatorLocks } from '../../shared/navigator'; /** * The shared worker will trigger methods on this side of the message port @@ -111,6 +108,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem protected isInitialized: Promise; protected dbAdapter: WebDBAdapter; + private abortOnClose = new AbortController(); constructor(options: SharedWebStreamingSyncImplementationOptions) { super(options); @@ -192,6 +190,19 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem * This performs bi-directional method calling. */ Comlink.expose(this.clientProvider, this.messagePort); + + // Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which + // will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker + // to free resources associated with this tab. + getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => { + if (!this.abortOnClose.signal.aborted) { + this.syncManager.addLockBasedCloseSignal(lock!.name); + + await new Promise((r) => { + this.abortOnClose.signal.onabort = () => r(); + }); + } + }); } /** @@ -238,6 +249,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem }; this.messagePort.postMessage(closeMessagePayload); }); + this.abortOnClose.abort(); // Release the proxy this.syncManager[Comlink.releaseProxy](); diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 5b9f8a82e..7db98c3d0 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -292,12 +292,12 @@ export class SharedSyncImplementation extends BaseObserver { - const index = this.ports.findIndex((p) => p.port == port); + const index = this.ports.findIndex((p) => p == port); if (index < 0) { this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`); return {}; @@ -312,7 +312,7 @@ export class SharedSyncImplementation extends BaseObserver { - if (abortController?.activePort.port == port) { + if (abortController?.activePort == port) { abortController!.controller.abort( new AbortOperation('Closing pending requests after client port is removed') ); @@ -347,6 +347,10 @@ export class SharedSyncImplementation extends BaseObserver trackedPort.clientProvider[Comlink.releaseProxy](); } diff --git a/packages/web/src/worker/sync/WorkerClient.ts b/packages/web/src/worker/sync/WorkerClient.ts index 2232d955a..ca63e5771 100644 --- a/packages/web/src/worker/sync/WorkerClient.ts +++ b/packages/web/src/worker/sync/WorkerClient.ts @@ -7,6 +7,7 @@ import { WrappedSyncPort } from './SharedSyncImplementation'; import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream, SyncStatusOptions } from '@powersync/common'; +import { getNavigatorLocks } from '../../shared/navigator'; /** * A client to the shared sync worker. @@ -30,12 +31,7 @@ export class WorkerClient { this.port.addEventListener('message', async (event) => { const payload = event.data as ManualSharedSyncPayload; if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { - const release = await this.sync.removePort(this.port); - this.port.postMessage({ - event: SharedSyncClientEvent.CLOSE_ACK, - data: {} - } satisfies ManualSharedSyncPayload); - release?.(); + await this.removePort(); } }); @@ -43,6 +39,29 @@ export class WorkerClient { Comlink.expose(this, this.port); } + private async removePort() { + if (this.resolvedPort) { + const release = await this.sync.removePort(this.resolvedPort); + this.port.postMessage({ + event: SharedSyncClientEvent.CLOSE_ACK, + data: {} + } satisfies ManualSharedSyncPayload); + release?.(); + } + } + + /** + * Called by a client after obtaining a lock with a random name. + * + * When the client tab is closed, its lock will be returned. So when the shared worker attempts to acquire the lock, + * it can consider the connection to be closed. + */ + addLockBasedCloseSignal(name: string) { + getNavigatorLocks().request(name, async () => { + await this.removePort(); + }); + } + setLogLevel(level: ILogLevel) { this.sync.setLogLevel(level); } From 3f1bd3d9ecdc8af159313a5ed4a400fa991cc4ff Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 8 Sep 2025 13:54:45 +0200 Subject: [PATCH 10/21] Adopt in example --- .../.env.local.template | 1 + .../src/app/views/todo-lists/edit/page.tsx | 26 ++++++++++++++++++- .../components/providers/SystemProvider.tsx | 10 +++++-- .../src/library/powersync/vite-env.d.ts | 1 + .../common/src/client/ConnectionManager.ts | 1 + packages/web/src/worker/sync/WorkerClient.ts | 3 ++- 6 files changed, 38 insertions(+), 4 deletions(-) diff --git a/demos/react-supabase-todolist/.env.local.template b/demos/react-supabase-todolist/.env.local.template index dc4088ca7..d69bec1d8 100644 --- a/demos/react-supabase-todolist/.env.local.template +++ b/demos/react-supabase-todolist/.env.local.template @@ -3,3 +3,4 @@ VITE_SUPABASE_URL=https://foo.supabase.co VITE_SUPABASE_ANON_KEY=foo VITE_POWERSYNC_URL=https://foo.powersync.journeyapps.com +VITE_USE_SYNC_STREAMS=false diff --git a/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx b/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx index 7a488792f..eae5d310c 100644 --- a/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx +++ b/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx @@ -19,7 +19,8 @@ import { } from '@mui/material'; import Fab from '@mui/material/Fab'; import { usePowerSync, useQuery } from '@powersync/react'; -import React, { Suspense } from 'react'; +import { SyncStreamSubscription } from '@powersync/web'; +import React, { Suspense, useEffect } from 'react'; import { useParams } from 'react-router-dom'; /** @@ -32,6 +33,29 @@ const TodoEditSection = () => { const supabase = useSupabase(); const { id: listID } = useParams(); + if (import.meta.env.VITE_USE_SYNC_STREAMS == 'true') { + useEffect(() => { + let active = true; + let subscription: SyncStreamSubscription | null = null; + + powerSync + .syncStream('todos', { list: listID }) + .subscribe() + .then((sub) => { + if (!active) { + sub.unsubscribe(); + } else { + subscription = sub; + } + }); + + return () => { + active = false; + subscription?.unsubscribe(); + }; + }, [listID]); + } + const { data: [listRecord] } = useQuery<{ name: string }>( diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index 8a3f3c209..3a54211ee 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,7 +3,13 @@ import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/power import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web'; +import { + createBaseLogger, + DifferentialWatchedQuery, + LogLevel, + PowerSyncDatabase, + SyncClientImplementation +} from '@powersync/web'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -68,7 +74,7 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => { const l = connector.registerListener({ initialized: () => {}, sessionStarted: () => { - powerSync.connect(connector); + powerSync.connect(connector, { clientImplementation: SyncClientImplementation.RUST }); } }); diff --git a/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts b/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts index e3e71b5ba..0fdbf8508 100644 --- a/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts +++ b/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts @@ -4,6 +4,7 @@ interface ImportMetaEnv { readonly VITE_SUPABASE_URL: string; readonly VITE_SUPABASE_ANON_KEY: string; readonly VITE_POWERSYNC_URL: string; + readonly VITE_USE_SYNC_STREAMS: string; } interface ImportMeta { diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 8609ceddb..47a7bf71d 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -356,6 +356,7 @@ class ActiveSubscription { class SyncStreamSubscriptionHandle implements SyncStreamSubscription { constructor(readonly subscription: ActiveSubscription) { + subscription.refcount++; _finalizer?.register(this, subscription); } diff --git a/packages/web/src/worker/sync/WorkerClient.ts b/packages/web/src/worker/sync/WorkerClient.ts index ca63e5771..22c44f4ba 100644 --- a/packages/web/src/worker/sync/WorkerClient.ts +++ b/packages/web/src/worker/sync/WorkerClient.ts @@ -42,6 +42,7 @@ export class WorkerClient { private async removePort() { if (this.resolvedPort) { const release = await this.sync.removePort(this.resolvedPort); + this.resolvedPort = null; this.port.postMessage({ event: SharedSyncClientEvent.CLOSE_ACK, data: {} @@ -89,7 +90,7 @@ export class WorkerClient { updateSubscriptions(subscriptions: SubscribedStream[]) { if (this.resolvedPort) { - this.sync.updateSubscriptions; + this.sync.updateSubscriptions(this.resolvedPort, subscriptions); } } From 9c4434cc5d146e9193137674a076bab472364d50 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:35:04 +0200 Subject: [PATCH 11/21] Two more tests --- .../common/src/client/ConnectionManager.ts | 9 +++-- packages/node/tests/sync-stream.test.ts | 36 +++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 47a7bf71d..d48519254 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -355,6 +355,8 @@ class ActiveSubscription { } class SyncStreamSubscriptionHandle implements SyncStreamSubscription { + private active: boolean = false; + constructor(readonly subscription: ActiveSubscription) { subscription.refcount++; _finalizer?.register(this, subscription); @@ -373,8 +375,11 @@ class SyncStreamSubscriptionHandle implements SyncStreamSubscription { } unsubscribe(): void { - _finalizer?.unregister(this); - this.subscription.decrementRefCount(); + if (this.active) { + this.active = false; + _finalizer?.unregister(this); + this.subscription.decrementRefCount(); + } } } diff --git a/packages/node/tests/sync-stream.test.ts b/packages/node/tests/sync-stream.test.ts index c45234ad9..6cd3caf67 100644 --- a/packages/node/tests/sync-stream.test.ts +++ b/packages/node/tests/sync-stream.test.ts @@ -146,4 +146,40 @@ describe('Sync streams', () => { let status = await statusPromise; expect(status.forStream(subscription)).not.toBeNull(); }); + + mockSyncServiceTest('unsubscribing multiple times has no effect', async ({ syncService }) => { + const database = await syncService.createDatabase(); + const a = await database.syncStream('a').subscribe(); + const aAgain = await database.syncStream('a').subscribe(); + a.unsubscribe(); + a.unsubscribe(); + + // Pretend the streams are expired - they should still be requested because the core extension extends the lifetime + // of streams currently referenced before connecting. + await database.execute('UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000'); + await database.connect(new TestConnector(), defaultOptions); + + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [{}] + } + }); + aAgain.unsubscribe(); + }); + + mockSyncServiceTest('unsubscribeAll', async ({ syncService }) => { + const database = await syncService.createDatabase(); + const a = await database.syncStream('a').subscribe(); + database.syncStream('a').unsubscribeAll(); + + await database.connect(new TestConnector(), defaultOptions); + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [] + } + }); + a.unsubscribe(); + }); }); From d60318140dd9546437d6b331ffeb4c5abd1c0830 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 15:11:51 +0200 Subject: [PATCH 12/21] Fix feature detection for finalization registry --- packages/common/src/client/ConnectionManager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index d48519254..c262e2540 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -384,7 +384,7 @@ class SyncStreamSubscriptionHandle implements SyncStreamSubscription { } const _finalizer = - FinalizationRegistry != null + 'FinalizationRegistry' in globalThis ? new FinalizationRegistry((sub) => { sub.logger.warn( `A subscription to ${sub.name} with params ${JSON.stringify(sub.parameters)} leaked! Please ensure calling unsubscribe() when you don't need a subscription anymore. For global subscriptions, consider storing them in global fields to avoid this warning.` From ec4f56498a1422eb375833ebabae8352310e7765 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 10 Sep 2025 11:57:04 +0200 Subject: [PATCH 13/21] Review feedback --- .../common/src/client/sync/bucket/BucketStorageAdapter.ts | 5 +++++ packages/common/src/client/sync/stream/core-instruction.ts | 2 ++ 2 files changed, 7 insertions(+) diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index 3ecd8848c..4287c399a 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -50,6 +50,11 @@ export interface BucketChecksum { * Count of operations - informational only. */ count?: number; + /** + * The JavaScript client does not use this field, which is why it's defined to be `any`. We rely on the structure of + * this interface to pass custom `BucketChecksum`s to the Rust client in unit tests, which so all fields need to be + * present. + */ subscriptions?: any; } diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts index 335b8da11..2879b883c 100644 --- a/packages/common/src/client/sync/stream/core-instruction.ts +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -86,6 +86,8 @@ export function coreStatusToJs(status: CoreSyncStatus): sync_status.SyncStatusOp connected: status.connected, connecting: status.connecting, dataFlow: { + // We expose downloading as a boolean field, the core extension reports download information as a nullable + // download status. When that status is non-null, a download is in progress. downloading: status.downloading != null, downloadProgress: status.downloading?.buckets, internalStreamSubscriptions: status.streams From 917aba48a18c52a7f0279510a46598cc18f64af0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 11 Sep 2025 14:53:34 +0200 Subject: [PATCH 14/21] Avoid Iterator.map --- packages/common/src/client/ConnectionManager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index c262e2540..91549cfb0 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -325,7 +325,7 @@ export class ConnectionManager extends BaseObserver { } private get activeStreams() { - return [...this.locallyActiveSubscriptions.values().map((a) => ({ name: a.name, params: a.parameters }))]; + return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters })); } private subscriptionsMayHaveChanged() { From f8eac12fa6b4e3c1c47bec42252c07c59ec2c3ea Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 16 Sep 2025 13:50:45 +0200 Subject: [PATCH 15/21] Fix unecessary non-null assertion --- packages/common/src/client/sync/stream/core-instruction.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts index 2879b883c..062088380 100644 --- a/packages/common/src/client/sync/stream/core-instruction.ts +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -74,7 +74,7 @@ function priorityToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatu return { priority: status.priority, hasSynced: status.has_synced ?? undefined, - lastSyncedAt: status?.last_synced_at != null ? new Date(status!.last_synced_at! * 1000) : undefined + lastSyncedAt: status?.last_synced_at != null ? new Date(status.last_synced_at * 1000) : undefined }; } From 0f62be75a02b62a3a2896638454781811848de19 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 16 Sep 2025 14:00:56 +0200 Subject: [PATCH 16/21] Mark as experimental --- packages/common/src/client/AbstractPowerSyncDatabase.ts | 8 ++++++++ packages/common/src/db/crud/SyncStatus.ts | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index f916e5b5a..5b7fa050c 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -544,6 +544,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver l.statusChanged?.(this.currentStatus)); } + /** + * Create a sync stream to query its status or to subscribe to it. + * + * @param name The name of the stream to subscribe to. + * @param params Optional parameters for the stream subscription. + * @returns A {@link SyncStream} instance that can be subscribed to. + * @experimental Sync streams are currently in alpha. + */ syncStream(name: string, params?: Record): SyncStream { return this.connectionManager.stream(this.subscriptions, name, params ?? null); } diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index 150de8fb2..f5686c037 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -122,6 +122,8 @@ export class SyncStatus { * * This returns null when the database is currently being opened and we don't have reliable information about all * included streams yet. + * + * @experimental Sync streams are currently in alpha. */ get syncStreams(): SyncStreamStatus[] | undefined { return this.options.dataFlow?.internalStreamSubscriptions?.map((core) => new SyncStreamStatusView(this, core)); @@ -129,6 +131,8 @@ export class SyncStatus { /** * If the `stream` appears in {@link syncStreams}, returns the current status for that stream. + * + * @experimental Sync streams are currently in alpha. */ forStream(stream: SyncStreamDescription): SyncStreamStatus | undefined { const asJson = JSON.stringify(stream.parameters); From 93e5bf1acda20ec7a9cd4f13ab8409fc906a670c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 16 Sep 2025 14:30:33 +0200 Subject: [PATCH 17/21] Another one --- packages/common/src/client/sync/stream/core-instruction.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts index 062088380..5da4b43ea 100644 --- a/packages/common/src/client/sync/stream/core-instruction.ts +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -74,7 +74,7 @@ function priorityToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatu return { priority: status.priority, hasSynced: status.has_synced ?? undefined, - lastSyncedAt: status?.last_synced_at != null ? new Date(status.last_synced_at * 1000) : undefined + lastSyncedAt: status.last_synced_at != null ? new Date(status.last_synced_at * 1000) : undefined }; } From a2067d19084ea07432606730a721e1a27bbcc737 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 16 Sep 2025 17:25:03 +0200 Subject: [PATCH 18/21] Better test logs --- packages/common/src/client/ConnectionManager.ts | 2 +- .../stream/AbstractStreamingSyncImplementation.ts | 11 +++++++++-- packages/node/tests/sync-stream.test.ts | 6 +++++- packages/node/tests/sync.test.ts | 2 -- packages/node/tests/utils.ts | 9 ++++++++- packages/node/vitest.config.ts | 6 +++++- 6 files changed, 28 insertions(+), 8 deletions(-) diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 91549cfb0..7521c31af 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -355,7 +355,7 @@ class ActiveSubscription { } class SyncStreamSubscriptionHandle implements SyncStreamSubscription { - private active: boolean = false; + private active: boolean = true; constructor(readonly subscription: ActiveSubscription) { subscription.refcount++; diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 852a808f5..ab3e37c7c 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -1,7 +1,6 @@ import Logger, { ILogger } from 'js-logger'; -import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/crud/SyncProgress.js'; -import * as sync_status from '../../../db/crud/SyncStatus.js'; +import { InternalProgressInformation } from '../../../db/crud/SyncProgress.js'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js'; @@ -1017,6 +1016,14 @@ The next upload iteration will be delayed.`); async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) { const rawResponse = await adapter.control(op, payload ?? null); + const logger = syncImplementation.logger; + logger.trace( + 'powersync_control', + op, + payload == null || typeof payload == 'string' ? payload : '', + rawResponse + ); + await handleInstructions(JSON.parse(rawResponse)); } diff --git a/packages/node/tests/sync-stream.test.ts b/packages/node/tests/sync-stream.test.ts index 6cd3caf67..becba1be5 100644 --- a/packages/node/tests/sync-stream.test.ts +++ b/packages/node/tests/sync-stream.test.ts @@ -1,4 +1,4 @@ -import { describe, vi, expect } from 'vitest'; +import { describe, vi, expect, onTestFinished } from 'vitest'; import { PowerSyncConnectionOptions, SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common'; import Logger from 'js-logger'; import { bucket, checkpoint, mockSyncServiceTest, nextStatus, stream, TestConnector } from './utils'; @@ -30,6 +30,10 @@ describe('Sync streams', () => { const database = await syncService.createDatabase(); const a = await database.syncStream('stream', { foo: 'a' }).subscribe(); const b = await database.syncStream('stream', { foo: 'b' }).subscribe({ priority: 1 }); + onTestFinished(() => { + a.unsubscribe(); + b.unsubscribe(); + }); await database.connect(new TestConnector(), defaultOptions); diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 6c47eebfa..6d615485d 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -15,8 +15,6 @@ import { } from '@powersync/common'; import Logger from 'js-logger'; -Logger.useDefaults({ defaultLevel: Logger.WARN }); - describe('Sync', () => { describe('js client', () => { defineSyncTests(SyncClientImplementation.JAVASCRIPT); diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 95a057874..9b3002f6f 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -14,11 +14,12 @@ import { PowerSyncDatabase, Schema, StreamingSyncCheckpoint, - StreamingSyncCheckpointComplete, StreamingSyncLine, SyncStatus, Table } from '../lib'; +import { createLogger } from '@powersync/common'; +import Logger from 'js-logger'; export async function createTempDir() { const ostmpdir = os.tmpdir(); @@ -57,12 +58,18 @@ export async function createDatabase( tmpdir: string, options: Partial = {} ): Promise { + const defaultLogger = createLogger('PowerSyncTest', { logLevel: Logger.TRACE }); + (defaultLogger as any).invoke = (_, args) => { + console.log(...args); + }; + const database = new PowerSyncDatabase({ schema: AppSchema, database: { dbFilename: 'test.db', dbLocation: tmpdir }, + logger: defaultLogger, ...options }); await database.init(); diff --git a/packages/node/vitest.config.ts b/packages/node/vitest.config.ts index 94ec60f24..def711060 100644 --- a/packages/node/vitest.config.ts +++ b/packages/node/vitest.config.ts @@ -1,4 +1,8 @@ import { defineConfig } from 'vitest/config'; // We need to define an empty config to be part of the vitest works -export default defineConfig({}); +export default defineConfig({ + test: { + silent: false + } +}); From e365c6f78f2536a68b1cd5bd71d4fd0ac746aea2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 16 Sep 2025 19:12:46 +0200 Subject: [PATCH 19/21] Reduce read workers to improve test reliability --- packages/node/tests/PowerSyncDatabase.test.ts | 10 +++++++++- packages/node/tests/utils.ts | 3 ++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/node/tests/PowerSyncDatabase.test.ts b/packages/node/tests/PowerSyncDatabase.test.ts index 88732522f..b83e729e9 100644 --- a/packages/node/tests/PowerSyncDatabase.test.ts +++ b/packages/node/tests/PowerSyncDatabase.test.ts @@ -43,7 +43,14 @@ databaseTest('links powersync', async ({ database }) => { await database.get('select powersync_rs_version();'); }); -databaseTest('runs queries on multiple threads', async ({ database }) => { +tempDirectoryTest('runs queries on multiple threads', async ({ tmpdir }) => { + const database = new PowerSyncDatabase({ + schema: AppSchema, + database: { + dbFilename: 'test.db', + dbLocation: tmpdir + } + }); const threads = new Set(); const collectWorkerThreadId = async () => { @@ -58,6 +65,7 @@ databaseTest('runs queries on multiple threads', async ({ database }) => { } const res = await Promise.all(queryTasks); + await database.close(); expect(res).toHaveLength(10); expect([...threads]).toHaveLength(5); }); diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 9b3002f6f..33edc35b8 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -67,7 +67,8 @@ export async function createDatabase( schema: AppSchema, database: { dbFilename: 'test.db', - dbLocation: tmpdir + dbLocation: tmpdir, + readWorkerCount: 1 }, logger: defaultLogger, ...options From 181822097b9678b3509ec6adf0afeaa701a513ae Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 17 Sep 2025 10:35:28 +0200 Subject: [PATCH 20/21] Restore demo --- .../.env.local.template | 1 - .../src/app/views/todo-lists/edit/page.tsx | 26 +------------------ .../components/providers/SystemProvider.tsx | 10 ++----- .../src/library/powersync/vite-env.d.ts | 1 - 4 files changed, 3 insertions(+), 35 deletions(-) diff --git a/demos/react-supabase-todolist/.env.local.template b/demos/react-supabase-todolist/.env.local.template index d69bec1d8..dc4088ca7 100644 --- a/demos/react-supabase-todolist/.env.local.template +++ b/demos/react-supabase-todolist/.env.local.template @@ -3,4 +3,3 @@ VITE_SUPABASE_URL=https://foo.supabase.co VITE_SUPABASE_ANON_KEY=foo VITE_POWERSYNC_URL=https://foo.powersync.journeyapps.com -VITE_USE_SYNC_STREAMS=false diff --git a/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx b/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx index eae5d310c..7a488792f 100644 --- a/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx +++ b/demos/react-supabase-todolist/src/app/views/todo-lists/edit/page.tsx @@ -19,8 +19,7 @@ import { } from '@mui/material'; import Fab from '@mui/material/Fab'; import { usePowerSync, useQuery } from '@powersync/react'; -import { SyncStreamSubscription } from '@powersync/web'; -import React, { Suspense, useEffect } from 'react'; +import React, { Suspense } from 'react'; import { useParams } from 'react-router-dom'; /** @@ -33,29 +32,6 @@ const TodoEditSection = () => { const supabase = useSupabase(); const { id: listID } = useParams(); - if (import.meta.env.VITE_USE_SYNC_STREAMS == 'true') { - useEffect(() => { - let active = true; - let subscription: SyncStreamSubscription | null = null; - - powerSync - .syncStream('todos', { list: listID }) - .subscribe() - .then((sub) => { - if (!active) { - sub.unsubscribe(); - } else { - subscription = sub; - } - }); - - return () => { - active = false; - subscription?.unsubscribe(); - }; - }, [listID]); - } - const { data: [listRecord] } = useQuery<{ name: string }>( diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index 3a54211ee..8a3f3c209 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,13 +3,7 @@ import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/power import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { - createBaseLogger, - DifferentialWatchedQuery, - LogLevel, - PowerSyncDatabase, - SyncClientImplementation -} from '@powersync/web'; +import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -74,7 +68,7 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => { const l = connector.registerListener({ initialized: () => {}, sessionStarted: () => { - powerSync.connect(connector, { clientImplementation: SyncClientImplementation.RUST }); + powerSync.connect(connector); } }); diff --git a/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts b/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts index 0fdbf8508..e3e71b5ba 100644 --- a/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts +++ b/demos/react-supabase-todolist/src/library/powersync/vite-env.d.ts @@ -4,7 +4,6 @@ interface ImportMetaEnv { readonly VITE_SUPABASE_URL: string; readonly VITE_SUPABASE_ANON_KEY: string; readonly VITE_POWERSYNC_URL: string; - readonly VITE_USE_SYNC_STREAMS: string; } interface ImportMeta { From 8046cae1154f09c4c4810ed3a0a960cb88371635 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 17 Sep 2025 10:44:54 +0200 Subject: [PATCH 21/21] Add changeset --- .changeset/angry-ducks-sneeze.md | 7 +++++++ packages/node/tests/utils.ts | 2 ++ 2 files changed, 9 insertions(+) create mode 100644 .changeset/angry-ducks-sneeze.md diff --git a/.changeset/angry-ducks-sneeze.md b/.changeset/angry-ducks-sneeze.md new file mode 100644 index 000000000..5ed7d7714 --- /dev/null +++ b/.changeset/angry-ducks-sneeze.md @@ -0,0 +1,7 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +'@powersync/web': minor +--- + +Add alpha support for sync streams, allowing different sets of data to be synced dynamically. diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 33edc35b8..36021726b 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -68,6 +68,8 @@ export async function createDatabase( database: { dbFilename: 'test.db', dbLocation: tmpdir, + // Using a single read worker (instead of multiple, the default) seems to improve the reliability of tests in GH + // actions. So far, we've not been able to reproduce these failures locally. readWorkerCount: 1 }, logger: defaultLogger,