Skip to content

Commit 47f47a7

Browse files
committed
v0.3.21-beta.1
1 parent 5e5d605 commit 47f47a7

File tree

3 files changed

+57
-25
lines changed

3 files changed

+57
-25
lines changed

channel.ts

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,19 @@ export const makeReadableStream = (
179179
): ReadableStream<Uint8Array> => {
180180
return new ReadableStream({
181181
async start(controller) {
182-
for await (const content of ch.recv(signal)) {
183-
controller.enqueue(content);
182+
try {
183+
for await (const content of ch.recv(signal)) {
184+
controller.enqueue(content);
185+
}
186+
controller.close();
187+
} catch (error) {
188+
// Handle cancellation gracefully
189+
if (signal?.aborted || error instanceof ClosedChannelError) {
190+
controller.close();
191+
} else {
192+
controller.error(error);
193+
}
184194
}
185-
// Uncomment if necessary. this will send a signal to the controller
186-
// if (signal?.aborted) {
187-
// controller.error(new Error("aborted"));
188-
// }
189-
controller.close();
190195
},
191196
cancel() {
192197
ch.close();
@@ -201,15 +206,23 @@ export const makeChanStream = (
201206
// Consume the transformed stream to trigger the pipeline
202207
const reader = stream.getReader();
203208
const processStream = async () => {
204-
while (true) {
205-
const { done, value } = await reader.read();
206-
if (done) break;
207-
await chan.send(value);
209+
try {
210+
while (true) {
211+
const { done, value } = await reader.read();
212+
if (done) break;
213+
await chan.send(value);
214+
}
215+
chan.close();
216+
} catch (err) {
217+
// Handle errors gracefully, especially for Node.js/undici
218+
if (!chan.signal.aborted) {
219+
console.error("error processing stream", err);
220+
chan.close();
221+
}
208222
}
209-
chan.close();
210223
};
211224
processStream().catch((err) => {
212-
if (!err?.target?.aborted) {
225+
if (!chan.signal.aborted) {
213226
console.error("error processing stream", err);
214227
}
215228
});

handlers.client.ts

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const onRequestStart: ServerMessageHandler<RequestStartMessage> = async (
5151
await handleWebSocket(message, state);
5252
return;
5353
}
54+
console.log(`[req-start] ${message.method} ${message.url} hasBody: ${message.hasBody}`);
5455
const abortCtrl = new AbortController();
5556
state.requests[message.id] = { abortCtrl };
5657
if (!message.hasBody) {
@@ -69,6 +70,11 @@ const onRequestStart: ServerMessageHandler<RequestStartMessage> = async (
6970
state.ch.out,
7071
abortCtrl.signal,
7172
).catch(ignoreIfClosed).finally(() => {
73+
// Ensure the body channel is closed when the request is cleaned up
74+
const req = state.requests[message.id];
75+
if (req?.body) {
76+
req.body.close();
77+
}
7278
delete state.requests[message.id];
7379
});
7480
}
@@ -88,6 +94,7 @@ const onRequestData: ServerMessageHandler<RequestDataMessage> = async (
8894
console.info("[req-data] req not found", message.id);
8995
return;
9096
}
97+
console.log(`[req-data] ${message.id} chunk size: ${message.chunk.length}`);
9198
// @ts-ignore: bodyData is a Uint8Array
9299
await reqBody.send?.(message.chunk);
93100
};
@@ -113,10 +120,12 @@ const onRequestDataEnd: ServerMessageHandler<RequestDataEndMessage> = (
113120
state,
114121
message,
115122
) => {
123+
console.log(`[req-data-end] ${message.id}`);
116124
const reqBody = state.requests[message.id]?.body;
117125
if (!reqBody) {
118126
return;
119127
}
128+
// Close the body channel to signal end of request data
120129
reqBody.close();
121130
};
122131

@@ -239,20 +248,29 @@ async function doFetch(
239248
// Read from the stream
240249
const signal = link(clientCh.signal, reqSignal);
241250
try {
251+
// For Node.js with undici, we need to handle the body differently
252+
const fetchOptions: RequestInit = {
253+
...state.client ? { client: state.client } : {},
254+
redirect: "manual",
255+
method: request.method,
256+
headers: state.client
257+
? { ...request.headers, host: request.domain }
258+
: request.headers,
259+
signal,
260+
};
261+
262+
// Only add body and duplex if we actually have a body
263+
if (request.body) {
264+
fetchOptions.body = request.body;
265+
// Remove duplex option for Node.js/undici compatibility
266+
// fetchOptions.duplex = "half";
267+
}
268+
242269
const response = await fetch(
243270
new URL(request.url, state.localAddr),
244-
{
245-
...state.client ? { client: state.client } : {},
246-
redirect: "manual",
247-
method: request.method,
248-
headers: state.client
249-
? { ...request.headers, host: request.domain }
250-
: request.headers,
251-
body: request.body,
252-
...(request.body ? { duplex: "half" } : {}),
253-
signal,
254-
},
271+
fetchOptions,
255272
);
273+
console.log("response", response);
256274

257275
const headers: Record<string, Array<string>> = {};
258276

@@ -289,6 +307,7 @@ async function doFetch(
289307
if (signal.aborted) {
290308
return;
291309
}
310+
console.error(`[doFetch] Error for request ${request.id}:`, err);
292311
throw err;
293312
}
294313
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@deco-cx/warp-node",
3-
"version": "0.3.20",
3+
"version": "0.3.21-beta.1",
44
"description": "WebSocket tunneling library for Node.js and other runtimes",
55
"type": "module",
66
"main": "dist/mod.js",

0 commit comments

Comments
 (0)