Skip to content

Commit b42a1fd

Browse files
authored
Convert node stream to web stream
* Add @cloudflare/workers-types * Bump compatibility_date
1 parent b84cd5f commit b42a1fd

File tree

7 files changed

+118
-53
lines changed

7 files changed

+118
-53
lines changed

builder/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
},
2828
"homepage": "https://github.com/flarelabs-net/poc-next",
2929
"devDependencies": {
30+
"@cloudflare/workers-types": "^4.20240909.0",
3031
"@types/node": "^22.2.0",
3132
"esbuild": "^0.23.0",
3233
"glob": "^11.0.0",

builder/src/build/build-worker.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,18 @@ globalThis.fetch = (input, init) => {
100100
if (init) delete init.cache;
101101
return curFetch(input, init);
102102
};
103+
import { Readable } from 'node:stream';
103104
globalThis.fetch.__nextPatched = isPatchedAlready;
104105
fetch = globalThis.fetch;
105106
const CustomRequest = class extends globalThis.Request {
106107
constructor(input, init) {
107108
console.log("CustomRequest", input);
108-
if (init) delete init.cache;
109+
if (init) {
110+
delete init.cache;
111+
if (init.body?.__node_stream__ === true) {
112+
init.body = Readable.toWeb(init.body);
113+
}
114+
}
109115
super(input, init);
110116
}
111117
};

builder/src/templates/worker.ts

Lines changed: 93 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
import { Readable } from "node:stream";
2-
1+
import Stream from "node:stream";
32
import type { NextConfig } from "next";
43
import { NodeNextRequest, NodeNextResponse } from "next/dist/server/base-http/node";
5-
import { createRequestResponseMocks } from "next/dist/server/lib/mock-request";
4+
import { MockedResponse } from "next/dist/server/lib/mock-request";
65
import NextNodeServer, { NodeRequestHandler } from "next/dist/server/next-server";
6+
import type { IncomingMessage } from "node:http";
7+
8+
const NON_BODY_RESPONSES = new Set([101, 204, 205, 304]);
79

8-
/**
9-
* Injected at build time
10-
* (we practically follow what Next.js does here:
11-
https://github.com/vercel/next.js/blob/68a7128/packages/next/src/build/utils.ts#L2137-L2139)
12-
*/
10+
// Injected at build time
1311
const nextConfig: NextConfig = JSON.parse(process.env.__NEXT_PRIVATE_STANDALONE_CONFIG ?? "{}");
1412

1513
let requestHandler: NodeRequestHandler | null = null;
@@ -23,6 +21,7 @@ export default {
2321
customServer: false,
2422
dev: false,
2523
dir: "",
24+
minimalMode: false,
2625
}).getRequestHandler();
2726
}
2827

@@ -32,50 +31,98 @@ export default {
3231
let imageUrl =
3332
url.searchParams.get("url") ?? "https://developers.cloudflare.com/_astro/logo.BU9hiExz.svg";
3433
if (imageUrl.startsWith("/")) {
35-
imageUrl = new URL(imageUrl, request.url).href;
34+
return env.ASSETS.fetch(new URL(imageUrl, request.url));
3635
}
3736
return fetch(imageUrl, { cf: { cacheEverything: true } } as any);
3837
}
3938

40-
const resBody = new TransformStream();
41-
const writer = resBody.writable.getWriter();
42-
43-
const reqBodyNodeStream = request.body ? Readable.fromWeb(request.body as any) : undefined;
44-
45-
const { req, res } = createRequestResponseMocks({
46-
method: request.method,
47-
url: url.href.slice(url.origin.length),
48-
headers: Object.fromEntries([...request.headers]),
49-
bodyReadable: reqBodyNodeStream,
50-
resWriter: (chunk) => {
51-
writer.write(chunk).catch(console.error);
52-
return true;
53-
},
54-
});
55-
56-
let headPromiseResolve: any = null;
57-
const headPromise = new Promise<void>((resolve) => {
58-
headPromiseResolve = resolve;
59-
});
60-
res.flushHeaders = () => headPromiseResolve?.();
61-
62-
if (reqBodyNodeStream != null) {
63-
const origPush = reqBodyNodeStream.push;
64-
reqBodyNodeStream.push = (chunk: any) => {
65-
req.push(chunk);
66-
return origPush.call(reqBodyNodeStream, chunk);
67-
};
68-
}
69-
70-
ctx.waitUntil((res as any).hasStreamed.then(() => writer.close()));
39+
const { req, res, webResponse } = getWrappedStreams(request, ctx);
7140

