Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions builder/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
},
"homepage": "https://github.com/flarelabs-net/poc-next",
"devDependencies": {
"@cloudflare/workers-types": "^4.20240909.0",
"@types/node": "^22.2.0",
"esbuild": "^0.23.0",
"glob": "^11.0.0",
Expand Down
8 changes: 7 additions & 1 deletion builder/src/build/build-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,18 @@ globalThis.fetch = (input, init) => {
if (init) delete init.cache;
return curFetch(input, init);
};
import { Readable } from 'node:stream';
globalThis.fetch.__nextPatched = isPatchedAlready;
fetch = globalThis.fetch;
const CustomRequest = class extends globalThis.Request {
constructor(input, init) {
console.log("CustomRequest", input);
if (init) delete init.cache;
if (init) {
delete init.cache;
if (init.body?.__node_stream__ === true) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be fine to do without the custom property and just check instance of Readable:

    if (init) {
      init = {
        ...init,
        cache: undefined,
        body: init.body instanceof Readable ? Readable.toWeb(init.body) : init.body
      }
    }

(at least, these works in my tests so far)

init.body = Readable.toWeb(init.body);
}
}
super(input, init);
}
};
Expand Down
139 changes: 93 additions & 46 deletions builder/src/templates/worker.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { Readable } from "node:stream";

import Stream from "node:stream";
import type { NextConfig } from "next";
import { NodeNextRequest, NodeNextResponse } from "next/dist/server/base-http/node";
import { createRequestResponseMocks } from "next/dist/server/lib/mock-request";
import { MockedResponse } from "next/dist/server/lib/mock-request";
import NextNodeServer, { NodeRequestHandler } from "next/dist/server/next-server";
import type { IncomingMessage } from "node:http";

const NON_BODY_RESPONSES = new Set([101, 204, 205, 304]);

/**
* Injected at build time
* (we practically follow what Next.js does here:
https://github.com/vercel/next.js/blob/68a7128/packages/next/src/build/utils.ts#L2137-L2139)
*/
// Injected at build time
const nextConfig: NextConfig = JSON.parse(process.env.__NEXT_PRIVATE_STANDALONE_CONFIG ?? "{}");

let requestHandler: NodeRequestHandler | null = null;
Expand All @@ -23,6 +21,7 @@ export default {
customServer: false,
dev: false,
dir: "",
minimalMode: false,
}).getRequestHandler();
}

Expand All @@ -32,50 +31,98 @@ export default {
let imageUrl =
url.searchParams.get("url") ?? "https://developers.cloudflare.com/_astro/logo.BU9hiExz.svg";
if (imageUrl.startsWith("/")) {
imageUrl = new URL(imageUrl, request.url).href;
return env.ASSETS.fetch(new URL(imageUrl, request.url));
}
return fetch(imageUrl, { cf: { cacheEverything: true } } as any);
}

const resBody = new TransformStream();
const writer = resBody.writable.getWriter();

const reqBodyNodeStream = request.body ? Readable.fromWeb(request.body as any) : undefined;

const { req, res } = createRequestResponseMocks({
method: request.method,
url: url.href.slice(url.origin.length),
headers: Object.fromEntries([...request.headers]),
bodyReadable: reqBodyNodeStream,
resWriter: (chunk) => {
writer.write(chunk).catch(console.error);
return true;
},
});

let headPromiseResolve: any = null;
const headPromise = new Promise<void>((resolve) => {
headPromiseResolve = resolve;
});
res.flushHeaders = () => headPromiseResolve?.();

if (reqBodyNodeStream != null) {
const origPush = reqBodyNodeStream.push;
reqBodyNodeStream.push = (chunk: any) => {
req.push(chunk);
return origPush.call(reqBodyNodeStream, chunk);
};
}

ctx.waitUntil((res as any).hasStreamed.then(() => writer.close()));
const { req, res, webResponse } = getWrappedStreams(request, ctx);

ctx.waitUntil(requestHandler(new NodeNextRequest(req), new NodeNextResponse(res)));

await Promise.race([res.headPromise, headPromise]);

return new Response(resBody.readable, {
status: res.statusCode,
headers: (res as any).headers,
});
return await webResponse();
},
};

function getWrappedStreams(request: Request, ctx: any) {
const url = new URL(request.url);

const req = (
request.body ? Stream.Readable.fromWeb(request.body as any) : Stream.Readable.from([])
) as IncomingMessage;
req.httpVersion = "1.0";
req.httpVersionMajor = 1;
req.httpVersionMinor = 0;
req.url = url.href.slice(url.origin.length);
req.headers = Object.fromEntries([...request.headers]);
req.method = request.method;
Object.defineProperty(req, "__node_stream__", {
value: true,
writable: false,
});
Object.defineProperty(req, "headersDistinct", {
get() {
const headers: Record<string, string[]> = {};
for (const [key, value] of Object.entries(req.headers)) {
if (!value) continue;
headers[key] = Array.isArray(value) ? value : [value];
}
return headers;
},
});

const { readable, writable } = new IdentityTransformStream();
const resBodyWriter = writable.getWriter();

const res = new MockedResponse({
resWriter: (chunk) => {
resBodyWriter.write(typeof chunk === "string" ? Buffer.from(chunk) : chunk).catch((err: any) => {
if (
err.message.includes("WritableStream has been closed") ||
err.message.includes("Network connection lost")
) {
// safe to ignore
return;
}
console.error("Error in resBodyWriter.write");
console.error(err);
});
return true;
},
});

// It's implemented as a no-op, but really it should mark the headers as done
res.flushHeaders = () => (res as any).headPromiseResolve();

// Only allow statusCode to be modified if not sent
let { statusCode } = res;
Object.defineProperty(res, "statusCode", {
get: function () {
return statusCode;
},
set: function (val) {
if (this.finished || this.headersSent) {
console.error("headers already sent");
return;
}
statusCode = val;
},
});

// Make sure the writer is eventually closed
ctx.waitUntil((res as any).hasStreamed.finally(() => resBodyWriter.close().catch(() => {})));

return {
res,
req,
webResponse: async () => {
await res.headPromise;
// TODO: remove this once streaming with compression is working nicely
res.setHeader("content-encoding", "identity");
return new Response(NON_BODY_RESPONSES.has(res.statusCode) ? null : readable, {
status: res.statusCode,
headers: (res as any).headers,
});
},
};
}
3 changes: 2 additions & 1 deletion builder/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipLibCheck": true
"skipLibCheck": true,
"types": ["@cloudflare/workers-types"]
}
}
3 changes: 2 additions & 1 deletion examples/api/app/api/hello/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ export async function GET() {
}

export async function POST(request: Request) {
return new Response(`Hello post-World! body=${await request.text()}`);
const text = await request.text();
return new Response(`Hello post-World! body=${text}`);
}
2 changes: 1 addition & 1 deletion examples/api/wrangler.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#:schema node_modules/wrangler/config-schema.json
name = "api"
main = ".worker-next/index.mjs"
compatibility_date = "2024-08-29"
compatibility_date = "2024-09-16"
compatibility_flags = ["nodejs_compat_v2"]
workers_dev = true
minify = false # let's disable minification for easier debuggability
Expand Down
15 changes: 12 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.