Skip to content
6 changes: 6 additions & 0 deletions .changeset/slow-crews-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
---

Add `retryDelayMs` and `crudUploadThrottleMs` to `connect` so that the values can be dynamically changed upon reconnecting.
25 changes: 11 additions & 14 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import { CrudEntry, CrudEntryJSON } from './sync/bucket/CrudEntry.js';
import { CrudTransaction } from './sync/bucket/CrudTransaction.js';
import {
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
PowerSyncConnectionOptions,
type AdditionalConnectionOptions,
type PowerSyncConnectionOptions,
StreamingSyncImplementation,
StreamingSyncImplementationListener
} from './sync/stream/AbstractStreamingSyncImplementation.js';
Expand All @@ -35,21 +36,13 @@ export interface DisconnectAndClearOptions {
clearLocal?: boolean;
}

export interface BasePowerSyncDatabaseOptions {
export interface BasePowerSyncDatabaseOptions extends AdditionalConnectionOptions {
/** Schema used for the local database. */
schema: Schema;

/**
* Delay for retrying sync streaming operations
* from the PowerSync backend after an error occurs.
* @deprecated Use {@link retryDelayMs} instead as this will be removed in future releases.
*/
retryDelay?: number;
/**
* Backend Connector CRUD operations are throttled
* to occur at most every `crudUploadThrottleMs`
* milliseconds.
*/
crudUploadThrottleMs?: number;
logger?: ILogger;
}

Expand Down Expand Up @@ -129,7 +122,7 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
retryDelay: 5000,
retryDelayMs: 5000,
logger: Logger.get('PowerSyncDatabase'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};
Expand Down Expand Up @@ -243,7 +236,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected abstract openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter;

protected abstract generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
options?: AdditionalConnectionOptions
): StreamingSyncImplementation;

protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;
Expand Down Expand Up @@ -388,7 +382,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('Cannot connect using a closed client');
}

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
crudUploadThrottleMs: options?.crudUploadThrottleMs,
retryDelayMs: options?.retryDelayMs
});
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@ export interface LockOptions<T> {
signal?: AbortSignal;
}