7241
ctx.waitUntil(requestHandler(new NodeNextRequest(req), new NodeNextResponse(res)));
7342

74-
await Promise.race([res.headPromise, headPromise]);
75-
76-
return new Response(resBody.readable, {
77-
status: res.statusCode,
78-
headers: (res as any).headers,
79-
});
43+
return await webResponse();
8044
},
8145
};
46+
47+
function getWrappedStreams(request: Request, ctx: any) {
48+
const url = new URL(request.url);
49+
50+
const req = (
51+
request.body ? Stream.Readable.fromWeb(request.body as any) : Stream.Readable.from([])
52+
) as IncomingMessage;
53+
req.httpVersion = "1.0";
54+
req.httpVersionMajor = 1;
55+
req.httpVersionMinor = 0;
56+
req.url = url.href.slice(url.origin.length);
57+
req.headers = Object.fromEntries([...request.headers]);
58+
req.method = request.method;
59+
Object.defineProperty(req, "__node_stream__", {
60+
value: true,
61+
writable: false,
62+
});
63+
Object.defineProperty(req, "headersDistinct", {
64+
get() {
65+
const headers: Record<string, string[]> = {};
66+
for (const [key, value] of Object.entries(req.headers)) {
67+
if (!value) continue;
68+
headers[key] = Array.isArray(value) ? value : [value];
69+
}
70+
return headers;
71+
},
72+
});
73+
74+
const { readable, writable } = new IdentityTransformStream();
75+
const resBodyWriter = writable.getWriter();
76+
77+
const res = new MockedResponse({
78+
resWriter: (chunk) => {
79+
resBodyWriter.write(typeof chunk === "string" ? Buffer.from(chunk) : chunk).catch((err: any) => {
80+
if (
81+
err.message.includes("WritableStream has been closed") ||
82+
err.message.includes("Network connection lost")
83+
) {
84+
// safe to ignore
85+
return;
86+
}
87+
console.error("Error in resBodyWriter.write");
88+
console.error(err);
89+
});
90+
return true;
91+
},
92+
});
93+
94+
// It's implemented as a no-op, but really it should mark the headers as done
95+
res.flushHeaders = () => (res as any).headPromiseResolve();
96+
97+
// Only allow statusCode to be modified if not sent
98+
let { statusCode } = res;
99+
Object.defineProperty(res, "statusCode", {
100+
get: function () {
101+
return statusCode;
102+
},
103+
set: function (val) {
104+
if (this.finished || this.headersSent) {
105+
console.error("headers already sent");
106+
return;
107+
}
108+
statusCode = val;
109+
},
110+
});
111+
112+
// Make sure the writer is eventually closed
113+
ctx.waitUntil((res as any).hasStreamed.finally(() => resBodyWriter.close().catch(() => {})));
114+
115+
return {
116+
res,
117+
req,
118+
webResponse: async () => {
119+
await res.headPromise;
120+
// TODO: remove this once streaming with compression is working nicely
121+
res.setHeader("content-encoding", "identity");
122+
return new Response(NON_BODY_RESPONSES.has(res.statusCode) ? null : readable, {
123+
status: res.statusCode,
124+
headers: (res as any).headers,
125+
});
126+
},
127+
};
128+
}

builder/tsconfig.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"esModuleInterop": true,
77
"forceConsistentCasingInFileNames": true,
88
"strict": true,
9-
"skipLibCheck": true
9+
"skipLibCheck": true,
10+
"types": ["@cloudflare/workers-types"]
1011
}
1112
}

examples/api/app/api/hello/route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ export async function GET() {
88
}
99

1010
export async function POST(request: Request) {
11-
return new Response(`Hello post-World! body=${await request.text()}`);
11+
const text = await request.text();
12+
return new Response(`Hello post-World! body=${text}`);
1213
}

examples/api/wrangler.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#:schema node_modules/wrangler/config-schema.json
22
name = "api"
33
main = ".worker-next/index.mjs"
4-
compatibility_date = "2024-08-29"
4+
compatibility_date = "2024-09-16"
55
compatibility_flags = ["nodejs_compat_v2"]
66
workers_dev = true
77
minify = false # let's disable minification for easier debuggability

pnpm-lock.yaml

Lines changed: 12 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)