Skip to content

Commit 43c08b8

Browse files
committed
Added dispatcher override config option.
1 parent e2508ce commit 43c08b8

File tree

4 files changed

+52
-31
lines changed

4 files changed

+52
-31
lines changed

.changeset/thirty-humans-shop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Ensuring all options provided to `connect()` are passed onwards, allows packages to have their own option definitions for `connect()` and the abstract `generateSyncStreamImplementation()`.

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
417417
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
418418
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
419419
return {
420+
...options,
420421
retryDelayMs:
421422
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
422423
crudUploadThrottleMs:
@@ -436,12 +437,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
436437
throw new Error('Cannot connect using a closed client');
437438
}
438439

439-
const { retryDelayMs, crudUploadThrottleMs } = this.resolvedConnectionOptions(options);
440+
const resolvedConnectOptions = this.resolvedConnectionOptions(options);
440441

441-
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
442-
retryDelayMs,
443-
crudUploadThrottleMs
444-
});
442+
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions);
445443
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
446444
statusChanged: (status) => {
447445
this.currentStatus = new SyncStatus({
@@ -555,7 +553,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
555553
* This method does include transaction ids in the result, but does not group
556554
* data by transaction. One batch may contain data from multiple transactions,
557555
* and a single transaction may be split over multiple batches.
558-
*
556+
*
559557
* @param limit Maximum number of CRUD entries to include in the batch
560558
* @returns A batch of CRUD operations to upload, or null if there are none
561559
*/
@@ -594,7 +592,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
594592
*
595593
* Unlike {@link getCrudBatch}, this only returns data from a single transaction at a time.
596594
* All data for the transaction is loaded into memory.
597-
*
595+
*
598596
* @returns A transaction of CRUD operations to upload, or null if there are none
599597
*/
600598
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
@@ -633,7 +631,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
633631
* Get an unique client id for this database.
634632
*
635633
* The id is not reset when the database is cleared, only when the database is deleted.
636-
*
634+
*
637635
* @returns A unique identifier for the database instance
638636
*/
639637
async getClientId(): Promise<string> {
@@ -661,7 +659,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
661659
/**
662660
* Execute a SQL write (INSERT/UPDATE/DELETE) query
663661
* and optionally return results.
664-
*
662+
*
665663
* @param sql The SQL query to execute
666664
* @param parameters Optional array of parameters to bind to the query
667665
* @returns The query result as an object with structured key-value pairs
@@ -674,7 +672,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
674672
/**
675673
* Execute a SQL write (INSERT/UPDATE/DELETE) query directly on the database without any PowerSync processing.
676674
* This bypasses certain PowerSync abstractions and is useful for accessing the raw database results.
677-
*
675+
*
678676
* @param sql The SQL query to execute
679677
* @param parameters Optional array of parameters to bind to the query
680678
* @returns The raw query result from the underlying database as a nested array of raw values, where each row is
@@ -689,7 +687,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
689687
* Execute a write query (INSERT/UPDATE/DELETE) multiple times with each parameter set
690688
* and optionally return results.
691689
* This is faster than executing separately with each parameter set.
692-
*
690+
*
693691
* @param sql The SQL query to execute
694692
* @param parameters Optional 2D array of parameter sets, where each inner array is a set of parameters for one execution
695693
* @returns The query result
@@ -701,7 +699,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
701699

702700
/**
703701
* Execute a read-only query and return results.
704-
*
702+
*
705703
* @param sql The SQL query to execute
706704
* @param parameters Optional array of parameters to bind to the query
707705
* @returns An array of results
@@ -713,7 +711,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
713711

714712
/**
715713
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
716-
*
714+
*
717715
* @param sql The SQL query to execute
718716
* @param parameters Optional array of parameters to bind to the query
719717
* @returns The first result if found, or null if no results are returned
@@ -725,7 +723,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
725723

726724
/**
727725
* Execute a read-only query and return the first result, error if the ResultSet is empty.
728-
*
726+
*
729727
* @param sql The SQL query to execute
730728
* @param parameters Optional array of parameters to bind to the query
731729
* @returns The first result matching the query
@@ -761,7 +759,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
761759
* Open a read-only transaction.
762760
* Read transactions can run concurrently to a write transaction.
763761
* Changes from any write transaction are not visible to read transactions started before it.
764-
*
762+
*
765763
* @param callback Function to execute within the transaction
766764
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
767765
* @returns The result of the callback
@@ -786,7 +784,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
786784
* Open a read-write transaction.
787785
* This takes a global lock - only one write transaction can execute against the database at a time.
788786
* Statements within the transaction must be done on the provided {@link Transaction} interface.
789-
*
787+
*
790788
* @param callback Function to execute within the transaction
791789
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
792790
* @returns The result of the callback
@@ -865,7 +863,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
865863
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
866864
*
867865
* Note that the `onChange` callback member of the handler is required.
868-
*
866+
*
869867
* @param sql The SQL query to execute
870868
* @param parameters Optional array of parameters to bind to the query
871869
* @param handler Callbacks for handling results and errors
@@ -915,7 +913,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
915913
* Execute a read query every time the source tables are modified.
916914
* Use {@link SQLWatchOptions.throttleMs} to specify the minimum interval between queries.
917915
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
918-
*
916+
*
919917
* @param sql The SQL query to execute
920918
* @param parameters Optional array of parameters to bind to the query
921919
* @param options Options for configuring watch behavior
@@ -944,7 +942,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
944942
* Resolves the list of tables that are used in a SQL query.
945943
* If tables are specified in the options, those are used directly.
946944
* Otherwise, analyzes the query using EXPLAIN to determine which tables are accessed.
947-
*
945+
*
948946
* @param sql The SQL query to analyze
949947
* @param parameters Optional parameters for the SQL query
950948
* @param options Optional watch options that may contain explicit table list
@@ -1077,7 +1075,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
10771075
*
10781076
* This is preferred over {@link watchWithAsyncGenerator} when multiple queries need to be performed
10791077
* together when data is changed.
1080-
*
1078+
*
10811079
* Note: do not declare this as `async *onChange` as it will not work in React Native.
10821080
*
10831081
* @param options Options for configuring watch behavior

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import {
22
AbstractPowerSyncDatabase,
33
AbstractStreamingSyncImplementation,
4+
AdditionalConnectionOptions,
45
BucketStorageAdapter,
56
DBAdapter,
67
PowerSyncBackendConnector,
8+
PowerSyncConnectionOptions,
79
PowerSyncDatabaseOptions,
810
PowerSyncDatabaseOptionsWithSettings,
11+
RequiredAdditionalConnectionOptions,
912
SqliteBucketStorage,
1013
SQLOpenFactory
1114
} from '@powersync/common';
@@ -15,11 +18,18 @@ import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyn
1518

1619
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
1720
import { NodeSQLOpenOptions } from './options.js';
21+
import { Dispatcher } from 'undici';
1822

1923
export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
2024
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
2125
};
2226

27+
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
28+
dispatcher?: Dispatcher;
29+
};
30+
31+
export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;
32+
2333
/**
2434
* A PowerSync database which provides SQLite functionality
2535
* which is automatically synced.
@@ -54,10 +64,18 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
5464
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
5565
}
5666

67+
connect(
68+
connector: PowerSyncBackendConnector,
69+
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
70+
): Promise<void> {
71+
return super.connect(connector, options);
72+
}
73+
5774
protected generateSyncStreamImplementation(
58-
connector: PowerSyncBackendConnector
75+
connector: PowerSyncBackendConnector,
76+
options: NodeAdditionalConnectionOptions
5977
): AbstractStreamingSyncImplementation {
60-
const remote = new NodeRemote(connector);
78+
const remote = new NodeRemote(connector, this.options.logger, { dispatcher: options.dispatcher });
6179

6280
return new NodeStreamingSyncImplementation({
6381
adapter: this.bucketStorageAdapter,

packages/node/src/sync/stream/NodeRemote.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
} from '@powersync/common';
1414
import { BSON } from 'bson';
1515
import Agent from 'proxy-agent';
16-
import { ProxyAgent, EnvHttpProxyAgent } from 'undici';
16+
import { EnvHttpProxyAgent, Dispatcher } from 'undici';
1717
import { WebSocket } from 'ws';
1818

1919
export const STREAMING_POST_TIMEOUT_MS = 30_000;
@@ -24,26 +24,26 @@ class NodeFetchProvider extends FetchImplementationProvider {
2424
}
2525
}
2626

27-
export class NodeRemote extends AbstractRemote {
28-
protected agent: ProxyAgent | undefined;
27+
export type NodeRemoteOptions = AbstractRemoteOptions & {
28+
dispatcher?: Dispatcher;
29+
};
2930

31+
export class NodeRemote extends AbstractRemote {
3032
constructor(
3133
protected connector: RemoteConnector,
3234
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
33-
options?: Partial<AbstractRemoteOptions>
35+
options?: Partial<NodeRemoteOptions>
3436
) {
35-
// Automatically uses relevant env vars for HTTP
36-
const agent = new EnvHttpProxyAgent();
37+
// EnvHttpProxyAgent automatically uses relevant env vars for HTTP
38+
const dispatcher = options?.dispatcher ?? new EnvHttpProxyAgent();
3739

3840
super(connector, logger, {
3941
...(options ?? {}),
4042
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider(),
4143
fetchOptions: {
42-
dispatcher: agent
44+
dispatcher
4345
}
4446
});
45-
46-
this.agent = agent;
4747
}
4848

4949
protected createSocket(url: string): globalThis.WebSocket {

0 commit comments

Comments
 (0)