Skip to content

Commit 3c0b07d

Browse files
committed
Comments
1 parent b60a0ae commit 3c0b07d

File tree

1 file changed

+167
-10
lines changed
  • packages/@apphosting/adapter-nextjs/src/bin

1 file changed

+167
-10
lines changed

packages/@apphosting/adapter-nextjs/src/bin/serve.ts

Lines changed: 167 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,35 +13,46 @@ import { HeaderValue } from "../protos/envoy/config/core/v3/base_pb.js";
1313
import { Socket } from "node:net";
1414
import { PassThrough } from "node:stream";
1515

16+
// The standalone directory is the root of the Next.js application.
1617
const dir = join(process.cwd(), process.argv[2] || ".next/standalone");
1718

19+
// Standard NodeJS HTTP server port and hostname.
1820
const port = parseInt(process.env.PORT!, 10) || 3000
1921
const hostname = process.env.HOSTNAME || '0.0.0.0'
2022

23+
// Polyfill for the `self` global object, used by Next.js in minimal mode.
2124
// @ts-ignore
2225
globalThis.self = globalThis;
26+
// Polyfill for the `AsyncLocalStorage` global object, used by Next.js in minimal mode.
2327
globalThis.AsyncLocalStorage = AsyncLocalStorage;
2428

29+
// Required by Next.js in minimal mode.
2530
// @ts-ignore
2631
process.env.NODE_ENV = "production";
2732

33+
// Allow the keep-alive timeout to be configured.
2834
let keepAliveTimeout: number | undefined = parseInt(process.env.KEEP_ALIVE_TIMEOUT!, 10);
2935

36+
// Load the Next.js configuration from the standalone directory.
3037
const conf = JSON.parse(readFileSync(join(dir, ".next", "required-server-files.json"), "utf-8")).config;
3138

39+
// Pass the Next.js configuration to the Next.js server.
3240
process.env.__NEXT_PRIVATE_STANDALONE_CONFIG = JSON.stringify(conf);
3341

42+
// Increase the max listeners to prevent warnings when many requests are in-flight.
3443
process.setMaxListeners(1_000);
3544

45+
// Dynamically import the Next.js middleware.
3646
const resolveMiddleware = import(join(dir, ".next/server/middleware.js"));
3747

38-
// TODO don't hardcode
48+
// TODO don't hardcode these matchers, they should be derived from the build output.
3949
const matchers = [
4050
'/about/:path*',
4151
'/((?!_next|firebase|[^?]*\\.(?:html?|css|js(?!on)|jpe?g|webp|png|gif|svg|ttf|woff2?|ico|csv|docx?|xlsx?|zip|webmanifest)).*)',
4252
'/private/:path*',
4353
].map(it => new RegExp(it));
4454

