diff --git a/.changeset/angry-ducks-sneeze.md b/.changeset/angry-ducks-sneeze.md new file mode 100644 index 000000000..5ed7d7714 --- /dev/null +++ b/.changeset/angry-ducks-sneeze.md @@ -0,0 +1,7 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +'@powersync/web': minor +--- + +Add alpha support for sync streams, allowing different sets of data to be synced dynamically. diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 43ad99ec4..5b7fa050c 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -9,14 +9,17 @@ import { UpdateNotification, isBatchedUpdateNotification } from '../db/DBAdapter.js'; -import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; -import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js'; +import { SyncStatus } from '../db/crud/SyncStatus.js'; import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js'; import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js'; -import { ConnectionManager } from './ConnectionManager.js'; +import { + ConnectionManager, + CreateSyncImplementationOptions, + InternalSubscriptionAdapter +} from './ConnectionManager.js'; import { CustomQuery } from './CustomQuery.js'; import { ArrayQueryDefinition, Query } from './Query.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; @@ -40,6 +43,8 @@ import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js'; import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js'; import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js'; import { WatchedQueryComparator } from './watched/processors/comparators.js'; +import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js'; +import { SyncStream } from './sync/sync-streams.js'; export interface DisconnectAndClearOptions { /** When set to false, data in local-only tables is preserved. */ @@ -182,6 +187,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver; protected connectionManager: ConnectionManager; + private subscriptions: InternalSubscriptionAdapter; get syncStreamImplementation() { return this.connectionManager.syncStreamImplementation; @@ -236,10 +242,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver this.waitForStatus(predicate, abort), + resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(), + rustSubscriptionsCommand: async (payload) => { + await this.writeTransaction((tx) => { + return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]); + }); + } + }; this.connectionManager = new ConnectionManager({ createSyncImplementation: async (connector, options) => { await this.waitForReady(); - return this.runExclusive(async () => { const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options)); const onDispose = sync.registerListener({ @@ -304,7 +318,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver status.hasSynced : (status: SyncStatus) => status.statusForPriority(priority).hasSynced; - if (statusMatches(this.currentStatus)) { + return this.waitForStatus(statusMatches, signal); + } + + private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise { + if (predicate(this.currentStatus)) { return; } + return new Promise((resolve) => { const dispose = this.registerListener({ statusChanged: (status) => { - if (statusMatches(status)) { + if (predicate(status)) { dispose(); resolve(); } @@ -373,7 +392,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver cb.initialized?.()); @@ -403,30 +422,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver( - 'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC' - ); - let lastCompleteSync: Date | undefined; - const priorityStatusEntries: SyncPriorityStatus[] = []; - - for (const { priority, last_synced_at } of result) { - const parsedDate = new Date(last_synced_at + 'Z'); + protected async resolveOfflineSyncStatus() { + const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r'); + const parsed = JSON.parse(result.r) as CoreSyncStatus; - if (priority == FULL_SYNC_PRIORITY) { - // This lowest-possible priority represents a complete sync. - lastCompleteSync = parsedDate; - } else { - priorityStatusEntries.push({ priority, hasSynced: true, lastSyncedAt: parsedDate }); - } - } - - const hasSynced = lastCompleteSync != null; const updatedStatus = new SyncStatus({ ...this.currentStatus.toJSON(), - hasSynced, - priorityStatusEntries, - lastSyncedAt: lastCompleteSync + ...coreStatusToJs(parsed) }); if (!updatedStatus.isEqual(this.currentStatus)) { @@ -471,7 +473,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver l.statusChanged?.(this.currentStatus)); } + /** + * Create a sync stream to query its status or to subscribe to it. + * + * @param name The name of the stream to subscribe to. + * @param params Optional parameters for the stream subscription. + * @returns A {@link SyncStream} instance that can be subscribed to. + * @experimental Sync streams are currently in alpha. + */ + syncStream(name: string, params?: Record): SyncStream { + return this.connectionManager.stream(this.subscriptions, name, params ?? null); + } + /** * Close the database, releasing resources. * diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index b322d4c33..7521c31af 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -2,9 +2,18 @@ import { ILogger } from 'js-logger'; import { BaseListener, BaseObserver } from '../utils/BaseObserver.js'; import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js'; import { + AdditionalConnectionOptions, InternalConnectionOptions, - StreamingSyncImplementation + StreamingSyncImplementation, + SubscribedStream } from './sync/stream/AbstractStreamingSyncImplementation.js'; +import { + SyncStream, + SyncStreamDescription, + SyncStreamSubscribeOptions, + SyncStreamSubscription +} from './sync/sync-streams.js'; +import { SyncStatus } from '../db/crud/SyncStatus.js'; /** * @internal @@ -18,14 +27,30 @@ export interface ConnectionManagerSyncImplementationResult { onDispose: () => Promise | void; } +/** + * The subset of {@link AbstractStreamingSyncImplementationOptions} managed by the connection manager. + * + * @internal + */ +export interface CreateSyncImplementationOptions extends AdditionalConnectionOptions { + subscriptions: SubscribedStream[]; +} + +export interface InternalSubscriptionAdapter { + firstStatusMatching(predicate: (status: SyncStatus) => any, abort?: AbortSignal): Promise; + resolveOfflineSyncStatus(): Promise; + rustSubscriptionsCommand(payload: any): Promise; +} + /** * @internal */ export interface ConnectionManagerOptions { createSyncImplementation( connector: PowerSyncBackendConnector, - options: InternalConnectionOptions + options: CreateSyncImplementationOptions ): Promise; + logger: ILogger; } @@ -76,6 +101,13 @@ export class ConnectionManager extends BaseObserver { */ protected syncDisposer: (() => Promise | void) | null; + /** + * Subscriptions managed in this connection manager. + * + * On the web, these local subscriptions are merged across tabs by a shared worker. + */ + private locallyActiveSubscriptions = new Map(); + constructor(protected options: ConnectionManagerOptions) { super(); this.connectingPromise = null; @@ -102,7 +134,7 @@ export class ConnectionManager extends BaseObserver { // Update pending options to the latest values this.pendingConnectionOptions = { connector, - options: options ?? {} + options }; // Disconnecting here provides aborting in progress connection attempts. @@ -169,7 +201,11 @@ export class ConnectionManager extends BaseObserver { appliedOptions = options; this.pendingConnectionOptions = null; - const { sync, onDispose } = await this.options.createSyncImplementation(connector, options); + + const { sync, onDispose } = await this.options.createSyncImplementation(connector, { + subscriptions: this.activeStreams, + ...options + }); this.iterateListeners((l) => l.syncStreamCreated?.(sync)); this.syncStreamImplementation = sync; this.syncDisposer = onDispose; @@ -236,4 +272,122 @@ export class ConnectionManager extends BaseObserver { await sync?.dispose(); await disposer?.(); } + + stream(adapter: InternalSubscriptionAdapter, name: string, parameters: Record | null): SyncStream { + const desc = { name, parameters } satisfies SyncStreamDescription; + + const waitForFirstSync = (abort?: AbortSignal) => { + return adapter.firstStatusMatching((s) => s.forStream(desc)?.subscription.hasSynced, abort); + }; + + return { + ...desc, + subscribe: async (options?: SyncStreamSubscribeOptions) => { + // NOTE: We also run this command if a subscription already exists, because this increases the expiry date + // (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl). + await adapter.rustSubscriptionsCommand({ + subscribe: { + stream: { + name, + params: parameters + }, + ttl: options?.ttl, + priority: options?.priority + } + }); + + if (!this.syncStreamImplementation) { + // We're not connected. So, update the offline sync status to reflect the new subscription. + // (With an active iteration, the sync client would include it in its state). + await adapter.resolveOfflineSyncStatus(); + } + + const key = `${name}|${JSON.stringify(parameters)}`; + let subscription = this.locallyActiveSubscriptions.get(key); + if (subscription == null) { + const clearSubscription = () => { + this.locallyActiveSubscriptions.delete(key); + this.subscriptionsMayHaveChanged(); + }; + + subscription = new ActiveSubscription(name, parameters, this.logger, waitForFirstSync, clearSubscription); + this.locallyActiveSubscriptions.set(key, subscription); + this.subscriptionsMayHaveChanged(); + } + + return new SyncStreamSubscriptionHandle(subscription); + }, + unsubscribeAll: async () => { + await adapter.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } }); + this.subscriptionsMayHaveChanged(); + } + }; + } + + private get activeStreams() { + return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters })); + } + + private subscriptionsMayHaveChanged() { + if (this.syncStreamImplementation) { + this.syncStreamImplementation.updateSubscriptions(this.activeStreams); + } + } +} + +class ActiveSubscription { + refcount: number = 0; + + constructor( + readonly name: string, + readonly parameters: Record | null, + readonly logger: ILogger, + readonly waitForFirstSync: (abort?: AbortSignal) => Promise, + private clearSubscription: () => void + ) {} + + decrementRefCount() { + this.refcount--; + if (this.refcount == 0) { + this.clearSubscription(); + } + } +} + +class SyncStreamSubscriptionHandle implements SyncStreamSubscription { + private active: boolean = true; + + constructor(readonly subscription: ActiveSubscription) { + subscription.refcount++; + _finalizer?.register(this, subscription); + } + + get name() { + return this.subscription.name; + } + + get parameters() { + return this.subscription.parameters; + } + + waitForFirstSync(abort?: AbortSignal): Promise { + return this.subscription.waitForFirstSync(abort); + } + + unsubscribe(): void { + if (this.active) { + this.active = false; + _finalizer?.unregister(this); + this.subscription.decrementRefCount(); + } + } } + +const _finalizer = + 'FinalizationRegistry' in globalThis + ? new FinalizationRegistry((sub) => { + sub.logger.warn( + `A subscription to ${sub.name} with params ${JSON.stringify(sub.parameters)} leaked! Please ensure calling unsubscribe() when you don't need a subscription anymore. For global subscriptions, consider storing them in global fields to avoid this warning.` + ); + }) + : null; diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index 3c24c059c..4287c399a 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -12,6 +12,7 @@ export interface Checkpoint { last_op_id: OpId; buckets: BucketChecksum[]; write_checkpoint?: string; + streams?: any[]; } export interface BucketState { @@ -49,6 +50,12 @@ export interface BucketChecksum { * Count of operations - informational only. */ count?: number; + /** + * The JavaScript client does not use this field, which is why it's defined to be `any`. We rely on the structure of + * this interface to pass custom `BucketChecksum`s to the Rust client in unit tests, which so all fields need to be + * present. + */ + subscriptions?: any; } export enum PSInternalTable { @@ -65,7 +72,8 @@ export enum PowerSyncControlCommand { STOP = 'stop', START = 'start', NOTIFY_TOKEN_REFRESHED = 'refreshed_token', - NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload' + NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload', + UPDATE_SUBSCRIPTIONS = 'update_subscriptions' } export interface BucketStorageListener extends BaseListener { diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index ef282030b..ab3e37c7c 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -1,7 +1,6 @@ import Logger, { ILogger } from 'js-logger'; -import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/crud/SyncProgress.js'; -import * as sync_status from '../../../db/crud/SyncStatus.js'; +import { InternalProgressInformation } from '../../../db/crud/SyncProgress.js'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, BaseObserverInterface, Disposable } from '../../../utils/BaseObserver.js'; @@ -17,7 +16,7 @@ import { import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js'; -import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js'; +import { coreStatusToJs, EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js'; import { BucketRequest, CrudUploadNotification, @@ -95,8 +94,9 @@ export interface LockOptions { signal?: AbortSignal; } -export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions { +export interface AbstractStreamingSyncImplementationOptions extends RequiredAdditionalConnectionOptions { adapter: BucketStorageAdapter; + subscriptions: SubscribedStream[]; uploadCrud: () => Promise; /** * An identifier for which PowerSync DB this sync implementation is @@ -155,6 +155,13 @@ export interface BaseConnectionOptions { */ params?: Record; + /** + * Whether to include streams that have `auto_subscribe: true` in their definition. + * + * This defaults to `true`. + */ + includeDefaultStreams?: boolean; + /** * The serialized schema - mainly used to forward information about raw tables to the sync client. */ @@ -177,7 +184,9 @@ export interface AdditionalConnectionOptions { } /** @internal */ -export type RequiredAdditionalConnectionOptions = Required; +export interface RequiredAdditionalConnectionOptions extends Required { + subscriptions: SubscribedStream[]; +} export interface StreamingSyncImplementation extends BaseObserverInterface, @@ -200,6 +209,7 @@ export interface StreamingSyncImplementation waitForReady(): Promise; waitForStatus(status: SyncStatusOptions): Promise; waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise; + updateSubscriptions(subscriptions: SubscribedStream[]): void; } export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000; @@ -217,7 +227,13 @@ export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptio clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION, fetchStrategy: FetchStrategy.Buffered, params: {}, - serializedSchema: undefined + serializedSchema: undefined, + includeDefaultStreams: true +}; + +export type SubscribedStream = { + name: string; + params: Record | null; }; // The priority we assume when we receive checkpoint lines where no priority is set. @@ -239,16 +255,19 @@ export abstract class AbstractStreamingSyncImplementation protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; protected logger: ILogger; + private activeStreams: SubscribedStream[]; private isUploadingCrud: boolean = false; private notifyCompletedUploads?: () => void; + private handleActiveStreamsChange?: () => void; syncStatus: SyncStatus; triggerCrudUpload: () => void; constructor(options: AbstractStreamingSyncImplementationOptions) { super(); - this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options }; + this.options = options; + this.activeStreams = options.subscriptions; this.logger = options.logger ?? Logger.get('PowerSyncStream'); this.syncStatus = new SyncStatus({ @@ -530,11 +549,13 @@ The next upload iteration will be delayed.`); while (true) { this.updateSyncStatus({ connecting: true }); let shouldDelayRetry = true; + let result: RustIterationResult | null = null; + try { if (signal?.aborted) { break; } - await this.streamingSyncIteration(nestedAbortController.signal, options); + result = await this.streamingSyncIteration(nestedAbortController.signal, options); // Continue immediately, streamingSyncIteration will wait before completing if necessary. } catch (ex) { /** @@ -568,14 +589,16 @@ The next upload iteration will be delayed.`); nestedAbortController = new AbortController(); } - this.updateSyncStatus({ - connected: false, - connecting: true // May be unnecessary - }); + if (result?.immediateRestart != true) { + this.updateSyncStatus({ + connected: false, + connecting: true // May be unnecessary + }); - // On error, wait a little before retrying - if (shouldDelayRetry) { - await this.delayRetry(nestedAbortController.signal); + // On error, wait a little before retrying + if (shouldDelayRetry) { + await this.delayRetry(nestedAbortController.signal); + } } } } @@ -623,8 +646,11 @@ The next upload iteration will be delayed.`); } } - protected async streamingSyncIteration(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise { - await this.obtainLock({ + protected streamingSyncIteration( + signal: AbortSignal, + options?: PowerSyncConnectionOptions + ): Promise { + return this.obtainLock({ type: LockType.SYNC, signal, callback: async () => { @@ -637,9 +663,10 @@ The next upload iteration will be delayed.`); if (clientImplementation == SyncClientImplementation.JAVASCRIPT) { await this.legacyStreamingSyncIteration(signal, resolvedOptions); + return null; } else { await this.requireKeyFormat(true); - await this.rustSyncIteration(signal, resolvedOptions); + return await this.rustSyncIteration(signal, resolvedOptions); } } }); @@ -894,12 +921,16 @@ The next upload iteration will be delayed.`); return; } - private async rustSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) { + private async rustSyncIteration( + signal: AbortSignal, + resolvedOptions: RequiredPowerSyncConnectionOptions + ): Promise { const syncImplementation = this; const adapter = this.options.adapter; const remote = this.options.remote; let receivingLines: Promise | null = null; let hadSyncLine = false; + let hideDisconnectOnRestart = false; if (signal.aborted) { throw new AbortOperation('Connection request has been aborted'); @@ -985,6 +1016,14 @@ The next upload iteration will be delayed.`); async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) { const rawResponse = await adapter.control(op, payload ?? null); + const logger = syncImplementation.logger; + logger.trace( + 'powersync_control', + op, + payload == null || typeof payload == 'string' ? payload : '', + rawResponse + ); + await handleInstructions(JSON.parse(rawResponse)); } @@ -1002,29 +1041,7 @@ The next upload iteration will be delayed.`); break; } } else if ('UpdateSyncStatus' in instruction) { - function coreStatusToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatus { - return { - priority: status.priority, - hasSynced: status.has_synced ?? undefined, - lastSyncedAt: status?.last_synced_at != null ? new Date(status!.last_synced_at! * 1000) : undefined - }; - } - - const info = instruction.UpdateSyncStatus.status; - const coreCompleteSync = info.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); - const completeSync = coreCompleteSync != null ? coreStatusToJs(coreCompleteSync) : null; - - syncImplementation.updateSyncStatus({ - connected: info.connected, - connecting: info.connecting, - dataFlow: { - downloading: info.downloading != null, - downloadProgress: info.downloading?.buckets - }, - lastSyncedAt: completeSync?.lastSyncedAt, - hasSynced: completeSync?.hasSynced, - priorityStatusEntries: info.priority_status.map(coreStatusToJs) - }); + syncImplementation.updateSyncStatus(coreStatusToJs(instruction.UpdateSyncStatus.status)); } else if ('EstablishSyncStream' in instruction) { if (receivingLines != null) { // Already connected, this shouldn't happen during a single iteration. @@ -1050,6 +1067,7 @@ The next upload iteration will be delayed.`); } } else if ('CloseSyncStream' in instruction) { abortController.abort(); + hideDisconnectOnRestart = instruction.CloseSyncStream.hide_disconnect; } else if ('FlushFileSystem' in instruction) { // Not necessary on JS platforms. } else if ('DidCompleteSync' in instruction) { @@ -1068,7 +1086,11 @@ The next upload iteration will be delayed.`); } try { - const options: any = { parameters: resolvedOptions.params }; + const options: any = { + parameters: resolvedOptions.params, + active_streams: this.activeStreams, + include_defaults: resolvedOptions.includeDefaultStreams + }; if (resolvedOptions.serializedSchema) { options.schema = resolvedOptions.serializedSchema; } @@ -1080,11 +1102,21 @@ The next upload iteration will be delayed.`); controlInvocations.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); } }; + this.handleActiveStreamsChange = () => { + if (controlInvocations && !controlInvocations?.closed) { + controlInvocations.enqueueData({ + command: PowerSyncControlCommand.UPDATE_SUBSCRIPTIONS, + payload: JSON.stringify(this.activeStreams) + }); + } + }; await receivingLines; } finally { - this.notifyCompletedUploads = undefined; + this.notifyCompletedUploads = this.handleActiveStreamsChange = undefined; await stop(); } + + return { immediateRestart: hideDisconnectOnRestart }; } private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) { @@ -1209,9 +1241,18 @@ The next upload iteration will be delayed.`); timeoutId = setTimeout(endDelay, retryDelayMs); }); } + + updateSubscriptions(subscriptions: SubscribedStream[]): void { + this.activeStreams = subscriptions; + this.handleActiveStreamsChange?.(); + } } interface EnqueuedCommand { command: PowerSyncControlCommand; payload?: Uint8Array | string; } + +interface RustIterationResult { + immediateRestart: boolean; +} diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts index db1f56e81..5da4b43ea 100644 --- a/packages/common/src/client/sync/stream/core-instruction.ts +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -1,4 +1,6 @@ import { StreamingSyncRequest } from './streaming-sync-types.js'; +import * as sync_status from '../../../db/crud/SyncStatus.js'; +import { FULL_SYNC_PRIORITY } from '../../../db/crud/SyncProgress.js'; /** * An internal instruction emitted by the sync client in the core extension in response to the JS @@ -9,7 +11,7 @@ export type Instruction = | { UpdateSyncStatus: UpdateSyncStatus } | { EstablishSyncStream: EstablishSyncStream } | { FetchCredentials: FetchCredentials } - | { CloseSyncStream: any } + | { CloseSyncStream: { hide_disconnect: boolean } } | { FlushFileSystem: any } | { DidCompleteSync: any }; @@ -31,6 +33,20 @@ export interface CoreSyncStatus { connecting: boolean; priority_status: SyncPriorityStatus[]; downloading: DownloadProgress | null; + streams: CoreStreamSubscription[]; +} + +/// An `ActiveStreamSubscription` from the core extension + serialized progress information. +export interface CoreStreamSubscription { + progress: { total: number; downloaded: number }; + name: string; + parameters: any; + priority: number | null; + active: boolean; + is_default: boolean; + has_explicit_subscription: boolean; + expires_at: number | null; + last_synced_at: number | null; } export interface SyncPriorityStatus { @@ -53,3 +69,31 @@ export interface BucketProgress { export interface FetchCredentials { did_expire: boolean; } + +function priorityToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatus { + return { + priority: status.priority, + hasSynced: status.has_synced ?? undefined, + lastSyncedAt: status.last_synced_at != null ? new Date(status.last_synced_at * 1000) : undefined + }; +} + +export function coreStatusToJs(status: CoreSyncStatus): sync_status.SyncStatusOptions { + const coreCompleteSync = status.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); + const completeSync = coreCompleteSync != null ? priorityToJs(coreCompleteSync) : null; + + return { + connected: status.connected, + connecting: status.connecting, + dataFlow: { + // We expose downloading as a boolean field, the core extension reports download information as a nullable + // download status. When that status is non-null, a download is in progress. + downloading: status.downloading != null, + downloadProgress: status.downloading?.buckets, + internalStreamSubscriptions: status.streams + }, + lastSyncedAt: completeSync?.lastSyncedAt, + hasSynced: completeSync?.hasSynced, + priorityStatusEntries: status.priority_status.map(priorityToJs) + }; +} diff --git a/packages/common/src/client/sync/sync-streams.ts b/packages/common/src/client/sync/sync-streams.ts new file mode 100644 index 000000000..29ba20226 --- /dev/null +++ b/packages/common/src/client/sync/sync-streams.ts @@ -0,0 +1,107 @@ +import { AbstractPowerSyncDatabase } from '../AbstractPowerSyncDatabase.js'; + +/** + * A description of a sync stream, consisting of its {@link name} and the {@link parameters} used when subscribing. + */ +export interface SyncStreamDescription { + /** + * The name of the stream as it appears in the stream definition for the PowerSync service. + */ + name: string; + + /** + * The parameters used to subscribe to the stream, if any. + * + * The same stream can be subscribed to multiple times with different parameters. + */ + parameters: Record | null; +} + +/** + * Information about a subscribed sync stream. + * + * This includes the {@link SyncStreamDescription}, along with information about the current sync status. + */ +export interface SyncSubscriptionDescription extends SyncStreamDescription { + active: boolean; + /** + * Whether this stream subscription is included by default, regardless of whether the stream has explicitly been + * subscribed to or not. + * + * It's possible for both {@link isDefault} and {@link hasExplicitSubscription} to be true at the same time - this + * happens when a default stream was subscribed explicitly. + */ + isDefault: boolean; + /** + * Whether this stream has been subscribed to explicitly. + * + * It's possible for both {@link isDefault} and {@link hasExplicitSubscription} to be true at the same time - this + * happens when a default stream was subscribed explicitly. + */ + hasExplicitSubscription: boolean; + /** + * For sync streams that have a time-to-live, the current time at which the stream would expire if not subscribed to + * again. + */ + expiresAt: Date | null; + /** + * Whether this stream subscription has been synced at least once. + */ + hasSynced: boolean; + /** + * If {@link hasSynced} is true, the last time data from this stream has been synced. + */ + lastSyncedAt: Date | null; +} + +export interface SyncStreamSubscribeOptions { + /** + * A "time to live" for this stream subscription, in seconds. + * + * The TTL control when a stream gets evicted after not having an active {@link SyncStreamSubscription} object + * attached to it. + */ + ttl?: number; + /** + * A priority to assign to this subscription. This overrides the default priority that may have been set on streams. + * + * For details on priorities, see [priotized sync](https://docs.powersync.com/usage/use-case-examples/prioritized-sync). + */ + priority?: 0 | 1 | 2 | 3; +} + +/** + * A handle to a {@link SyncStreamDescription} that allows subscribing to the stream. + * + * To obtain an instance of {@link SyncStream}, call {@link AbstractPowerSyncDatabase.syncStream}. + */ +export interface SyncStream extends SyncStreamDescription { + /** + * Adds a subscription to this stream, requesting it to be included when connecting to the sync service. + * + * You should keep a reference to the returned {@link SyncStreamSubscription} object along as you need data for that + * stream. As soon as {@link SyncStreamSubscription.unsubscribe} is called for all subscriptions on this stream + * (including subscriptions created on other tabs), the {@link SyncStreamSubscribeOptions.ttl} starts ticking and will + * eventually evict the stream (unless {@link subscribe} is called again). + */ + subscribe(options?: SyncStreamSubscribeOptions): Promise; + + /** + * Clears all subscriptions attached to this stream and resets the TTL for the stream. + * + * This is a potentially dangerous operations, as it interferes with other stream subscriptions. + */ + unsubscribeAll(): Promise; +} + +export interface SyncStreamSubscription extends SyncStreamDescription { + /** + * A promise that resolves once data from in this sync stream has been synced and applied. + */ + waitForFirstSync(abort?: AbortSignal): Promise; + + /** + * Removes this stream subscription. + */ + unsubscribe(): void; +} diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index 4c687e18d..f5686c037 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -1,5 +1,7 @@ +import { CoreStreamSubscription } from '../../client/sync/stream/core-instruction.js'; import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js'; -import { InternalProgressInformation, SyncProgress } from './SyncProgress.js'; +import { InternalProgressInformation, ProgressWithOperations, SyncProgress } from './SyncProgress.js'; +import { SyncStreamDescription, SyncSubscriptionDescription } from '../../client/sync/sync-streams.js'; export type SyncDataFlowStatus = Partial<{ downloading: boolean; @@ -21,6 +23,7 @@ export type SyncDataFlowStatus = Partial<{ * Please use the {@link SyncStatus#downloadProgress} property to track sync progress. */ downloadProgress: InternalProgressInformation | null; + internalStreamSubscriptions: CoreStreamSubscription[] | null; }>; export interface SyncPriorityStatus { @@ -114,6 +117,32 @@ export class SyncStatus { ); } + /** + * All sync streams currently being tracked in teh database. + * + * This returns null when the database is currently being opened and we don't have reliable information about all + * included streams yet. + * + * @experimental Sync streams are currently in alpha. + */ + get syncStreams(): SyncStreamStatus[] | undefined { + return this.options.dataFlow?.internalStreamSubscriptions?.map((core) => new SyncStreamStatusView(this, core)); + } + + /** + * If the `stream` appears in {@link syncStreams}, returns the current status for that stream. + * + * @experimental Sync streams are currently in alpha. + */ + forStream(stream: SyncStreamDescription): SyncStreamStatus | undefined { + const asJson = JSON.stringify(stream.parameters); + const raw = this.options.dataFlow?.internalStreamSubscriptions?.find( + (r) => r.name == stream.name && asJson == JSON.stringify(r.parameters) + ); + + return raw && new SyncStreamStatusView(this, raw); + } + /** * Provides sync status information for all bucket priorities, sorted by priority (highest first). * @@ -232,3 +261,48 @@ export class SyncStatus { return b.priority - a.priority; // Reverse because higher priorities have lower numbers } } + +/** + * Information about a sync stream subscription. + */ +export interface SyncStreamStatus { + progress: ProgressWithOperations | null; + subscription: SyncSubscriptionDescription; + priority: number | null; +} + +class SyncStreamStatusView implements SyncStreamStatus { + subscription: SyncSubscriptionDescription; + + constructor( + private status: SyncStatus, + private core: CoreStreamSubscription + ) { + this.subscription = { + name: core.name, + parameters: core.parameters, + active: core.active, + isDefault: core.is_default, + hasExplicitSubscription: core.has_explicit_subscription, + expiresAt: core.expires_at != null ? new Date(core.expires_at * 1000) : null, + hasSynced: core.last_synced_at != null, + lastSyncedAt: core.last_synced_at != null ? new Date(core.last_synced_at * 1000) : null + }; + } + + get progress() { + if (this.status.dataFlowStatus.downloadProgress == null) { + // Don't make download progress public if we're not currently downloading. + return null; + } + + const { total, downloaded } = this.core.progress; + const progress = total == 0 ? 0.0 : downloaded / total; + + return { totalOperations: total, downloadedOperations: downloaded, downloadedFraction: progress }; + } + + get priority() { + return this.core.priority; + } +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 2e886155d..7173a2ece 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -18,6 +18,7 @@ export * from './client/sync/bucket/SyncDataBucket.js'; export * from './client/sync/stream/AbstractRemote.js'; export * from './client/sync/stream/AbstractStreamingSyncImplementation.js'; export * from './client/sync/stream/streaming-sync-types.js'; +export * from './client/sync/sync-streams.js'; export * from './client/ConnectionManager.js'; export { ProgressWithOperations, SyncProgress } from './db/crud/SyncProgress.js'; diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index 4ac7969b6..e2e2d06bf 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -9,6 +9,7 @@ import { PowerSyncConnectionOptions, PowerSyncDatabaseOptions, PowerSyncDatabaseOptionsWithSettings, + RequiredAdditionalConnectionOptions, SqliteBucketStorage, SQLOpenFactory } from '@powersync/common'; @@ -76,7 +77,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { protected generateSyncStreamImplementation( connector: PowerSyncBackendConnector, - options: NodeAdditionalConnectionOptions + options: RequiredAdditionalConnectionOptions & NodeAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { const logger = this.logger; const remote = new NodeRemote(connector, logger, { @@ -91,8 +92,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { await this.waitForReady(); await connector.uploadData(this); }, - retryDelayMs: this.options.retryDelayMs, - crudUploadThrottleMs: this.options.crudUploadThrottleMs, + ...options, identifier: this.database.name, logger }); diff --git a/packages/node/tests/PowerSyncDatabase.test.ts b/packages/node/tests/PowerSyncDatabase.test.ts index 88732522f..b83e729e9 100644 --- a/packages/node/tests/PowerSyncDatabase.test.ts +++ b/packages/node/tests/PowerSyncDatabase.test.ts @@ -43,7 +43,14 @@ databaseTest('links powersync', async ({ database }) => { await database.get('select powersync_rs_version();'); }); -databaseTest('runs queries on multiple threads', async ({ database }) => { +tempDirectoryTest('runs queries on multiple threads', async ({ tmpdir }) => { + const database = new PowerSyncDatabase({ + schema: AppSchema, + database: { + dbFilename: 'test.db', + dbLocation: tmpdir + } + }); const threads = new Set(); const collectWorkerThreadId = async () => { @@ -58,6 +65,7 @@ databaseTest('runs queries on multiple threads', async ({ database }) => { } const res = await Promise.all(queryTasks); + await database.close(); expect(res).toHaveLength(10); expect([...threads]).toHaveLength(5); }); diff --git a/packages/node/tests/sync-stream.test.ts b/packages/node/tests/sync-stream.test.ts new file mode 100644 index 000000000..becba1be5 --- /dev/null +++ b/packages/node/tests/sync-stream.test.ts @@ -0,0 +1,189 @@ +import { describe, vi, expect, onTestFinished } from 'vitest'; +import { PowerSyncConnectionOptions, SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common'; +import Logger from 'js-logger'; +import { bucket, checkpoint, mockSyncServiceTest, nextStatus, stream, TestConnector } from './utils'; + +Logger.useDefaults({ defaultLevel: Logger.WARN }); + +describe('Sync streams', () => { + const defaultOptions = { + clientImplementation: SyncClientImplementation.RUST, + connectionMethod: SyncStreamConnectionMethod.HTTP + } satisfies PowerSyncConnectionOptions; + + mockSyncServiceTest('can disable default streams', async ({ syncService }) => { + const database = await syncService.createDatabase(); + await database.connect(new TestConnector(), { + includeDefaultStreams: false, + ...defaultOptions + }); + + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: false, + subscriptions: [] + } + }); + }); + + mockSyncServiceTest('subscribes with streams', async ({ syncService }) => { + const database = await syncService.createDatabase(); + const a = await database.syncStream('stream', { foo: 'a' }).subscribe(); + const b = await database.syncStream('stream', { foo: 'b' }).subscribe({ priority: 1 }); + onTestFinished(() => { + a.unsubscribe(); + b.unsubscribe(); + }); + + await database.connect(new TestConnector(), defaultOptions); + + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [ + { + stream: 'stream', + parameters: { foo: 'a' }, + override_priority: null + }, + { + stream: 'stream', + parameters: { foo: 'b' }, + override_priority: 1 + } + ] + } + }); + + let statusPromise = nextStatus(database); + syncService.pushLine( + checkpoint({ + last_op_id: 0, + buckets: [ + bucket('a', 0, { priority: 3, subscriptions: [{ sub: 0 }] }), + bucket('b', 0, { priority: 1, subscriptions: [{ sub: 1 }] }) + ], + streams: [stream('stream', false)] + }) + ); + let status = await statusPromise; + for (const subscription of [a, b]) { + expect(status.forStream(subscription).subscription.active).toBeTruthy(); + expect(status.forStream(subscription).subscription.lastSyncedAt).toBeNull(); + expect(status.forStream(subscription).subscription.hasExplicitSubscription).toBeTruthy(); + } + + statusPromise = nextStatus(database); + syncService.pushLine({ partial_checkpoint_complete: { last_op_id: '0', priority: 1 } }); + status = await statusPromise; + expect(status.forStream(a).subscription.lastSyncedAt).toBeNull(); + expect(status.forStream(b).subscription.lastSyncedAt).not.toBeNull(); + await b.waitForFirstSync(); + + syncService.pushLine({ checkpoint_complete: { last_op_id: '0' } }); + await a.waitForFirstSync(); + }); + + mockSyncServiceTest('reports default streams', async ({ syncService }) => { + const database = await syncService.createDatabase(); + await database.connect(new TestConnector(), defaultOptions); + + let statusPromise = nextStatus(database); + syncService.pushLine( + checkpoint({ + last_op_id: 0, + buckets: [], + streams: [stream('default_stream', true)] + }) + ); + let status = await statusPromise; + + expect(status.syncStreams).toHaveLength(1); + expect(status.syncStreams[0]).toMatchObject({ + subscription: { + name: 'default_stream', + parameters: null, + isDefault: true, + hasExplicitSubscription: false + } + }); + }); + + mockSyncServiceTest('changes subscriptions dynamically', async ({ syncService }) => { + const database = await syncService.createDatabase(); + await database.connect(new TestConnector(), defaultOptions); + + syncService.pushLine( + checkpoint({ + last_op_id: 0, + buckets: [] + }) + ); + const subscription = await database.syncStream('a').subscribe(); + + await vi.waitFor(() => + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [ + { + stream: 'a', + parameters: null, + override_priority: null + } + ] + } + }) + ); + + // Given that the subscription has a TTL, dropping the handle should not re-subscribe. + subscription.unsubscribe(); + await new Promise((r) => setTimeout(r, 100)); + expect(syncService.connectedListeners[0].streams.subscriptions).toHaveLength(1); + }); + + mockSyncServiceTest('subscriptions update while offline', async ({ syncService }) => { + const database = await syncService.createDatabase(); + + let statusPromise = nextStatus(database); + const subscription = await database.syncStream('foo').subscribe(); + let status = await statusPromise; + expect(status.forStream(subscription)).not.toBeNull(); + }); + + mockSyncServiceTest('unsubscribing multiple times has no effect', async ({ syncService }) => { + const database = await syncService.createDatabase(); + const a = await database.syncStream('a').subscribe(); + const aAgain = await database.syncStream('a').subscribe(); + a.unsubscribe(); + a.unsubscribe(); + + // Pretend the streams are expired - they should still be requested because the core extension extends the lifetime + // of streams currently referenced before connecting. + await database.execute('UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000'); + await database.connect(new TestConnector(), defaultOptions); + + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [{}] + } + }); + aAgain.unsubscribe(); + }); + + mockSyncServiceTest('unsubscribeAll', async ({ syncService }) => { + const database = await syncService.createDatabase(); + const a = await database.syncStream('a').subscribe(); + database.syncStream('a').unsubscribeAll(); + + await database.connect(new TestConnector(), defaultOptions); + expect(syncService.connectedListeners[0]).toMatchObject({ + streams: { + include_defaults: true, + subscriptions: [] + } + }); + a.unsubscribe(); + }); +}); diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index c63163a6f..6d615485d 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -1,7 +1,7 @@ import { describe, vi, expect, beforeEach } from 'vitest'; import util from 'node:util'; -import { MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils'; +import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils'; import { AbstractPowerSyncDatabase, BucketChecksum, @@ -15,8 +15,6 @@ import { } from '@powersync/common'; import Logger from 'js-logger'; -Logger.useDefaults({ defaultLevel: Logger.WARN }); - describe('Sync', () => { describe('js client', () => { defineSyncTests(SyncClientImplementation.JAVASCRIPT); @@ -925,15 +923,6 @@ function defineSyncTests(impl: SyncClientImplementation) { }); } -function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum { - return { - bucket: name, - count, - checksum: 0, - priority: options.priority - }; -} - async function waitForProgress( database: AbstractPowerSyncDatabase, total: [number, number], diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index ab10c9fa2..36021726b 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -6,16 +6,20 @@ import { ReadableStream, TransformStream } from 'node:stream/web'; import { onTestFinished, test } from 'vitest'; import { AbstractPowerSyncDatabase, + BucketChecksum, column, NodePowerSyncDatabaseOptions, PowerSyncBackendConnector, PowerSyncCredentials, PowerSyncDatabase, Schema, + StreamingSyncCheckpoint, StreamingSyncLine, SyncStatus, Table } from '../lib'; +import { createLogger } from '@powersync/common'; +import Logger from 'js-logger'; export async function createTempDir() { const ostmpdir = os.tmpdir(); @@ -50,16 +54,25 @@ export const tempDirectoryTest = test.extend<{ tmpdir: string }>({ } }); -async function createDatabase( +export async function createDatabase( tmpdir: string, options: Partial = {} ): Promise { + const defaultLogger = createLogger('PowerSyncTest', { logLevel: Logger.TRACE }); + (defaultLogger as any).invoke = (_, args) => { + console.log(...args); + }; + const database = new PowerSyncDatabase({ schema: AppSchema, database: { dbFilename: 'test.db', - dbLocation: tmpdir + dbLocation: tmpdir, + // Using a single read worker (instead of multiple, the default) seems to improve the reliability of tests in GH + // actions. So far, we've not been able to reproduce these failures locally. + readWorkerCount: 1 }, + logger: defaultLogger, ...options }); await database.init(); @@ -196,3 +209,45 @@ export function waitForSyncStatus( }); }); } + +export function checkpoint(options: { last_op_id: number; buckets?: any[]; streams?: any[] }): StreamingSyncCheckpoint { + return { + checkpoint: { + last_op_id: `${options.last_op_id}`, + buckets: options.buckets ?? [], + write_checkpoint: null, + streams: options.streams ?? [] + } + }; +} + +export function bucket( + name: string, + count: number, + options: { priority: number; subscriptions?: any } = { priority: 3 } +): BucketChecksum { + return { + bucket: name, + count, + checksum: 0, + priority: options.priority, + subscriptions: options.subscriptions + }; +} + +export function stream(name: string, isDefault: boolean, errors = []) { + return { name, is_default: isDefault, errors }; +} + +export function nextStatus(db: PowerSyncDatabase): Promise { + return new Promise((resolve) => { + let l; + + l = db.registerListener({ + statusChanged(status) { + resolve(status); + l(); + } + }); + }); +} diff --git a/packages/node/vitest.config.ts b/packages/node/vitest.config.ts index 94ec60f24..def711060 100644 --- a/packages/node/vitest.config.ts +++ b/packages/node/vitest.config.ts @@ -1,4 +1,8 @@ import { defineConfig } from 'vitest/config'; // We need to define an empty config to be part of the vitest works -export default defineConfig({}); +export default defineConfig({ + test: { + silent: false + } +}); diff --git a/packages/react-native/src/db/PowerSyncDatabase.ts b/packages/react-native/src/db/PowerSyncDatabase.ts index ddb513694..6b233c9a6 100644 --- a/packages/react-native/src/db/PowerSyncDatabase.ts +++ b/packages/react-native/src/db/PowerSyncDatabase.ts @@ -49,14 +49,13 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { const remote = new ReactNativeRemote(connector, this.logger); return new ReactNativeStreamingSyncImplementation({ + ...options, adapter: this.bucketStorageAdapter, remote, uploadCrud: async () => { await this.waitForReady(); await connector.uploadData(this); }, - retryDelayMs: options.retryDelayMs, - crudUploadThrottleMs: options.crudUploadThrottleMs, identifier: this.database.name, logger: this.logger }); diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index f55346808..0e60de018 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -189,8 +189,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { const remote = new WebRemote(connector, this.logger); const syncOptions: WebStreamingSyncImplementationOptions = { ...(this.options as {}), - retryDelayMs: options.retryDelayMs, - crudUploadThrottleMs: options.crudUploadThrottleMs, + ...options, flags: this.resolvedFlags, adapter: this.bucketStorageAdapter, remote, diff --git a/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts index c09518f46..463a24308 100644 --- a/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts @@ -5,6 +5,7 @@ import { LockType, PowerSyncConnectionOptions, StreamingSyncImplementation, + SubscribedStream, SyncStatus, SyncStatusOptions } from '@powersync/common'; @@ -80,4 +81,9 @@ export class SSRStreamingSyncImplementation extends BaseObserver implements Stre * This is a no-op in SSR mode. */ triggerCrudUpload() {} + + /** + * No-op in SSR mode. + */ + updateSubscriptions(): void {} } diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 1fe0177a7..66d3b51ed 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -1,17 +1,21 @@ -import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common'; +import { + PowerSyncConnectionOptions, + PowerSyncCredentials, + SubscribedStream, + SyncStatus, + SyncStatusOptions +} from '@powersync/common'; import * as Comlink from 'comlink'; import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider'; -import { - ManualSharedSyncPayload, - SharedSyncClientEvent, - SharedSyncImplementation -} from '../../worker/sync/SharedSyncImplementation'; +import { ManualSharedSyncPayload, SharedSyncClientEvent } from '../../worker/sync/SharedSyncImplementation'; import { DEFAULT_CACHE_SIZE_KB, resolveWebSQLFlags, TemporaryStorageOption } from '../adapters/web-sql-flags'; import { WebDBAdapter } from '../adapters/WebDBAdapter'; import { WebStreamingSyncImplementation, WebStreamingSyncImplementationOptions } from './WebStreamingSyncImplementation'; +import { WorkerClient } from '../../worker/sync/WorkerClient'; +import { getNavigatorLocks } from '../../shared/navigator'; /** * The shared worker will trigger methods on this side of the message port @@ -94,13 +98,17 @@ export interface SharedWebStreamingSyncImplementationOptions extends WebStreamin db: WebDBAdapter; } +/** + * The local part of the sync implementation on the web, which talks to a sync implementation hosted in a shared worker. + */ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplementation { - protected syncManager: Comlink.Remote; + protected syncManager: Comlink.Remote; protected clientProvider: SharedSyncClientProvider; protected messagePort: MessagePort; protected isInitialized: Promise; protected dbAdapter: WebDBAdapter; + private abortOnClose = new AbortController(); constructor(options: SharedWebStreamingSyncImplementationOptions) { super(options); @@ -138,7 +146,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem ).port; } - this.syncManager = Comlink.wrap(this.messagePort); + this.syncManager = Comlink.wrap(this.messagePort); this.syncManager.setLogLevel(this.logger.getLevel()); this.triggerCrudUpload = this.syncManager.triggerCrudUpload; @@ -152,15 +160,18 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options; const flags = { ...this.webOptions.flags, workers: undefined }; - this.isInitialized = this.syncManager.setParams({ - dbParams: this.dbAdapter.getConfiguration(), - streamOptions: { - crudUploadThrottleMs, - identifier, - retryDelayMs, - flags: flags - } - }); + this.isInitialized = this.syncManager.setParams( + { + dbParams: this.dbAdapter.getConfiguration(), + streamOptions: { + crudUploadThrottleMs, + identifier, + retryDelayMs, + flags: flags + } + }, + options.subscriptions + ); /** * Pass along any sync status updates to this listener @@ -179,6 +190,19 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem * This performs bi-directional method calling. */ Comlink.expose(this.clientProvider, this.messagePort); + + // Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which + // will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker + // to free resources associated with this tab. + getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => { + if (!this.abortOnClose.signal.aborted) { + this.syncManager.addLockBasedCloseSignal(lock!.name); + + await new Promise((r) => { + this.abortOnClose.signal.onabort = () => r(); + }); + } + }); } /** @@ -225,6 +249,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem }; this.messagePort.postMessage(closeMessagePayload); }); + this.abortOnClose.abort(); // Release the proxy this.syncManager[Comlink.releaseProxy](); @@ -235,11 +260,15 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem return this.isInitialized; } + updateSubscriptions(subscriptions: SubscribedStream[]): void { + this.syncManager.updateSubscriptions(subscriptions); + } + /** * Used in tests to force a connection states */ private async _testUpdateStatus(status: SyncStatus) { await this.isInitialized; - return (this.syncManager as any)['_testUpdateAllStatuses'](status.toJSON()); + return this.syncManager._testUpdateAllStatuses(status.toJSON()); } } diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index c85947439..7db98c3d0 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -12,6 +12,7 @@ import { DBAdapter, PowerSyncBackendConnector, SqliteBucketStorage, + SubscribedStream, SyncStatus } from '@powersync/common'; import { Mutex } from 'async-mutex'; @@ -55,7 +56,7 @@ export type ManualSharedSyncPayload = { * @internal */ export type SharedSyncInitOptions = { - streamOptions: Omit; + streamOptions: Omit; dbParams: ResolvedWebSQLOpenOptions; }; @@ -73,6 +74,7 @@ export type WrappedSyncPort = { port: MessagePort; clientProvider: Comlink.Remote; db?: DBAdapter; + currentSubscriptions: SubscribedStream[]; }; /** @@ -94,10 +96,7 @@ const CONNECTOR_PLACEHOLDER = {} as PowerSyncBackendConnector; * @internal * Shared sync implementation which runs inside a shared webworker */ -export class SharedSyncImplementation - extends BaseObserver - implements StreamingSyncImplementation -{ +export class SharedSyncImplementation extends BaseObserver { protected ports: WrappedSyncPort[]; protected isInitialized: Promise; @@ -111,6 +110,7 @@ export class SharedSyncImplementation protected logger: ILogger; protected lastConnectOptions: PowerSyncConnectionOptions | undefined; protected portMutex: Mutex; + private subscriptions: SubscribedStream[] = []; protected connectionManager: ConnectionManager; syncStatus: SyncStatus; @@ -186,6 +186,25 @@ export class SharedSyncImplementation return this.isInitialized; } + private collectActiveSubscriptions() { + this.logger.debug('Collecting active stream subscriptions across tabs'); + const active = new Map(); + for (const port of this.ports) { + for (const stream of port.currentSubscriptions) { + const serializedKey = JSON.stringify(stream); + active.set(serializedKey, stream); + } + } + this.subscriptions = [...active.values()]; + this.logger.debug('Collected stream subscriptions', this.subscriptions); + this.connectionManager.syncStreamImplementation?.updateSubscriptions(this.subscriptions); + } + + updateSubscriptions(port: WrappedSyncPort, subscriptions: SubscribedStream[]) { + port.currentSubscriptions = subscriptions; + this.collectActiveSubscriptions(); + } + setLogLevel(level: ILogLevel) { this.logger.setLevel(level); this.broadCastLogger.setLevel(level); @@ -196,6 +215,7 @@ export class SharedSyncImplementation */ async setParams(params: SharedSyncInitOptions) { await this.portMutex.runExclusive(async () => { + this.collectActiveSubscriptions(); if (this.syncParams) { // Cannot modify already existing sync implementation params // But we can ask for a DB adapter, if required, at this point. @@ -250,11 +270,12 @@ export class SharedSyncImplementation * Adds a new client tab's message port to the list of connected ports */ async addPort(port: MessagePort) { - await this.portMutex.runExclusive(() => { + return await this.portMutex.runExclusive(() => { const portProvider = { port, - clientProvider: Comlink.wrap(port) - }; + clientProvider: Comlink.wrap(port), + currentSubscriptions: [] + } satisfies WrappedSyncPort; this.ports.push(portProvider); // Give the newly connected client the latest status @@ -262,6 +283,8 @@ export class SharedSyncImplementation if (status) { portProvider.clientProvider.statusChanged(status.toJSON()); } + + return portProvider; }); } @@ -269,12 +292,12 @@ export class SharedSyncImplementation * Removes a message port client from this manager's managed * clients. */ - async removePort(port: MessagePort) { + async removePort(port: WrappedSyncPort) { // Remove the port within a mutex context. // Warns if the port is not found. This should not happen in practice. // We return early if the port is not found. const { trackedPort, shouldReconnect } = await this.portMutex.runExclusive(async () => { - const index = this.ports.findIndex((p) => p.port == port); + const index = this.ports.findIndex((p) => p == port); if (index < 0) { this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`); return {}; @@ -289,7 +312,7 @@ export class SharedSyncImplementation * not resolve. Abort them here. */ [this.fetchCredentialsController, this.uploadDataController].forEach((abortController) => { - if (abortController?.activePort.port == port) { + if (abortController?.activePort == port) { abortController!.controller.abort( new AbortOperation('Closing pending requests after client port is removed') ); @@ -324,6 +347,10 @@ export class SharedSyncImplementation if (trackedPort.db) { await trackedPort.db.close(); } + + // Re-index subscriptions, the subscriptions of the removed port would no longer be considered. + this.collectActiveSubscriptions(); + // Release proxy return () => trackedPort.clientProvider[Comlink.releaseProxy](); } @@ -427,6 +454,7 @@ export class SharedSyncImplementation }); }, ...syncParams.streamOptions, + subscriptions: this.subscriptions, // Logger cannot be transferred just yet logger: this.logger }); @@ -470,7 +498,7 @@ export class SharedSyncImplementation * A function only used for unit tests which updates the internal * sync stream client and all tab client's sync status */ - private async _testUpdateAllStatuses(status: SyncStatusOptions) { + async _testUpdateAllStatuses(status: SyncStatusOptions) { if (!this.connectionManager.syncStreamImplementation) { throw new Error('Cannot update status without a sync stream implementation'); } diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts b/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts index 11a14fd9b..157d7c1cb 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts @@ -1,10 +1,6 @@ import { createBaseLogger } from '@powersync/common'; -import * as Comlink from 'comlink'; -import { - SharedSyncClientEvent, - SharedSyncImplementation, - type ManualSharedSyncPayload -} from './SharedSyncImplementation'; +import { SharedSyncImplementation } from './SharedSyncImplementation'; +import { WorkerClient } from './WorkerClient'; const _self: SharedWorkerGlobalScope = self as any; const logger = createBaseLogger(); @@ -14,23 +10,5 @@ const sharedSyncImplementation = new SharedSyncImplementation(); _self.onconnect = async function (event: MessageEvent) { const port = event.ports[0]; - - /** - * Adds an extra listener which can remove this port - * from the list of monitored ports. - */ - port.addEventListener('message', async (event) => { - const payload = event.data as ManualSharedSyncPayload; - if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { - const release = await sharedSyncImplementation.removePort(port); - port.postMessage({ - event: SharedSyncClientEvent.CLOSE_ACK, - data: {} - } satisfies ManualSharedSyncPayload); - release?.(); - } - }); - - await sharedSyncImplementation.addPort(port); - Comlink.expose(sharedSyncImplementation, port); + await new WorkerClient(sharedSyncImplementation, port).initialize(); }; diff --git a/packages/web/src/worker/sync/WorkerClient.ts b/packages/web/src/worker/sync/WorkerClient.ts new file mode 100644 index 000000000..22c44f4ba --- /dev/null +++ b/packages/web/src/worker/sync/WorkerClient.ts @@ -0,0 +1,104 @@ +import * as Comlink from 'comlink'; +import { + ManualSharedSyncPayload, + SharedSyncClientEvent, + SharedSyncImplementation, + SharedSyncInitOptions, + WrappedSyncPort +} from './SharedSyncImplementation'; +import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream, SyncStatusOptions } from '@powersync/common'; +import { getNavigatorLocks } from '../../shared/navigator'; + +/** + * A client to the shared sync worker. + * + * The shared sync implementation needs a per-client view of subscriptions so that subscriptions of closed tabs can + * automatically be evicted later. + */ +export class WorkerClient { + private resolvedPort: WrappedSyncPort | null = null; + + constructor( + private readonly sync: SharedSyncImplementation, + private readonly port: MessagePort + ) {} + + async initialize() { + /** + * Adds an extra listener which can remove this port + * from the list of monitored ports. + */ + this.port.addEventListener('message', async (event) => { + const payload = event.data as ManualSharedSyncPayload; + if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { + await this.removePort(); + } + }); + + this.resolvedPort = await this.sync.addPort(this.port); + Comlink.expose(this, this.port); + } + + private async removePort() { + if (this.resolvedPort) { + const release = await this.sync.removePort(this.resolvedPort); + this.resolvedPort = null; + this.port.postMessage({ + event: SharedSyncClientEvent.CLOSE_ACK, + data: {} + } satisfies ManualSharedSyncPayload); + release?.(); + } + } + + /** + * Called by a client after obtaining a lock with a random name. + * + * When the client tab is closed, its lock will be returned. So when the shared worker attempts to acquire the lock, + * it can consider the connection to be closed. + */ + addLockBasedCloseSignal(name: string) { + getNavigatorLocks().request(name, async () => { + await this.removePort(); + }); + } + + setLogLevel(level: ILogLevel) { + this.sync.setLogLevel(level); + } + + triggerCrudUpload() { + return this.sync.triggerCrudUpload(); + } + + setParams(params: SharedSyncInitOptions, subscriptions: SubscribedStream[]) { + this.resolvedPort!.currentSubscriptions = subscriptions; + return this.sync.setParams(params); + } + + getWriteCheckpoint() { + return this.sync.getWriteCheckpoint(); + } + + hasCompletedSync() { + return this.sync.hasCompletedSync(); + } + + connect(options?: PowerSyncConnectionOptions) { + return this.sync.connect(options); + } + + updateSubscriptions(subscriptions: SubscribedStream[]) { + if (this.resolvedPort) { + this.sync.updateSubscriptions(this.resolvedPort, subscriptions); + } + } + + disconnect() { + return this.sync.disconnect(); + } + + async _testUpdateAllStatuses(status: SyncStatusOptions) { + return this.sync._testUpdateAllStatuses(status); + } +} diff --git a/packages/web/tests/multiple_instances.test.ts b/packages/web/tests/multiple_instances.test.ts index 739aaf4ca..80b39e419 100644 --- a/packages/web/tests/multiple_instances.test.ts +++ b/packages/web/tests/multiple_instances.test.ts @@ -2,6 +2,8 @@ import { AbstractPowerSyncDatabase, createBaseLogger, createLogger, + DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + DEFAULT_STREAMING_SYNC_OPTIONS, SqliteBucketStorage, SyncStatus } from '@powersync/common'; @@ -131,8 +133,10 @@ describe('Multiple Instances', { sequential: true }, () => { await connector1.uploadData(db); }, identifier, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, retryDelayMs: 90_000, // Large delay to allow for testing - db: db.database as WebDBAdapter + db: db.database as WebDBAdapter, + subscriptions: [] }; const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1); @@ -146,7 +150,10 @@ describe('Multiple Instances', { sequential: true }, () => { await connector2.uploadData(db); }, identifier, - db: db.database as WebDBAdapter + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + retryDelayMs: 90_000, // Large delay to allow for testing + db: db.database as WebDBAdapter, + subscriptions: [] }; const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2); @@ -195,6 +202,8 @@ describe('Multiple Instances', { sequential: true }, () => { identifier, // The large delay here allows us to test between connection retries retryDelayMs: 90_000, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + subscriptions: [], flags: { broadcastLogs: true } diff --git a/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts b/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts index cea166a21..0173fe154 100644 --- a/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts +++ b/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts @@ -33,7 +33,16 @@ class TestPowerSyncDatabase extends AbstractPowerSyncDatabase { get database() { return { - get: vi.fn().mockResolvedValue({ version: '0.4.5' }), + get: vi.fn().mockResolvedValue({ + version: '0.4.5', + r: JSON.stringify({ + connected: false, + connecting: false, + priority_status: [], + downloading: null, + streams: [] + }) + }), getAll: vi.fn().mockResolvedValue([]), execute: vi.fn(), refreshSchema: vi.fn() diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index f12bd186c..c08c09302 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -4,6 +4,7 @@ import { AbstractStreamingSyncImplementation, BSONImplementation, DataStream, + DEFAULT_CRUD_UPLOAD_THROTTLE_MS, PowerSyncBackendConnector, PowerSyncCredentials, PowerSyncDatabaseOptions, @@ -169,7 +170,9 @@ export class MockedStreamPowerSync extends PowerSyncDatabase { await connector.uploadData(this); }, identifier: this.database.name, - retryDelayMs: this.options.crudUploadThrottleMs ?? 0 // The zero here makes tests faster + retryDelayMs: this.options.crudUploadThrottleMs ?? 0, // The zero here makes tests faster + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + subscriptions: [] }); } } diff --git a/tools/diagnostics-app/src/app/views/layout.tsx b/tools/diagnostics-app/src/app/views/layout.tsx index 51e0526cf..9ba207511 100644 --- a/tools/diagnostics-app/src/app/views/layout.tsx +++ b/tools/diagnostics-app/src/app/views/layout.tsx @@ -25,7 +25,7 @@ import { Typography, styled } from '@mui/material'; -import React from 'react'; +import React, { useMemo } from 'react'; import { CLIENT_PARAMETERS_ROUTE, @@ -35,7 +35,7 @@ import { SYNC_DIAGNOSTICS_ROUTE } from '@/app/router'; import { useNavigationPanel } from '@/components/navigation/NavigationPanelContext'; -import { signOut, sync } from '@/library/powersync/ConnectionManager'; +import { signOut, useSyncStatus } from '@/library/powersync/ConnectionManager'; import { usePowerSync } from '@powersync/react'; import { useNavigate } from 'react-router-dom'; @@ -43,8 +43,8 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) const powerSync = usePowerSync(); const navigate = useNavigate(); - const [syncStatus, setSyncStatus] = React.useState(sync?.syncStatus); - const [syncError, setSyncError] = React.useState(null); + const syncStatus = useSyncStatus(); + const syncError = useMemo(() => syncStatus?.dataFlowStatus?.downloadError, [syncStatus]); const { title } = useNavigationPanel(); const [mobileOpen, setMobileOpen] = React.useState(false); @@ -99,17 +99,6 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) [powerSync] ); - // Cannot use `useStatus()`, since we're not using the default sync implementation. - React.useEffect(() => { - const l = sync?.registerListener({ - statusChanged: (status) => { - setSyncStatus(status); - setSyncError(status.dataFlowStatus.downloadError ?? null); - } - }); - return () => l?.(); - }, []); - const drawerWidth = 320; const drawer = ( diff --git a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx index abe82d191..c9853f3db 100644 --- a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx +++ b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx @@ -1,5 +1,6 @@ import { NavigationPage } from '@/components/navigation/NavigationPage'; -import { clearData, db, sync } from '@/library/powersync/ConnectionManager'; +import { NewStreamSubscription } from '@/components/widgets/NewStreamSubscription'; +import { clearData, db, sync, useSyncStatus } from '@/library/powersync/ConnectionManager'; import { Box, Button, @@ -15,7 +16,8 @@ import { styled } from '@mui/material'; import { DataGrid, GridColDef } from '@mui/x-data-grid'; -import React from 'react'; +import { SyncStreamStatus } from '@powersync/web'; +import React, { useState } from 'react'; const BUCKETS_QUERY = ` WITH @@ -295,11 +297,78 @@ export default function SyncDiagnosticsPage() { {bucketRowsLoading ? : bucketsTable} + ); } +function StreamsState() { + const syncStreams = useSyncStatus()?.syncStreams; + const [dialogOpen, setDialogOpen] = useState(false); + const syncStreamsLoading = syncStreams == null; + + return ( + + + Sync stream subscriptions + + {syncStreamsLoading ? : } + setDialogOpen(false)} /> + + + ); +} + +function StreamsGrid(props: { streams: SyncStreamStatus[] }) { + const columns: GridColDef[] = [ + { field: 'name', headerName: 'Stream name', flex: 2 }, + { field: 'parameters', headerName: 'Parameters', flex: 3, type: 'text' }, + { field: 'default', headerName: 'Default', flex: 1, type: 'boolean' }, + { field: 'active', headerName: 'Active', flex: 1, type: 'boolean' }, + { field: 'has_explicit_subscription', headerName: 'Explicit', flex: 1, type: 'boolean' }, + { field: 'priority', headerName: 'Priority', flex: 1, type: 'number' }, + { field: 'last_synced_at', headerName: 'Last synced at', flex: 2, type: 'dateTime' }, + { field: 'expires', headerName: 'Eviction time', flex: 2, type: 'dateTime' } + ]; + + const rows = props.streams.map((stream) => { + const name = stream.subscription.name; + const parameters = JSON.stringify(stream.subscription.parameters); + + return { + id: `${name}-${parameters}`, + name, + parameters, + default: stream.subscription.isDefault, + has_explicit_subscription: stream.subscription.hasExplicitSubscription, + active: stream.subscription.active, + last_synced_at: stream.subscription.lastSyncedAt, + expires: stream.subscription.expiresAt, + priority: stream.priority + }; + }); + + return ( + + ); +} + namespace S { export const MainPaper = styled(Paper)` margin-bottom: 10px; diff --git a/tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx b/tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx new file mode 100644 index 000000000..3809f662c --- /dev/null +++ b/tools/diagnostics-app/src/components/widgets/NewStreamSubscription.tsx @@ -0,0 +1,124 @@ +import { activeSubscriptions, db } from '@/library/powersync/ConnectionManager'; +import Button from '@mui/material/Button'; +import Dialog from '@mui/material/Dialog'; +import DialogActions from '@mui/material/DialogActions'; +import DialogContent from '@mui/material/DialogContent'; +import DialogContentText from '@mui/material/DialogContentText'; +import DialogTitle from '@mui/material/DialogTitle'; +import FormControl from '@mui/material/FormControl'; +import InputLabel from '@mui/material/InputLabel'; +import MenuItem from '@mui/material/MenuItem'; +import Select from '@mui/material/Select'; +import Stack from '@mui/material/Stack'; +import TextField from '@mui/material/TextField'; +import { Formik, FormikErrors } from 'formik'; + +interface NewStreamSubscriptionValues { + stream: string; + parameters: string; + override_priority: 0 | 1 | 2 | 3 | null; +} + +export function NewStreamSubscription(props: { open: boolean; close: () => void }) { + const { open, close } = props; + + const validate = (values: NewStreamSubscriptionValues) => { + const errors: FormikErrors = {}; + + if (values.stream.length == 0) { + errors.stream = 'Stream is required'; + } + + if (values.parameters.length) { + try { + JSON.parse(values.parameters); + } catch (e) { + errors.parameters = 'Must be empty or a JSON object'; + } + } + + return errors; + }; + + const addSubscription = async (values: NewStreamSubscriptionValues) => { + const parameters = values.parameters == '' ? null : JSON.parse(values.parameters); + + const subscription = await db + .syncStream(values.stream, parameters) + .subscribe({ priority: values.override_priority ?? undefined }); + + // We need to store subscriptions globally, because they have a finalizer set on them that would eventually clear + // them otherwise. + activeSubscriptions.push(subscription); + close(); + }; + + return ( + + initialValues={{ stream: '', parameters: '', override_priority: null }} + validateOnChange={true} + onSubmit={addSubscription} + validate={validate}> + {({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit }) => ( + + Subscribe to sync stream + +
+ + Enter stream name and parameters (as a JSON object or an empty string for null) to subscribe to a + stream. + + + + + + + Override priority + + + + + + +
+
+ + + + +
+ )} + + ); +} diff --git a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts index 661c4b09b..275ff757d 100644 --- a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts +++ b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts @@ -1,9 +1,11 @@ import { BaseListener, createBaseLogger, + DEFAULT_STREAMING_SYNC_OPTIONS, LogLevel, PowerSyncDatabase, SyncClientImplementation, + SyncStreamSubscription, TemporaryStorageOption, WASQLiteOpenFactory, WASQLiteVFS, @@ -16,6 +18,7 @@ import { DynamicSchemaManager } from './DynamicSchemaManager'; import { RecordingStorageAdapter } from './RecordingStorageAdapter'; import { TokenConnector } from './TokenConnector'; import { RustClientInterceptor } from './RustClientInterceptor'; +import React from 'react'; const baseLogger = createBaseLogger(); baseLogger.useDefaults(); @@ -45,8 +48,7 @@ export const db = new PowerSyncDatabase({ }); export const connector = new TokenConnector(); - -const adapter = new RecordingStorageAdapter(db.database, schemaManager); +export const activeSubscriptions: SyncStreamSubscription[] = []; export let sync: WebStreamingSyncImplementation | undefined; @@ -59,6 +61,7 @@ if (connector.hasCredentials()) { } export async function connect() { + activeSubscriptions.length = 0; const client = localStorage.getItem('preferred_client_implementation') == SyncClientImplementation.RUST ? SyncClientImplementation.RUST @@ -78,7 +81,9 @@ export async function connect() { uploadCrud: async () => { // No-op }, - identifier: 'diagnostics' + identifier: 'diagnostics', + ...DEFAULT_STREAMING_SYNC_OPTIONS, + subscriptions: [] }; sync = new WebStreamingSyncImplementation(syncOptions); await sync.connect({ params, clientImplementation: client }); @@ -118,4 +123,21 @@ export const setParams = (p: object) => { connect(); }; +/** + * The current sync status - we can't use `useStatus()` since we're not using the default sync implementation. + */ +export function useSyncStatus() { + const [current, setCurrent] = React.useState(sync?.syncStatus); + React.useEffect(() => { + const l = sync?.registerListener({ + statusChanged: (status) => { + setCurrent(status); + } + }); + return () => l?.(); + }, []); + + return current; +} + (window as any).db = db;