Skip to content
6 changes: 6 additions & 0 deletions .changeset/slow-crews-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
---

Add `retryDelayMs` and `crudUploadThrottleMs` to `connect` so that the values can be dynamically changed upon reconnecting.
11 changes: 9 additions & 2 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected abstract openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter;

protected abstract generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
options?: {
retryDelayMs?: number;
crudUploadThrottleMs?: number;
}
): StreamingSyncImplementation;

protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;
Expand Down Expand Up @@ -388,7 +392,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('Cannot connect using a closed client');
}

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
crudUploadThrottleMs: options?.crudUploadThrottleMs,
retryDelayMs: options?.retryDelayMs
});
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,19 @@ export interface PowerSyncConnectionOptions {
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
*/
params?: Record<string, StreamingSyncRequestParameterType>;
/**
* Delay for retrying sync streaming operations
* from the PowerSync backend after an error occurs.
*/
retryDelayMs?: number;
/**
* Backend Connector CRUD operations are throttled
* to occur at most every `crudUploadThrottleMs`
* milliseconds.
*/
crudUploadThrottleMs?: number;
}
export type PowerSyncConnectionRequiredOptions = Required<Omit<PowerSyncConnectionOptions, 'retryDelayMs' | 'crudUploadThrottleMs'>>;

export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
/**
Expand All @@ -102,14 +114,15 @@ export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncI
}

export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;
export const DEFAULT_RETRY_DELAY_MS = 5000;

export const DEFAULT_STREAMING_SYNC_OPTIONS = {
retryDelayMs: 5000,
retryDelayMs: DEFAULT_RETRY_DELAY_MS,
logger: Logger.get('PowerSyncStream'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

export const DEFAULT_STREAM_CONNECTION_OPTIONS: Required<PowerSyncConnectionOptions> = {
export const DEFAULT_STREAM_CONNECTION_OPTIONS: Required<Omit<PowerSyncConnectionOptions, 'retryDelayMs' | 'crudUploadThrottleMs'>> = {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
params: {}
};
Expand Down Expand Up @@ -427,7 +440,7 @@ The next upload iteration will be delayed.`);
type: LockType.SYNC,
signal,
callback: async () => {
const resolvedOptions: Required<PowerSyncConnectionOptions> = {
const resolvedOptions: Required<PowerSyncConnectionRequiredOptions> = {
...DEFAULT_STREAM_CONNECTION_OPTIONS,
...(options ?? {})
};
Expand Down
14 changes: 11 additions & 3 deletions packages/react-native/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,17 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
// This is used to pass in options on connection instead of only during database creation
options?: {
retryDelayMs: number;
crudUploadThrottleMs: number;
}
): AbstractStreamingSyncImplementation {
const remote = new ReactNativeRemote(connector);
// Use the options passed in during connect, or fallback to the options set during database creation
const retryDelayMs = options?.retryDelayMs || this.options.retryDelay;
const crudUploadThrottleMs = options?.crudUploadThrottleMs || this.options.crudUploadThrottleMs;

return new ReactNativeStreamingSyncImplementation({
adapter: this.bucketStorageAdapter,
Expand All @@ -53,8 +61,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
await this.waitForReady();
await connector.uploadData(this);
},
retryDelayMs: this.options.retryDelay,
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
retryDelayMs,
crudUploadThrottleMs,
identifier: this.database.name
});
}
Expand Down
14 changes: 13 additions & 1 deletion packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,23 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return getNavigatorLocks().request(`lock-${this.database.name}`, cb);
}

protected generateSyncStreamImplementation(connector: PowerSyncBackendConnector): StreamingSyncImplementation {
protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
// This is used to pass in options on connection instead of only during db creation
options?: {
retryDelayMs?: number;
crudUploadThrottleMs?: number;
}
): StreamingSyncImplementation {
const remote = new WebRemote(connector);
// Use the options passed in during connect, or fallback to the options set during database creation
const retryDelayMs = options?.retryDelayMs || this.options.retryDelay;
const crudUploadThrottleMs = options?.crudUploadThrottleMs || this.options.crudUploadThrottleMs;

const syncOptions: WebStreamingSyncImplementationOptions = {
...(this.options as {}),
retryDelayMs,
crudUploadThrottleMs,
flags: this.resolvedFlags,
adapter: this.bucketStorageAdapter,
remote,
Expand Down
100 changes: 100 additions & 0 deletions packages/web/tests/src/db/PowersyncDatabase.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { describe, it, expect, beforeEach, vi } from 'vitest'
import { PowerSyncDatabase, SharedWebStreamingSyncImplementation, WebStreamingSyncImplementation } from '../../../src'
import { SSRStreamingSyncImplementation } from '../../../src/db/sync/SSRWebStreamingSyncImplementation'
import { testSchema } from '../../utils/testDb'


vi.mock('../../../src/db/sync/WebStreamingSyncImplementation')
vi.mock('../../../src/db/sync/SharedWebStreamingSyncImplementation')
vi.mock('../../../src/db/sync/SSRWebStreamingSyncImplementation')

describe('PowerSyncDatabase - generateSyncStreamImplementation', () => {
const mockConnector = {
uploadData: vi.fn(),
fetchCredentials: vi.fn()
}

beforeEach(() => {
vi.resetAllMocks()
})

it('uses SSRStreamingSyncImplementation when ssrMode is true', async () => {
// This is to prevent a false positive from the unhandled rejection
// of using SSR in this test.
const handler = (event: PromiseRejectionEvent) => {
event.preventDefault()
}
window.addEventListener('unhandledrejection', handler)

const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: true,
},
retryDelay: 1000,
crudUploadThrottleMs: 2000
})

db['generateSyncStreamImplementation'](mockConnector)
expect(SSRStreamingSyncImplementation).toHaveBeenCalled()

await setTimeout(() => window.removeEventListener('unhandledrejection', handler), 1)
})

it('uses SharedWebStreamingSyncImplementation when enableMultiTabs is true', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
flags: { enableMultiTabs: true }
})
db['generateSyncStreamImplementation'](mockConnector)
expect(SharedWebStreamingSyncImplementation).toHaveBeenCalled()
})

it('handles option overrides', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: false,
enableMultiTabs: false,
},
crudUploadThrottleMs: 1000
})

db['generateSyncStreamImplementation'](mockConnector, { crudUploadThrottleMs: 20000, retryDelayMs: 50000 })
expect(WebStreamingSyncImplementation).toHaveBeenCalledWith(
expect.objectContaining({
retryDelayMs: 50000,
crudUploadThrottleMs: 20000
})
)
})

it('handles partial option overrides', () => {
const db = new PowerSyncDatabase({
schema: testSchema,
database: {
dbFilename: 'test.db'
},
flags: {
ssrMode: false,
enableMultiTabs: false,
},
retryDelay: 1000,
crudUploadThrottleMs: 2000
})

db['generateSyncStreamImplementation'](mockConnector, { retryDelayMs: 50000 })
expect(WebStreamingSyncImplementation).toHaveBeenCalledWith(
expect.objectContaining({
retryDelayMs: 50000,
})
)
})
})
Loading