Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/angry-ducks-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/web': minor
---

Add alpha support for sync streams, allowing different sets of data to be synced dynamically.
76 changes: 46 additions & 30 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import {
UpdateNotification,
isBatchedUpdateNotification
} from '../db/DBAdapter.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
import { SyncStatus } from '../db/crud/SyncStatus.js';
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
import { ConnectionManager } from './ConnectionManager.js';
import {
ConnectionManager,
CreateSyncImplementationOptions,
InternalSubscriptionAdapter
} from './ConnectionManager.js';
import { CustomQuery } from './CustomQuery.js';
import { ArrayQueryDefinition, Query } from './Query.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
Expand All @@ -40,6 +43,8 @@ import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
import { WatchedQueryComparator } from './watched/processors/comparators.js';
import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js';
import { SyncStream } from './sync/sync-streams.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -182,6 +187,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected bucketStorageAdapter: BucketStorageAdapter;
protected _isReadyPromise: Promise<void>;
protected connectionManager: ConnectionManager;
private subscriptions: InternalSubscriptionAdapter;

get syncStreamImplementation() {
return this.connectionManager.syncStreamImplementation;
Expand Down Expand Up @@ -236,10 +242,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.runExclusiveMutex = new Mutex();

// Start async init
this.subscriptions = {
firstStatusMatching: (predicate, abort) => this.waitForStatus(predicate, abort),
resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(),
rustSubscriptionsCommand: async (payload) => {
await this.writeTransaction((tx) => {
return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]);
});
}
};
this.connectionManager = new ConnectionManager({
createSyncImplementation: async (connector, options) => {
await this.waitForReady();

return this.runExclusive(async () => {
const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options));
const onDispose = sync.registerListener({
Expand Down Expand Up @@ -304,7 +318,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

protected abstract generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
options: CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions
): StreamingSyncImplementation;

protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;
Expand Down Expand Up @@ -338,13 +352,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
? (status: SyncStatus) => status.hasSynced
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;

if (statusMatches(this.currentStatus)) {
return this.waitForStatus(statusMatches, signal);
}

private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
if (predicate(this.currentStatus)) {
return;
}

return new Promise((resolve) => {
const dispose = this.registerListener({
statusChanged: (status) => {
if (statusMatches(status)) {
if (predicate(status)) {
dispose();
resolve();
}
Expand Down Expand Up @@ -373,7 +392,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.bucketStorageAdapter.init();
await this._loadVersion();
await this.updateSchema(this.options.schema);
await this.updateHasSynced();
await this.resolveOfflineSyncStatus();
await this.database.execute('PRAGMA RECURSIVE_TRIGGERS=TRUE');
this.ready = true;
this.iterateListeners((cb) => cb.initialized?.());
Expand Down Expand Up @@ -403,30 +422,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}
}

protected async updateHasSynced() {
const result = await this.database.getAll<{ priority: number; last_synced_at: string }>(
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC'
);
let lastCompleteSync: Date | undefined;
const priorityStatusEntries: SyncPriorityStatus[] = [];

for (const { priority, last_synced_at } of result) {
const parsedDate = new Date(last_synced_at + 'Z');
protected async resolveOfflineSyncStatus() {
const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r');
const parsed = JSON.parse(result.r) as CoreSyncStatus;

if (priority == FULL_SYNC_PRIORITY) {
// This lowest-possible priority represents a complete sync.
lastCompleteSync = parsedDate;
} else {
priorityStatusEntries.push({ priority, hasSynced: true, lastSyncedAt: parsedDate });
}
}

const hasSynced = lastCompleteSync != null;
const updatedStatus = new SyncStatus({
...this.currentStatus.toJSON(),
hasSynced,
priorityStatusEntries,
lastSyncedAt: lastCompleteSync
...coreStatusToJs(parsed)
});

if (!updatedStatus.isEqual(this.currentStatus)) {
Expand Down Expand Up @@ -471,7 +473,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
protected resolvedConnectionOptions(
options: CreateSyncImplementationOptions
): CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions {
return {
...options,
retryDelayMs:
Expand Down Expand Up @@ -540,6 +544,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
}

/**
* Create a sync stream to query its status or to subscribe to it.
*
* @param name The name of the stream to subscribe to.
* @param params Optional parameters for the stream subscription.
* @returns A {@link SyncStream} instance that can be subscribed to.
* @experimental Sync streams are currently in alpha.
*/
syncStream(name: string, params?: Record<string, any>): SyncStream {
return this.connectionManager.stream(this.subscriptions, name, params ?? null);
}

/**
* Close the database, releasing resources.
*
Expand Down
162 changes: 158 additions & 4 deletions packages/common/src/client/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ import { ILogger } from 'js-logger';
import { BaseListener, BaseObserver } from '../utils/BaseObserver.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
import {
AdditionalConnectionOptions,
InternalConnectionOptions,
StreamingSyncImplementation
StreamingSyncImplementation,
SubscribedStream
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import {
SyncStream,
SyncStreamDescription,
SyncStreamSubscribeOptions,
SyncStreamSubscription
} from './sync/sync-streams.js';
import { SyncStatus } from '../db/crud/SyncStatus.js';

/**
* @internal
Expand All @@ -18,14 +27,30 @@ export interface ConnectionManagerSyncImplementationResult {
onDispose: () => Promise<void> | void;
}

/**
* The subset of {@link AbstractStreamingSyncImplementationOptions} managed by the connection manager.
*
* @internal
*/
export interface CreateSyncImplementationOptions extends AdditionalConnectionOptions {
subscriptions: SubscribedStream[];
}

export interface InternalSubscriptionAdapter {
firstStatusMatching(predicate: (status: SyncStatus) => any, abort?: AbortSignal): Promise<void>;
resolveOfflineSyncStatus(): Promise<void>;
rustSubscriptionsCommand(payload: any): Promise<void>;
}

/**
* @internal
*/
export interface ConnectionManagerOptions {
createSyncImplementation(
connector: PowerSyncBackendConnector,
options: InternalConnectionOptions
options: CreateSyncImplementationOptions
): Promise<ConnectionManagerSyncImplementationResult>;

logger: ILogger;
}

Expand Down Expand Up @@ -76,6 +101,13 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
*/
protected syncDisposer: (() => Promise<void> | void) | null;

/**
* Subscriptions managed in this connection manager.
*
* On the web, these local subscriptions are merged across tabs by a shared worker.
*/
private locallyActiveSubscriptions = new Map<string, ActiveSubscription>();

constructor(protected options: ConnectionManagerOptions) {
super();
this.connectingPromise = null;
Expand All @@ -102,7 +134,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
// Update pending options to the latest values
this.pendingConnectionOptions = {
connector,
options: options ?? {}
options
};

// Disconnecting here provides aborting in progress connection attempts.
Expand Down Expand Up @@ -169,7 +201,11 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
appliedOptions = options;

this.pendingConnectionOptions = null;
const { sync, onDispose } = await this.options.createSyncImplementation(connector, options);

const { sync, onDispose } = await this.options.createSyncImplementation(connector, {
subscriptions: this.activeStreams,
...options
});
this.iterateListeners((l) => l.syncStreamCreated?.(sync));
this.syncStreamImplementation = sync;
this.syncDisposer = onDispose;
Expand Down Expand Up @@ -236,4 +272,122 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
await sync?.dispose();
await disposer?.();
}

stream(adapter: InternalSubscriptionAdapter, name: string, parameters: Record<string, any> | null): SyncStream {
const desc = { name, parameters } satisfies SyncStreamDescription;

const waitForFirstSync = (abort?: AbortSignal) => {
return adapter.firstStatusMatching((s) => s.forStream(desc)?.subscription.hasSynced, abort);
};

return {
...desc,
subscribe: async (options?: SyncStreamSubscribeOptions) => {
// NOTE: We also run this command if a subscription already exists, because this increases the expiry date
// (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl).
await adapter.rustSubscriptionsCommand({
subscribe: {
stream: {
name,
params: parameters
},
ttl: options?.ttl,
priority: options?.priority
}
});

if (!this.syncStreamImplementation) {
// We're not connected. So, update the offline sync status to reflect the new subscription.
// (With an active iteration, the sync client would include it in its state).
await adapter.resolveOfflineSyncStatus();
}

const key = `${name}|${JSON.stringify(parameters)}`;
let subscription = this.locallyActiveSubscriptions.get(key);
if (subscription == null) {
const clearSubscription = () => {
this.locallyActiveSubscriptions.delete(key);
this.subscriptionsMayHaveChanged();
};

subscription = new ActiveSubscription(name, parameters, this.logger, waitForFirstSync, clearSubscription);
this.locallyActiveSubscriptions.set(key, subscription);
this.subscriptionsMayHaveChanged();
}

return new SyncStreamSubscriptionHandle(subscription);
},
unsubscribeAll: async () => {
await adapter.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } });
this.subscriptionsMayHaveChanged();
}
};
}

private get activeStreams() {
return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters }));
}

private subscriptionsMayHaveChanged() {
if (this.syncStreamImplementation) {
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
}
}
}

class ActiveSubscription {
refcount: number = 0;

constructor(
readonly name: string,
readonly parameters: Record<string, any> | null,
readonly logger: ILogger,
readonly waitForFirstSync: (abort?: AbortSignal) => Promise<void>,
private clearSubscription: () => void
) {}

decrementRefCount() {
this.refcount--;
if (this.refcount == 0) {
this.clearSubscription();
}
}
}

class SyncStreamSubscriptionHandle implements SyncStreamSubscription {
private active: boolean = true;

constructor(readonly subscription: ActiveSubscription) {
subscription.refcount++;
_finalizer?.register(this, subscription);
}

get name() {
return this.subscription.name;
}

get parameters() {
return this.subscription.parameters;
}

waitForFirstSync(abort?: AbortSignal): Promise<void> {
return this.subscription.waitForFirstSync(abort);
}

unsubscribe(): void {
if (this.active) {
this.active = false;
_finalizer?.unregister(this);
this.subscription.decrementRefCount();
}
}
}

const _finalizer =
'FinalizationRegistry' in globalThis
? new FinalizationRegistry<ActiveSubscription>((sub) => {
sub.logger.warn(
`A subscription to ${sub.name} with params ${JSON.stringify(sub.parameters)} leaked! Please ensure calling unsubscribe() when you don't need a subscription anymore. For global subscriptions, consider storing them in global fields to avoid this warning.`
);
})
: null;
Loading