Skip to content

Commit af20fa2

Browse files
committed
Use Undici WebSocket with Dispatcher support.
1 parent 96ddd5d commit af20fa2

File tree

3 files changed

+97
-18
lines changed

3 files changed

+97
-18
lines changed

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@ import {
1313
SQLOpenFactory
1414
} from '@powersync/common';
1515

16-
import { NodeRemote } from '../sync/stream/NodeRemote.js';
16+
import { NodeCustomConnectionOptions, NodeRemote } from '../sync/stream/NodeRemote.js';
1717
import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js';
1818

19-
import { Dispatcher } from 'undici';
2019
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
2120
import { NodeSQLOpenOptions } from './options.js';
2221

@@ -30,13 +29,7 @@ export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
3029
remoteOptions?: Partial<AbstractRemoteOptions>;
3130
};
3231

33-
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
34-
/**
35-
* Optional custom dispatcher for HTTP connections (e.g. using undici).
36-
* Only used when the connection method is SyncStreamConnectionMethod.HTTP
37-
*/
38-
dispatcher?: Dispatcher;
39-
};
32+
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & NodeCustomConnectionOptions;
4033

4134
export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;
4235

@@ -76,7 +69,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
7669

7770
connect(
7871
connector: PowerSyncBackendConnector,
79-
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
72+
options?: PowerSyncConnectionOptions & NodeCustomConnectionOptions
8073
): Promise<void> {
8174
return super.connect(connector, options);
8275
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { Dispatcher } from 'undici';
2+
3+
/**
4+
* A simple dispatcher wrapper that only records the last error.
5+
* Everything else passes straight through to the original handler.
6+
*/
7+
export class ErrorRecordingDispatcher extends Dispatcher {
8+
private targetDispatcher: Dispatcher;
9+
private onError: (error: Error) => void;
10+
11+
constructor(targetDispatcher: Dispatcher, onError: (error: Error) => void) {
12+
super();
13+
this.targetDispatcher = targetDispatcher;
14+
this.onError = onError;
15+
}
16+
17+
dispatch(opts: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean {
18+
// Create a simple wrapper that only intercepts errors
19+
const errorRecordingHandler: Dispatcher.DispatchHandler = {
20+
// New API methods (preferred)
21+
onRequestStart: handler.onRequestStart?.bind(handler),
22+
onRequestUpgrade: handler.onRequestUpgrade?.bind(handler),
23+
onResponseStart: handler.onResponseStart?.bind(handler),
24+
onResponseData: handler.onResponseData?.bind(handler),
25+
onResponseEnd: handler.onResponseEnd?.bind(handler),
26+
27+
onResponseError: (controller: any, error: Error) => {
28+
this.onError(error);
29+
// Pass through to original handler
30+
return handler.onResponseError?.(controller, error);
31+
},
32+
33+
// Legacy API methods (for backward compatibility)
34+
onConnect: handler.onConnect?.bind(handler),
35+
onUpgrade: handler.onUpgrade?.bind(handler),
36+
onHeaders: handler.onHeaders?.bind(handler),
37+
onData: handler.onData?.bind(handler),
38+
onComplete: handler.onComplete?.bind(handler),
39+
40+
onError: (error: Error) => {
41+
this.onError(error);
42+
43+
// Pass through to original handler
44+
return handler.onError?.(error);
45+
}
46+
};
47+
48+
// Delegate to the target dispatcher with our simple error-recording handler
49+
return this.targetDispatcher.dispatch(opts, errorRecordingHandler);
50+
}
51+
52+
async close(): Promise<void> {
53+
return this.targetDispatcher.close();
54+
}
55+
56+
async destroy(): Promise<void> {
57+
return this.targetDispatcher.destroy();
58+
}
59+
}

packages/node/src/sync/stream/NodeRemote.ts

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ import {
1111
RemoteConnector
1212
} from '@powersync/common';
1313
import { BSON } from 'bson';
14-
import Agent from 'proxy-agent';
15-
import { EnvHttpProxyAgent, Dispatcher } from 'undici';
16-
import { WebSocket } from 'ws';
14+
import { Dispatcher, EnvHttpProxyAgent, ErrorEvent, WebSocket as UndiciWebSocket } from 'undici';
15+
import { ErrorRecordingDispatcher } from './ErrorRecordingDispatcher.js';
1716

1817
export const STREAMING_POST_TIMEOUT_MS = 30_000;
1918

@@ -23,11 +22,21 @@ class NodeFetchProvider extends FetchImplementationProvider {
2322
}
2423
}
2524

26-
export type NodeRemoteOptions = AbstractRemoteOptions & {
25+
export type NodeCustomConnectionOptions = {
26+
/**
27+
* Optional custom dispatcher for HTTP or WEB_SOCKET connections.
28+
*
29+
* This can be used to customize proxy usage (using undici ProxyAgent),
30+
* or other connection options.
31+
*/
2732
dispatcher?: Dispatcher;
2833
};
2934

35+
export type NodeRemoteOptions = AbstractRemoteOptions & NodeCustomConnectionOptions;
36+
3037
export class NodeRemote extends AbstractRemote {
38+
private dispatcher: Dispatcher;
39+
3140
constructor(
3241
protected connector: RemoteConnector,
3342
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
@@ -43,16 +52,34 @@ export class NodeRemote extends AbstractRemote {
4352
},
4453
...(options ?? {})
4554
});
55+
56+
this.dispatcher = dispatcher;
4657
}
4758

4859
protected createSocket(url: string): globalThis.WebSocket {
49-
return new WebSocket(url, {
50-
// Automatically uses relevant env vars for web sockets
51-
agent: new Agent.ProxyAgent(),
60+
// Create dedicated dispatcher for this WebSocket
61+
let ws: UndiciWebSocket | undefined;
62+
const onError = (error: Error) => {
63+
// When we receive an error from the Dispatcher, emit the event on the websocket.
64+
// This will take precedence over the WebSocket's own error event, giving more details on what went wrong.
65+
const event = new ErrorEvent('error', {
66+
error,
67+
message: error.message
68+
});
69+
ws?.dispatchEvent(event);
70+
};
71+
72+
const errorRecordingDispatcher = new ErrorRecordingDispatcher(this.dispatcher, onError);
73+
74+
// Create WebSocket with dedicated dispatcher
75+
ws = new UndiciWebSocket(url, {
76+
dispatcher: errorRecordingDispatcher,
5277
headers: {
5378
'User-Agent': this.getUserAgent()
5479
}
55-
}) as any as globalThis.WebSocket; // This is compatible in Node environments
80+
});
81+
82+
return ws as globalThis.WebSocket;
5683
}
5784

5885
getUserAgent(): string {

0 commit comments

Comments
 (0)