Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions examples/app-router/app/streaming/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// https://developer.mozilla.org/docs/Web/API/ReadableStream#convert_async_iterator_to_stream
function iteratorToStream(iterator: any) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next();

if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
});
}

function sleep(time: number) {
return new Promise((resolve) => {
setTimeout(resolve, time);
});
}

const encoder = new TextEncoder();

async function* makeIterator() {
for (let i = 1; i <= 10; i++) {
yield encoder.encode(`<p data-testid="iteratorCount">${i}</p>`);
await sleep(1000);
}
}

export async function GET() {
const iterator = makeIterator();
const stream = iteratorToStream(iterator);

return new Response(stream, {
headers: {
"Content-Type": "text/html; charset=utf-8",
Connection: "keep-alive",
"Cache-Control": "no-cache, no-transform",
},
});
}
6 changes: 5 additions & 1 deletion examples/pages-router/open-next.config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
const config = {
default: {},
default: {
override: {
wrapper: "aws-lambda-streaming",
},
},
functions: {},
buildCommand: "npx turbo build",
};
Expand Down
52 changes: 52 additions & 0 deletions examples/pages-router/src/pages/api/streaming/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Readable } from "node:stream";
import { ReadableStream } from "node:stream/web";
import type { NextApiRequest, NextApiResponse } from "next";

function iteratorToStream(iterator: AsyncIterator<Uint8Array>) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next();

if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
});
}

function sleep(time: number) {
return new Promise((resolve) => {
setTimeout(resolve, time);
});
}

const encoder = new TextEncoder();

async function* makeIterator() {
for (let i = 1; i <= 10; i++) {
yield encoder.encode(`<p data-testid="iteratorCount">${i}</p>`);
await sleep(1000);
}
}

export default async function handler(
req: NextApiRequest,
res: NextApiResponse,
) {
if (req.method !== "GET") {
return res.status(405).json({ message: "Method not allowed" });
}

res.setHeader("Content-Type", "text/html; charset=utf-8");
res.setHeader("Connection", "keep-alive");
res.setHeader("Cache-Control", "no-cache, no-transform");

// create and pipe the stream
const iterator = makeIterator();
const stream = iteratorToStream(iterator);

// we need to import ReadableStream from `node:stream/web` to make TypeScript happy
return Readable.fromWeb(stream).pipe(res);
}
3 changes: 3 additions & 0 deletions examples/sst/stacks/PagesRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { OpenNextCdkReferenceImplementation } from "./OpenNextReferenceImplement
export function PagesRouter({ stack }) {
const site = new OpenNextCdkReferenceImplementation(stack, "pagesrouter", {
path: "../pages-router",
environment: {
OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE: "true",
},
});
// const site = new NextjsSite(stack, "pagesrouter", {
// path: "../pages-router",
Expand Down
54 changes: 54 additions & 0 deletions packages/tests-e2e/tests/appRouter/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { expect, test } from "@playwright/test";

test("streaming should work in route handler", async ({ page }) => {
const ITERATOR_LENGTH = 10;

const res = await page.goto("/streaming", {
// we set waitUntil: "commit" to ensure that the response is streamed
// without this option, the response would be buffered and sent all at once
// we could also drop the `await` aswell, but then we can't see the headers first.
waitUntil: "commit",
});

expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8");
expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform");
// AWS API Gateway remaps the connection header to `x-amzn-remapped-connection`
expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive");

// wait for first number to be present
await page.getByTestId("iteratorCount").first().waitFor();

const seenNumbers: Array<{ number: string; time: number }> = [];
const startTime = Date.now();

const initialParagraphs = await page.getByTestId("iteratorCount").count();
// fail if all paragraphs appear at once
// this is a safeguard to ensure that the response is streamed and not buffered all at once
expect(initialParagraphs).toBe(1);

while (
seenNumbers.length < ITERATOR_LENGTH &&
Date.now() - startTime < 11000
) {
const elements = await page.getByTestId("iteratorCount").all();
if (elements.length > seenNumbers.length) {
expect(elements.length).toBe(seenNumbers.length + 1);
const newElement = elements[elements.length - 1];
seenNumbers.push({
number: await newElement.innerText(),
time: Date.now() - startTime,
});
}
await page.waitForTimeout(100);
}

expect(seenNumbers.map((n) => n.number)).toEqual(
[...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)),
);

// verify streaming timing
for (let i = 1; i < seenNumbers.length; i++) {
const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time;
expect(timeDiff).toBeGreaterThanOrEqual(100);
}
});
54 changes: 54 additions & 0 deletions packages/tests-e2e/tests/pagesRouter/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { expect, test } from "@playwright/test";

test("streaming should work in api route", async ({ page }) => {
const ITERATOR_LENGTH = 10;

const res = await page.goto("/api/streaming", {
// we set waitUntil: "commit" to ensure that the response is streamed
// without this option, the response would be buffered and sent all at once
// we could also drop the `await` aswell, but then we can't see the headers first.
waitUntil: "commit",
});

expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8");
expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform");
// AWS API Gateway remaps the connection header to `x-amzn-remapped-connection`
expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive");

// wait for first number to be present
await page.getByTestId("iteratorCount").first().waitFor();

const seenNumbers: Array<{ number: string; time: number }> = [];
const startTime = Date.now();

const initialParagraphs = await page.getByTestId("iteratorCount").count();
// fail if all paragraphs appear at once
// this is a safeguard to ensure that the response is streamed and not buffered all at once
expect(initialParagraphs).toBe(1);

while (
seenNumbers.length < ITERATOR_LENGTH &&
Date.now() - startTime < 11000
) {
const elements = await page.getByTestId("iteratorCount").all();
if (elements.length > seenNumbers.length) {
expect(elements.length).toBe(seenNumbers.length + 1);
const newElement = elements[elements.length - 1];
seenNumbers.push({
number: await newElement.innerText(),
time: Date.now() - startTime,
});
}
await page.waitForTimeout(100);
}

expect(seenNumbers.map((n) => n.number)).toEqual(
[...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)),
);

// verify streaming timing
for (let i = 1; i < seenNumbers.length; i++) {
const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time;
expect(timeDiff).toBeGreaterThanOrEqual(100);
}
});
Loading