Skip to content

Commit 896455c

Browse files
committed
Add ALL_PROXY, WS_PROXY, WSS_PROXY support.
1 parent d5f3e74 commit 896455c

File tree

3 files changed

+61
-25
lines changed

3 files changed

+61
-25
lines changed

packages/node/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@
5353
"async-lock": "^1.4.0",
5454
"bson": "^6.6.0",
5555
"comlink": "^4.4.2",
56-
"proxy-agent": "^6.5.0",
57-
"undici": "^7.8.0",
56+
"undici": "^7.10.0",
5857
"ws": "^8.18.1"
5958
},
6059
"devDependencies": {

packages/node/src/sync/stream/ErrorRecordingDispatcher.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ import { Dispatcher } from 'undici';
66
*/
77
export class ErrorRecordingDispatcher extends Dispatcher {
88
private targetDispatcher: Dispatcher;
9-
private onError: (error: Error) => void;
9+
public onError: ((error: Error) => void) | undefined;
1010

11-
constructor(targetDispatcher: Dispatcher, onError: (error: Error) => void) {
11+
constructor(targetDispatcher: Dispatcher) {
1212
super();
1313
this.targetDispatcher = targetDispatcher;
14-
this.onError = onError;
1514
}
1615

1716
dispatch(opts: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean {
@@ -25,7 +24,7 @@ export class ErrorRecordingDispatcher extends Dispatcher {
2524
onResponseEnd: handler.onResponseEnd?.bind(handler),
2625

2726
onResponseError: (controller: any, error: Error) => {
28-
this.onError(error);
27+
this.onError?.(error);
2928
// Pass through to original handler
3029
return handler.onResponseError?.(controller, error);
3130
},
@@ -38,7 +37,7 @@ export class ErrorRecordingDispatcher extends Dispatcher {
3837
onComplete: handler.onComplete?.bind(handler),
3938

4039
onError: (error: Error) => {
41-
this.onError(error);
40+
this.onError?.(error);
4241

4342
// Pass through to original handler
4443
return handler.onError?.(error);

packages/node/src/sync/stream/NodeRemote.ts

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,14 @@ import {
1111
RemoteConnector
1212
} from '@powersync/common';
1313
import { BSON } from 'bson';
14-
import { Dispatcher, EnvHttpProxyAgent, ErrorEvent, WebSocket as UndiciWebSocket } from 'undici';
14+
import {
15+
Dispatcher,
16+
EnvHttpProxyAgent,
17+
ErrorEvent,
18+
getGlobalDispatcher,
19+
ProxyAgent,
20+
WebSocket as UndiciWebSocket
21+
} from 'undici';
1522
import { ErrorRecordingDispatcher } from './ErrorRecordingDispatcher.js';
1623

1724
export const STREAMING_POST_TIMEOUT_MS = 30_000;
@@ -35,51 +42,64 @@ export type NodeCustomConnectionOptions = {
3542
export type NodeRemoteOptions = AbstractRemoteOptions & NodeCustomConnectionOptions;
3643

3744
export class NodeRemote extends AbstractRemote {
38-
private dispatcher: Dispatcher;
45+
private wsDispatcher: Dispatcher | undefined;
3946

4047
constructor(
4148
protected connector: RemoteConnector,
4249
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
4350
options?: Partial<NodeRemoteOptions>
4451
) {
45-
// EnvHttpProxyAgent automatically uses relevant env vars for HTTP
46-
const dispatcher = options?.dispatcher ?? new EnvHttpProxyAgent();
52+
const fetchDispatcher = options?.dispatcher ?? defaultFetchDispatcher();
4753

4854
super(connector, logger, {
4955
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider(),
5056
fetchOptions: {
51-
dispatcher
57+
dispatcher: fetchDispatcher
5258
},
5359
...(options ?? {})
5460
});
5561

56-
this.dispatcher = dispatcher;
62+
this.wsDispatcher = options?.dispatcher;
5763
}
5864

5965
protected createSocket(url: string): globalThis.WebSocket {
6066
// Create dedicated dispatcher for this WebSocket
61-
let ws: UndiciWebSocket | undefined;
62-
const onError = (error: Error) => {
67+
const baseDispatcher = this.getWebsocketDispatcher(url);
68+
const errorRecordingDispatcher = new ErrorRecordingDispatcher(baseDispatcher);
69+
70+
// Create WebSocket with dedicated dispatcher
71+
const ws = new UndiciWebSocket(url, {
72+
dispatcher: errorRecordingDispatcher,
73+
headers: {
74+
'User-Agent': this.getUserAgent()
75+
}
76+
});
77+
78+
errorRecordingDispatcher.onError = (error: Error) => {
6379
// When we receive an error from the Dispatcher, emit the event on the websocket.
6480
// This will take precedence over the WebSocket's own error event, giving more details on what went wrong.
6581
const event = new ErrorEvent('error', {
6682
error,
6783
message: error.message
6884
});
69-
ws?.dispatchEvent(event);
85+
ws.dispatchEvent(event);
7086
};
7187

72-
const errorRecordingDispatcher = new ErrorRecordingDispatcher(this.dispatcher, onError);
88+
return ws as globalThis.WebSocket;
89+
}
7390

74-
// Create WebSocket with dedicated dispatcher
75-
ws = new UndiciWebSocket(url, {
76-
dispatcher: errorRecordingDispatcher,
77-
headers: {
78-
'User-Agent': this.getUserAgent()
79-
}
80-
});
91+
protected getWebsocketDispatcher(url: string) {
92+
if (this.wsDispatcher != null) {
93+
return this.wsDispatcher;
94+
}
8195

82-
return ws as globalThis.WebSocket;
96+
const protocol = new URL(url).protocol.replace(':', '');
97+
const proxy = getProxyForProtocol(protocol);
98+
if (proxy != null) {
99+
return new ProxyAgent(proxy);
100+
} else {
101+
return getGlobalDispatcher();
102+
}
83103
}
84104

85105
getUserAgent(): string {
@@ -95,3 +115,21 @@ export class NodeRemote extends AbstractRemote {
95115
return BSON;
96116
}
97117
}
118+
119+
function defaultFetchDispatcher(): Dispatcher {
120+
// EnvHttpProxyAgent automatically uses HTTP_PROXY, HTTPS_PROXY and NO_PROXY env vars by default.
121+
// We add ALL_PROXY support.
122+
return new EnvHttpProxyAgent({
123+
httpProxy: getProxyForProtocol('http'),
124+
httpsProxy: getProxyForProtocol('https')
125+
});
126+
}
127+
128+
function getProxyForProtocol(protocol: string): string | undefined {
129+
return (
130+
process.env[`${protocol.toLowerCase()}_proxy`] ??
131+
process.env[`${protocol.toUpperCase()}_PROXY`] ??
132+
process.env[`all_proxy`] ??
133+
process.env[`ALL_PROXY`]
134+
);
135+
}

0 commit comments

Comments
 (0)