export interface AbstractStreamingSyncImplementationOptions {
export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions {
adapter: BucketStorageAdapter;
uploadCrud: () => Promise<void>;
crudUploadThrottleMs?: number;
/**
* An identifier for which PowerSync DB this sync implementation is
* linked to. Most commonly DB name, but not restricted to DB name.
*/
identifier?: string;
logger?: ILogger;
remote: AbstractRemote;
retryDelayMs?: number;
}

export interface StreamingSyncImplementationListener extends BaseListener {
Expand All @@ -67,7 +65,10 @@ export interface StreamingSyncImplementationListener extends BaseListener {
* Configurable options to be used when connecting to the PowerSync
* backend instance.
*/
export interface PowerSyncConnectionOptions {
export interface PowerSyncConnectionOptions extends DefaultConnectionOptions, AdditionalConnectionOptions {}

/** @internal */
export interface DefaultConnectionOptions {
/**
* The connection method to use when streaming updates from
* the PowerSync backend instance.
Expand All @@ -81,6 +82,21 @@ export interface PowerSyncConnectionOptions {
params?: Record<string, StreamingSyncRequestParameterType>;
}

/** @internal */
export interface AdditionalConnectionOptions {
/**
* Delay for retrying sync streaming operations
* from the PowerSync backend after an error occurs.
*/
retryDelayMs?: number;
/**
* Backend Connector CRUD operations are throttled
* to occur at most every `crudUploadThrottleMs`
* milliseconds.
*/
crudUploadThrottleMs?: number;
}

export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
/**
* Connects to the sync service
Expand All @@ -102,14 +118,17 @@ export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncI
}

export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;
export const DEFAULT_RETRY_DELAY_MS = 5000;

export const DEFAULT_STREAMING_SYNC_OPTIONS = {
retryDelayMs: 5000,
retryDelayMs: DEFAULT_RETRY_DELAY_MS,
logger: Logger.get('PowerSyncStream'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

export const DEFAULT_STREAM_CONNECTION_OPTIONS: Required<PowerSyncConnectionOptions> = {
export type RequiredPowerSyncConnectionOptions = Required<DefaultConnectionOptions>;

export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
params: {}
};
Expand Down Expand Up @@ -427,7 +446,7 @@ The next upload iteration will be delayed.`);
type: LockType.SYNC,
signal,
callback: async () => {
const resolvedOptions: Required<PowerSyncConnectionOptions> = {
const resolvedOptions: RequiredPowerSyncConnectionOptions = {
...DEFAULT_STREAM_CONNECTION_OPTIONS,
...(options ?? {})
};
Expand Down
12 changes: 9 additions & 3 deletions packages/react-native/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
AbstractPowerSyncDatabase,
AbstractStreamingSyncImplementation,
AdditionalConnectionOptions,
BucketStorageAdapter,
DBAdapter,
PowerSyncBackendConnector,
Expand Down Expand Up @@ -42,9 +43,14 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
// This is used to pass in options on connection instead of only during database creation
options?: AdditionalConnectionOptions
): AbstractStreamingSyncImplementation {
const remote = new ReactNativeRemote(connector);
// Use the options passed in during connect, or fallback to the options set during database creation
const retryDelayMs = options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay;
const crudUploadThrottleMs = options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs;

return new ReactNativeStreamingSyncImplementation({
adapter: this.bucketStorageAdapter,
Expand All @@ -53,8 +59,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
await this.waitForReady();
await connector.uploadData(this);
},
retryDelayMs: this.options.retryDelay,
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
retryDelayMs,
crudUploadThrottleMs,
identifier: this.database.name
});
}
Expand Down
12 changes: 11 additions & 1 deletion packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
type PowerSyncBackendConnector,
type PowerSyncCloseOptions,
type PowerSyncConnectionOptions,
type AdditionalConnectionOptions,
AbstractPowerSyncDatabase,
DBAdapter,
DEFAULT_POWERSYNC_CLOSE_OPTIONS,
Expand Down Expand Up @@ -164,11 +165,20 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return getNavigatorLocks().request(`lock-${this.database.name}`, cb);
}

protected generateSyncStreamImplementation(connector: PowerSyncBackendConnector): StreamingSyncImplementation {
protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
// This is used to pass in options on connection instead of only during db creation
options?: AdditionalConnectionOptions
): StreamingSyncImplementation {
const remote = new WebRemote(connector);
// Use the options passed in during connect, or fallback to the options set during database creation
const retryDelayMs = options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay;
const crudUploadThrottleMs = options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs;

const syncOptions: WebStreamingSyncImplementationOptions = {
...(this.options as {}),
retryDelayMs,
crudUploadThrottleMs,
flags: this.resolvedFlags,
adapter: this.bucketStorageAdapter,
remote,
Expand Down
122 changes: 122 additions & 0 deletions packages/web/tests/src/db/PowersyncDatabase.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { describe, it, expect, beforeEach, vi } from 'vitest'
import { PowerSyncDatabase, SharedWebStreamingSyncImplementation, WebStreamingSyncImplementation } from '../../../src'
import { SSRStreamingSyncImplementation } from '../../../src/db/sync/SSRWebStreamingSyncImplementation'
import { testSchema } from '../../utils/testDb'


vi.mock('../../../src/db/sync/WebStreamingSyncImplementation')
vi.mock('../../../src/db/sync/SharedWebStreamingSyncImplementation')
vi.mock('../../../src/db/sync/SSRWebStreamingSyncImplementation')

describe('PowerSyncDatabase - generateSyncStreamImplementation', () => {
const mockConnector = {
uploadData: vi.fn(),
fetchCredentials: vi.fn()
}

beforeEach(() => {
vi.resetAllMocks()
})

it('uses SSRStreamingSyncImplementation when ssrMode is true', async () => {
// This is to prevent a false positive from the unhandled rejection
// of using SSR in this test.
const handler = (event: PromiseRejectionEvent) => {
event.preventDefault()
}
window.addEventListener('unhandledrejection', handler)

const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: true,
},
retryDelayMs: 1000,
crudUploadThrottleMs: 2000
})

db['generateSyncStreamImplementation'](mockConnector)
expect(SSRStreamingSyncImplementation).toHaveBeenCalled()

await setTimeout(() => window.removeEventListener('unhandledrejection', handler), 1)
})

it('uses SharedWebStreamingSyncImplementation when enableMultiTabs is true', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
flags: { enableMultiTabs: true }
})
db['generateSyncStreamImplementation'](mockConnector)
expect(SharedWebStreamingSyncImplementation).toHaveBeenCalled()
})

it('handles option overrides', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: false,
enableMultiTabs: false,
},
crudUploadThrottleMs: 1000
})

db['generateSyncStreamImplementation'](mockConnector, { crudUploadThrottleMs: 20000, retryDelayMs: 50000 })
expect(WebStreamingSyncImplementation).toHaveBeenCalledWith(
expect.objectContaining({
retryDelayMs: 50000,
crudUploadThrottleMs: 20000
})
)
})

it('handles partial option overrides', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: false,
enableMultiTabs: false,
},
retryDelayMs: 1000,
crudUploadThrottleMs: 2000
})

db['generateSyncStreamImplementation'](mockConnector, { retryDelayMs: 50000 })
expect(WebStreamingSyncImplementation).toHaveBeenCalledWith(
expect.objectContaining({
retryDelayMs: 50000,
})
)
})

// This test can be removed once retryDelay is removed and entirely replaced with retryDelayMs
it('works when using deprecated retryDelay instead of retryDelayMs', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: false,
enableMultiTabs: false,
},
retryDelay: 11100,
})

db['generateSyncStreamImplementation'](mockConnector)
expect(WebStreamingSyncImplementation).toHaveBeenCalledWith(
expect.objectContaining({
retryDelayMs: 11100,
})
)
})
})
Loading