55+
// If the keep-alive timeout is not a valid number, use the default.
4556
if (
4657
Number.isNaN(keepAliveTimeout) ||
4758
!Number.isFinite(keepAliveTimeout) ||
@@ -50,6 +61,7 @@ if (
5061
keepAliveTimeout = undefined
5162
}
5263

64+
// Initialize the Next.js server in minimal mode.
5365
const resolveNextServer = import(join(dir, "node_modules/next/dist/server/next-server.js")).then(async ({ default: NextServer }) => {
5466
const server = new NextServer.default({
5567
conf,
@@ -63,6 +75,16 @@ const resolveNextServer = import(join(dir, "node_modules/next/dist/server/next-s
6375
return server;
6476
});
6577

78+
/**
79+
* Injects App Hosting specific headers into the response.
80+
*
81+
* This is used to communicate the postponed state of a page to the App Hosting backend.
82+
* The backend will then use this information to resume the request when the page is
83+
* ready.
84+
*
85+
* @param req The incoming request.
86+
* @param res The server response.
87+
*/
6688
async function injectAppHostingHeaders(req: IncomingMessage, res: ServerResponse) {
6789
if (req.method !== 'GET' && req.method !== 'HEAD') return;
6890
if (!res.getHeaderNames().includes('x-nextjs-postponed')) return;
@@ -75,9 +97,31 @@ async function injectAppHostingHeaders(req: IncomingMessage, res: ServerResponse
7597
res.appendHeader('x-fah-postponed', Buffer.from(cacheEntry.value.postponed).toString('base64url'));
7698
}
7799

100+
/**
101+
* Handles incoming HTTP requests.
102+
*
103+
* This function is the entry point for all HTTP requests. It is responsible for
104+
* proxying requests to the Next.js server and for handling PPR (Partial Prerendering)
105+
* requests.
106+
*
107+
* @param req The incoming request.
108+
* @param res The server response.
109+
*/
78110
async function requestHandle(req: IncomingMessage, res: ServerResponse) {
111+
// This is a temporary workaround to enable PPR for the home page.
79112
const isPPR = req.url === "/";
80113
if (isPPR) {
114+
/**
115+
* This is a critical interception. The Next.js server (`getRequestHandler`)
116+
* takes full control of the `ServerResponse` object and doesn't provide
117+
* a simple "beforeWrite" hook.
118+
*
119+
* To inject our `x-fah-postponed` header *before* Next.js sends the
120+
* first body chunk, we must monkey-patch `res.write` and `res.end`.
121+
* We wrap them in a promise (`resolveHeaders`) to ensure our
122+
* `injectAppHostingHeaders` function runs exactly once before any
123+
* data is sent to the client.
124+
*/
81125
const originalWrite = res.write.bind(res);
82126
let resolveHeaders: Promise<void> | undefined;
83127
// We need to append our headers before the body starts getting written
@@ -104,25 +148,60 @@ async function requestHandle(req: IncomingMessage, res: ServerResponse) {
104148
return nextServer.getRequestHandler()(req, res, parsedUrl);
105149
};
106150

107-
151+
/**
152+
* The gRPC server that handles Envoy's external processing requests.
153+
*
154+
* This server is responsible for handling all gRPC requests from Envoy. It is
155+
* used to implement middleware and to resume PPR requests.
156+
*/
108157
const grpcServer = fastify({ http2: true } as {});
109158

110159
await grpcServer.register(fastifyConnectPlugin, {
111160
routes: (router) => router.service(ExternalProcessor, {
161+
/**
162+
* The `process` function is the entry point for all gRPC requests.
163+
*
164+
* It is a bidirectional streaming RPC that allows the data plane to send
165+
* information about the HTTP request to the service and for the service to
166+
* send back a `ProcessingResponse` message that directs the data plane on
167+
* how to handle the request.
168+
*
169+
* https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
170+
*
171+
* @param callouts The stream of `ProcessingRequest` messages from the data plane.
172+
*/
112173
process: async function *processCallouts(callouts) {
113174
let requestHeaders: HeaderValue[] = [];
114175
let resolveResumeBuffer: Promise<PassThrough>|undefined;
115176
let path: string|undefined = undefined;
177+
// For whatever reason the header.value is always an empty string, at least with
178+
// my local version of envoy. I have to decode the rawValue every time
116179
const getRequestHeader = (key: string) => {
117180
const header = requestHeaders.find((it) => it.key === key);
118181
if (header) return header.value || new TextDecoder().decode(header.rawValue);
119182
return undefined;
120183
}
121184
for await (const callout of callouts) {
185+
console.log(callout);
122186
switch (callout.request.case) {
123187
case "requestHeaders": {
124188
requestHeaders = callout.request.value.headers?.headers || [];
189+
// TODO look at the callout attributes, can we send thing like path
190+
// so we don't have to parse from the pseudo headers
125191
path = getRequestHeader(":path");
192+
/**
193+
* `requestHeaders` is the first callout we get. If `endOfStream` is
194+
* true, it's a `GET` (or other body-less request), and we can run
195+
* the middleware logic immediately. When there is no body
196+
* `requestBody` would not otherwise be called.
197+
*
198+
* We do this by *intentionally falling through* to the `requestBody`
199+
* case below, which contains our unified middleware logic.
200+
*
201+
* If `endOfStream` is false (e.g., a `POST`), we `break` and wait
202+
* for the `requestBody` callout to arrive, which will then
203+
* execute the *same* logic block.
204+
*/
126205
if (!callout.request.value.endOfStream) break;
127206
}
128207
case "requestBody": {
@@ -131,8 +210,14 @@ await grpcServer.register(fastifyConnectPlugin, {
131210
const scheme = getRequestHeader(":scheme")!;
132211
const referrer = getRequestHeader("referer");
133212
const authority = getRequestHeader(":authority")!;
213+
214+
// If the path does not match any of the middleware matchers, we can
215+
// skip the middleware execution.
134216
if (!matchers.some(it => path?.match(it))) break;
135-
// middleware is intended for v8 isolates, with the fetch api
217+
218+
// Next.js middleware is intended for v8 isolates, with the fetch api.
219+
// We construct a Fetch API compliant request object to pass to the
220+
// middleware.
136221
const middlewareRequest = {
137222
url: `${scheme}://${authority}${path}`,
138223
method,
@@ -167,6 +252,9 @@ await grpcServer.register(fastifyConnectPlugin, {
167252
};
168253
continue;
169254
}
255+
// If the middleware returns a response with the `x-middleware-next`
256+
// header, it means we should continue processing the request as if
257+
// the middleware was not there.
170258
if (middlewareResponse.headers.has("x-middleware-next")) break;
171259
const middlewareResponseHeaders = Object.fromEntries(middlewareResponse.headers);
172260
delete middlewareResponseHeaders["x-middleware-next"]; // Clean up middleware-specific header, TODO clean up other headers
@@ -176,6 +264,8 @@ await grpcServer.register(fastifyConnectPlugin, {
176264
header: { key, rawValue: Uint8Array.from(Buffer.from(value)) },
177265
}));
178266

267+
// If the middleware returns a response, we send it back to the client
268+
// and stop processing the request.
179269
yield {
180270
response: {
181271
case: "immediateResponse",
@@ -191,8 +281,14 @@ await grpcServer.register(fastifyConnectPlugin, {
191281
continue;
192282
}
193283
case "responseHeaders": {
284+
// This is where we handle PPR resumption.
285+
// If the response has a `x-fah-postponed` header, it means the page
286+
// is in a postponed state and we need to resume it.
194287
const postponedToken = callout.request.value.headers?.headers.find((it) => it.key === "x-fah-postponed")?.rawValue;
195288
if (!postponedToken) break;
289+
// We tell Envoy to continue processing the request, but we also
290+
// modify the headers to indicate that the response is chunked and
291+
// to remove the `x-fah-postponed` and `content-length` headers.
196292
yield {
197293
response: {
198294
case: "responseHeaders",
@@ -210,11 +306,17 @@ await grpcServer.register(fastifyConnectPlugin, {
210306
},
211307
}
212308

309+
// We then kick off the resume request, so it's happening in parallel to the GET's
310+
// body being sent to the client. Buffer it up.
213311
resolveResumeBuffer = new Promise<PassThrough>(async (resolve) => {
214312
const socket = new Socket();
215313
const resumeRequest = new IncomingMessage(socket);
216314
const postponed = Buffer.from(new TextDecoder().decode(postponedToken), "base64url").toString();
217315

316+
// We construct a new request to the Next.js server to resume the
317+
// postponed page.
318+
// This is the old way of doing PPR resumption; I'm having trouble with it in Next.js 16.
319+
// TODO investigate a stable API or why this is bugging out on me
218320
const resumePath = `/_next/postponed/resume${path === "/" ? "/index" : path}`;
219321
resumeRequest.url = resumePath;
220322
resumeRequest.method = "POST";
@@ -235,6 +337,23 @@ await grpcServer.register(fastifyConnectPlugin, {
235337
const resumeResponse = new ServerResponse(resumeRequest);
236338
const intermediaryStream = new PassThrough();
237339

340+
/**
341+
* This is the core of the PPR streaming workaround. We cannot
342+
* directly `await` the `resumeResponse` as it's a "push-style"
343+
* classic Node.js stream, not a modern "pull-style" async iterable.
344+
*
345+
* To fix this, we create an `intermediaryStream` (a PassThrough)
346+
* and manually override `resumeResponse.write` and `resumeResponse.end`.
347+
*
348+
* This effectively "pipes" the data from the Next.js handler (which
349+
* *thinks* it's writing to a normal socket) into our intermediary
350+
* stream, which we *can* await in the `responseBody` case.
351+
*
352+
* There's probably a "better" way of doing this, but the old-school pipes
353+
* in NodeJS are rough. It might be better to start with the new
354+
* fetch style request/response and convert to InboundMessage /
355+
* ServerResponse from those more modern APIs.
356+
*/
238357
resumeResponse.write = (data) => {
239358
const result = intermediaryStream.push(data);
240359
if (!result) intermediaryStream.on("drain", () => resumeResponse.emit("drain"));
@@ -261,30 +380,63 @@ await grpcServer.register(fastifyConnectPlugin, {
261380
continue;
262381
}
263382
case "responseBody": {
264-
//if (!resolveResumeBuffer) break;
383+
// Let the original GET request be fulfilled, since we're using NextJS minimal-mode
384+
// that request will be served in a CDN friendly manner, hopefully we have a hit ;)
385+
386+
/**
387+
* -------------------- Full-Duplex Mode ----------------------
388+
*
389+
* Because we're using `streamedResponse` later (for PPR), we are
390+
* we've configured Envoy for full-duplex streaming mode.
391+
*
392+
* In this mode, Envoy *always* expects us to send `streamedResponse`
393+
* mutations. If we just `yield` a simple `CONTINUE` (our fallback)
394+
* for a non-PPR request, Envoy's state machine gets confused
395+
* and it will segfault.
396+
*
397+
* Therefore, for *all* requests, we must replace the response
398+
* body with a stream, even if that stream is just the *original*
399+
* response body.
400+
*
401+
* TODO: look into switching mode dynamically using `mode_override`
402+
* in `responseHeaders` to avoid this for non-PPR requests.
403+
*
404+
* This logic determines the "passthrough" end_stream state.
405+
* `end_stream` should *only* be true if:
406+
* 1. We are *not* doing a PPR resume (`!resolveResumeBuffer`)
407+
* 2. AND the original upstream chunk was the last one.
408+
*
409+
* TODO name resolveResumeBuffer better
410+
*/
265411
const end_stream = !resolveResumeBuffer && callout.request.value.endOfStream;
266412

267-
// OK now that we're duplex streaming, we need to replace everythign with a stream
268-
// TODO look into switching mode for PPR
413+
// Serve up the original response, only EOF if this is not a PPR request and the
414+
// original chunk was EOF.
269415
const body = callout.request.value.body;
270416
yield {
271417
response: {
272418
case: "responseBody",
273419
value: {
274420
response: {
275421
status: CommonResponse_ResponseStatus.CONTINUE_AND_REPLACE,
422+
// Note: We use 'streamedResponse' even for the pass-through.
276423
bodyMutation: { mutation: { case: 'streamedResponse', value: { body, endOfStream: end_stream } } },
277424
end_stream,
278425
},
279426
},
280427
},
281428
};
282429

430+
// If the original response wasn't EOF yet, continue serving chunks (which will call this
431+
// case again.
283432
if (!callout.request.value.endOfStream) continue;
284433

285434
const resumeBuffer = await resolveResumeBuffer!;
286-
resolveResumeBuffer = undefined;
435+
resolveResumeBuffer = undefined; // TODO do I need to do this?
287436

437+
// Ok, let's start streaming in the PPR resume response
438+
// full duplex mode is what allows us to yield multiple times, so we can stream, this
439+
// is a marked improvement over the primitives available in proxy-Wasm at the moment.
288440
for await (const body of resumeBuffer) {
289441
yield {
290442
response: {
@@ -300,6 +452,7 @@ await grpcServer.register(fastifyConnectPlugin, {
300452
};
301453
}
302454

455+
// Finally send EOF
303456
yield {
304457
response: {
305458
case: "responseBody",
@@ -314,9 +467,12 @@ await grpcServer.register(fastifyConnectPlugin, {
314467
};
315468
continue;
316469
}
470+
// TODO can we intercept trailers to handle waitFor functionality?
317471
}
472+
// If we fall through the switch, it means we are not handling the
473+
// request in any special way, so we just tell Envoy to continue.
318474
const empty = {};
319-
yield {
475+
yield {
320476
response: {
321477
case: callout.request.case,
322478
value: {
@@ -331,7 +487,7 @@ await grpcServer.register(fastifyConnectPlugin, {
331487
}),
332488
});
333489

334-
490+
// Create the main HTTP server.
335491
createServer(requestHandle).listen(port, hostname, () => {
336492
console.log(`NextJS listening on http://${hostname}:${port}`);
337493
}).on("error", (err) => {
@@ -341,7 +497,8 @@ createServer(requestHandle).listen(port, hostname, () => {
341497

342498
await grpcServer.ready();
343499

500+
// Start the gRPC server.
344501
grpcServer.listen({ host: hostname, port: port+1 }, (err, address) => {
345502
if (err) return console.error(err);
346503
console.log(`RPC listening on ${address}`);
347-
});
504+
});

0 commit comments

Comments
 (0)