From 8788ed73b2e1f402ed7db49552642b74e15fd913 Mon Sep 17 00:00:00 2001 From: magnus Date: Thu, 20 Mar 2025 20:46:45 +0100 Subject: [PATCH 1/8] add routehandler and api route --- examples/app-router/app/streaming/route.ts | 42 +++++++++++++++ .../src/pages/api/streaming/index.ts | 51 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 examples/app-router/app/streaming/route.ts create mode 100644 examples/pages-router/src/pages/api/streaming/index.ts diff --git a/examples/app-router/app/streaming/route.ts b/examples/app-router/app/streaming/route.ts new file mode 100644 index 000000000..be75bdfd3 --- /dev/null +++ b/examples/app-router/app/streaming/route.ts @@ -0,0 +1,42 @@ +// https://developer.mozilla.org/docs/Web/API/ReadableStream#convert_async_iterator_to_stream +function iteratorToStream(iterator: any) { + return new ReadableStream({ + async pull(controller) { + const { value, done } = await iterator.next(); + + if (done) { + controller.close(); + } else { + controller.enqueue(value); + } + }, + }); +} + +function sleep(time: number) { + return new Promise((resolve) => { + setTimeout(resolve, time); + }); +} + +const encoder = new TextEncoder(); + +async function* makeIterator() { + for (let i = 1; i <= 10; i++) { + yield encoder.encode(`

${i}

`); + await sleep(1000); + } +} + +export async function GET() { + const iterator = makeIterator(); + const stream = iteratorToStream(iterator); + + return new Response(stream, { + headers: { + "Content-Type": "text/html; charset=utf-8", + Connection: "keep-alive", + "Cache-Control": "no-cache, no-transform", + }, + }); +} diff --git a/examples/pages-router/src/pages/api/streaming/index.ts b/examples/pages-router/src/pages/api/streaming/index.ts new file mode 100644 index 000000000..fb9d24df1 --- /dev/null +++ b/examples/pages-router/src/pages/api/streaming/index.ts @@ -0,0 +1,51 @@ +import { Readable } from "node:stream"; +import type { NextApiRequest, NextApiResponse } from "next"; + +function iteratorToStream(iterator: AsyncIterator) { + return new ReadableStream({ + async pull(controller) { + const { value, done } = await iterator.next(); + + if (done) { + controller.close(); + } else { + controller.enqueue(value); + } + }, + }); +} + +function sleep(time: number) { + return new Promise((resolve) => { + setTimeout(resolve, time); + }); +} + +const encoder = new TextEncoder(); + +async function* makeIterator() { + for (let i = 1; i <= 10; i++) { + yield encoder.encode(`

${i}

`); + await sleep(1000); + } +} + +export default async function handler( + req: NextApiRequest, + res: NextApiResponse, +) { + if (req.method !== "GET") { + return res.status(405).json({ message: "Method not allowed" }); + } + + res.setHeader("Content-Type", "text/html; charset=utf-8"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + + // create and pipe the stream + const iterator = makeIterator(); + const stream = iteratorToStream(iterator); + + // @ts-ignore - not sure how to make typescript happy here + return Readable.fromWeb(stream).pipe(res); +} From 02e6d0426405cabdf3d14b5a87c1d808719d84d8 Mon Sep 17 00:00:00 2001 From: magnus Date: Thu, 20 Mar 2025 20:47:00 +0100 Subject: [PATCH 2/8] add streaming to pages-router --- examples/pages-router/open-next.config.ts | 6 +++++- examples/sst/stacks/PagesRouter.ts | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/pages-router/open-next.config.ts b/examples/pages-router/open-next.config.ts index 54e09d87d..f5f7c73b9 100644 --- a/examples/pages-router/open-next.config.ts +++ b/examples/pages-router/open-next.config.ts @@ -1,5 +1,9 @@ const config = { - default: {}, + default: { + override: { + wrapper: "aws-lambda-streaming", + }, + }, functions: {}, buildCommand: "npx turbo build", }; diff --git a/examples/sst/stacks/PagesRouter.ts b/examples/sst/stacks/PagesRouter.ts index 75ddd227b..00372d51e 100644 --- a/examples/sst/stacks/PagesRouter.ts +++ b/examples/sst/stacks/PagesRouter.ts @@ -3,6 +3,9 @@ import { OpenNextCdkReferenceImplementation } from "./OpenNextReferenceImplement export function PagesRouter({ stack }) { const site = new OpenNextCdkReferenceImplementation(stack, "pagesrouter", { path: "../pages-router", + environment: { + OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE: "true", + }, }); // const site = new NextjsSite(stack, "pagesrouter", { // path: "../pages-router", From a078d078f9b4f981c9fffde8ae4c20e189b29bb7 Mon Sep 17 00:00:00 2001 From: magnus Date: Thu, 20 Mar 2025 20:47:12 +0100 Subject: [PATCH 3/8] add e2e for both --- examples/app-router/app/streaming/route.ts | 5 +- .../src/pages/api/streaming/index.ts | 5 +- .../tests/appRouter/streaming.test.ts | 55 +++++++++++++++++++ .../tests/pagesRouter/streaming.test.ts | 55 +++++++++++++++++++ 4 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 packages/tests-e2e/tests/appRouter/streaming.test.ts create mode 100644 packages/tests-e2e/tests/pagesRouter/streaming.test.ts diff --git a/examples/app-router/app/streaming/route.ts b/examples/app-router/app/streaming/route.ts index be75bdfd3..87e2cae42 100644 --- a/examples/app-router/app/streaming/route.ts +++ b/examples/app-router/app/streaming/route.ts @@ -23,7 +23,10 @@ const encoder = new TextEncoder(); async function* makeIterator() { for (let i = 1; i <= 10; i++) { - yield encoder.encode(`

${i}

`); + const timestamp = Date.now(); + yield encoder.encode( + `

${i}

`, + ); await sleep(1000); } } diff --git a/examples/pages-router/src/pages/api/streaming/index.ts b/examples/pages-router/src/pages/api/streaming/index.ts index fb9d24df1..e69b35e85 100644 --- a/examples/pages-router/src/pages/api/streaming/index.ts +++ b/examples/pages-router/src/pages/api/streaming/index.ts @@ -25,7 +25,10 @@ const encoder = new TextEncoder(); async function* makeIterator() { for (let i = 1; i <= 10; i++) { - yield encoder.encode(`

${i}

`); + const timestamp = Date.now(); + yield encoder.encode( + `

${i}

`, + ); await sleep(1000); } } diff --git a/packages/tests-e2e/tests/appRouter/streaming.test.ts b/packages/tests-e2e/tests/appRouter/streaming.test.ts new file mode 100644 index 000000000..b9069926f --- /dev/null +++ b/packages/tests-e2e/tests/appRouter/streaming.test.ts @@ -0,0 +1,55 @@ +import { expect, test } from "@playwright/test"; + +test("streaming should work in route handler", async ({ page }) => { + const ITERATOR_LENGTH = 10; + + const res = await page.goto("/streaming", { + // we set waitUntil: "commit" to ensure that the response is streamed + // without this option, the response would be buffered and sent all at once + // we could also drop the `await` aswell, but then we can't see the headers first. + waitUntil: "commit", + }); + + expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8"); + expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform"); + // AWS API Gateway remaps the connection header to `x-amzn-remapped-connection` + expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive"); + + // wait for first number to be present + await page.getByTestId("iteratorCount").first().waitFor(); + + const seenNumbers: Array<{ number: string; time: number }> = []; + const startTime = Date.now(); + + const initialParagraphs = await page.getByTestId("iteratorCount").count(); + // fail if all paragraphs appear at once + // this is a safeguard to ensure that the response is streamed and not buffered all at once + expect(initialParagraphs).toBe(1); + + while ( + seenNumbers.length < ITERATOR_LENGTH && + Date.now() - startTime < 11000 + ) { + const elements = await page.getByTestId("iteratorCount").all(); + if (elements.length > seenNumbers.length) { + expect(elements.length).toBe(seenNumbers.length + 1); + const newElement = elements[elements.length - 1]; + const timestamp = await newElement.getAttribute("data-timestamp"); + seenNumbers.push({ + number: await newElement.innerText(), + time: Number.parseInt(timestamp || "0", 10), + }); + } + await page.waitForTimeout(100); + } + + expect(seenNumbers.map((n) => n.number)).toEqual( + [...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)), + ); + + // verify streaming timing using server timestamps + for (let i = 1; i < seenNumbers.length; i++) { + const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time; + expect(timeDiff).toBeGreaterThanOrEqual(900); + } +}); diff --git a/packages/tests-e2e/tests/pagesRouter/streaming.test.ts b/packages/tests-e2e/tests/pagesRouter/streaming.test.ts new file mode 100644 index 000000000..5851fafaf --- /dev/null +++ b/packages/tests-e2e/tests/pagesRouter/streaming.test.ts @@ -0,0 +1,55 @@ +import { expect, test } from "@playwright/test"; + +test("streaming should work in api route", async ({ page }) => { + const ITERATOR_LENGTH = 10; + + const res = await page.goto("/api/streaming", { + // we set waitUntil: "commit" to ensure that the response is streamed + // without this option, the response would be buffered and sent all at once + // we could also drop the `await` aswell, but then we can't see the headers first. + waitUntil: "commit", + }); + + expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8"); + expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform"); + // AWS API Gateway remaps the connection header to `x-amzn-remapped-connection` + expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive"); + + // wait for first number to be present + await page.getByTestId("iteratorCount").first().waitFor(); + + const seenNumbers: Array<{ number: string; time: number }> = []; + const startTime = Date.now(); + + const initialParagraphs = await page.getByTestId("iteratorCount").count(); + // fail if all paragraphs appear at once + // this is a safeguard to ensure that the response is streamed and not buffered all at once + expect(initialParagraphs).toBe(1); + + while ( + seenNumbers.length < ITERATOR_LENGTH && + Date.now() - startTime < 11000 + ) { + const elements = await page.getByTestId("iteratorCount").all(); + if (elements.length > seenNumbers.length) { + expect(elements.length).toBe(seenNumbers.length + 1); + const newElement = elements[elements.length - 1]; + const timestamp = await newElement.getAttribute("data-timestamp"); + seenNumbers.push({ + number: await newElement.innerText(), + time: Number.parseInt(timestamp || "0", 10), + }); + } + await page.waitForTimeout(100); + } + + expect(seenNumbers.map((n) => n.number)).toEqual( + [...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)), + ); + + // verify streaming timing using server timestamps + for (let i = 1; i < seenNumbers.length; i++) { + const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time; + expect(timeDiff).toBeGreaterThanOrEqual(800); + } +}); From cfe323e443903a9fd1020434e8f23b4f048271ad Mon Sep 17 00:00:00 2001 From: magnus Date: Fri, 21 Mar 2025 09:04:20 +0100 Subject: [PATCH 4/8] fix e2e --- examples/app-router/app/streaming/route.ts | 5 +---- examples/pages-router/src/pages/api/streaming/index.ts | 5 +---- packages/tests-e2e/tests/appRouter/streaming.test.ts | 7 +++---- packages/tests-e2e/tests/pagesRouter/streaming.test.ts | 7 +++---- 4 files changed, 8 insertions(+), 16 deletions(-) diff --git a/examples/app-router/app/streaming/route.ts b/examples/app-router/app/streaming/route.ts index 87e2cae42..be75bdfd3 100644 --- a/examples/app-router/app/streaming/route.ts +++ b/examples/app-router/app/streaming/route.ts @@ -23,10 +23,7 @@ const encoder = new TextEncoder(); async function* makeIterator() { for (let i = 1; i <= 10; i++) { - const timestamp = Date.now(); - yield encoder.encode( - `

${i}

`, - ); + yield encoder.encode(`

${i}

`); await sleep(1000); } } diff --git a/examples/pages-router/src/pages/api/streaming/index.ts b/examples/pages-router/src/pages/api/streaming/index.ts index e69b35e85..fb9d24df1 100644 --- a/examples/pages-router/src/pages/api/streaming/index.ts +++ b/examples/pages-router/src/pages/api/streaming/index.ts @@ -25,10 +25,7 @@ const encoder = new TextEncoder(); async function* makeIterator() { for (let i = 1; i <= 10; i++) { - const timestamp = Date.now(); - yield encoder.encode( - `

${i}

`, - ); + yield encoder.encode(`

${i}

`); await sleep(1000); } } diff --git a/packages/tests-e2e/tests/appRouter/streaming.test.ts b/packages/tests-e2e/tests/appRouter/streaming.test.ts index b9069926f..8aafabf51 100644 --- a/packages/tests-e2e/tests/appRouter/streaming.test.ts +++ b/packages/tests-e2e/tests/appRouter/streaming.test.ts @@ -34,10 +34,9 @@ test("streaming should work in route handler", async ({ page }) => { if (elements.length > seenNumbers.length) { expect(elements.length).toBe(seenNumbers.length + 1); const newElement = elements[elements.length - 1]; - const timestamp = await newElement.getAttribute("data-timestamp"); seenNumbers.push({ number: await newElement.innerText(), - time: Number.parseInt(timestamp || "0", 10), + time: Date.now() - startTime, }); } await page.waitForTimeout(100); @@ -47,9 +46,9 @@ test("streaming should work in route handler", async ({ page }) => { [...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)), ); - // verify streaming timing using server timestamps + // verify streaming timing for (let i = 1; i < seenNumbers.length; i++) { const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time; - expect(timeDiff).toBeGreaterThanOrEqual(900); + expect(timeDiff).toBeGreaterThanOrEqual(100); } }); diff --git a/packages/tests-e2e/tests/pagesRouter/streaming.test.ts b/packages/tests-e2e/tests/pagesRouter/streaming.test.ts index 5851fafaf..e62150616 100644 --- a/packages/tests-e2e/tests/pagesRouter/streaming.test.ts +++ b/packages/tests-e2e/tests/pagesRouter/streaming.test.ts @@ -34,10 +34,9 @@ test("streaming should work in api route", async ({ page }) => { if (elements.length > seenNumbers.length) { expect(elements.length).toBe(seenNumbers.length + 1); const newElement = elements[elements.length - 1]; - const timestamp = await newElement.getAttribute("data-timestamp"); seenNumbers.push({ number: await newElement.innerText(), - time: Number.parseInt(timestamp || "0", 10), + time: Date.now() - startTime, }); } await page.waitForTimeout(100); @@ -47,9 +46,9 @@ test("streaming should work in api route", async ({ page }) => { [...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)), ); - // verify streaming timing using server timestamps + // verify streaming timing for (let i = 1; i < seenNumbers.length; i++) { const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time; - expect(timeDiff).toBeGreaterThanOrEqual(800); + expect(timeDiff).toBeGreaterThanOrEqual(100); } }); From 6e659b35a4b65f3039088b0ea80278f0606b638c Mon Sep 17 00:00:00 2001 From: magnus Date: Fri, 21 Mar 2025 14:11:58 +0100 Subject: [PATCH 5/8] make typescript happy --- examples/pages-router/src/pages/api/streaming/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/pages-router/src/pages/api/streaming/index.ts b/examples/pages-router/src/pages/api/streaming/index.ts index fb9d24df1..0184d958a 100644 --- a/examples/pages-router/src/pages/api/streaming/index.ts +++ b/examples/pages-router/src/pages/api/streaming/index.ts @@ -1,4 +1,5 @@ import { Readable } from "node:stream"; +import { ReadableStream } from "node:stream/web"; import type { NextApiRequest, NextApiResponse } from "next"; function iteratorToStream(iterator: AsyncIterator) { @@ -46,6 +47,6 @@ export default async function handler( const iterator = makeIterator(); const stream = iteratorToStream(iterator); - // @ts-ignore - not sure how to make typescript happy here + // we need to import ReadableStream from `node:stream/web` to make TypeScript happy return Readable.fromWeb(stream).pipe(res); } From ada149c8b9d87c80cfa68019e6068064d60151ac Mon Sep 17 00:00:00 2001 From: magnus Date: Mon, 24 Mar 2025 10:07:25 +0100 Subject: [PATCH 6/8] rm from app router --- examples/app-router/app/streaming/route.ts | 42 --------------- .../tests/appRouter/streaming.test.ts | 54 ------------------- 2 files changed, 96 deletions(-) delete mode 100644 examples/app-router/app/streaming/route.ts delete mode 100644 packages/tests-e2e/tests/appRouter/streaming.test.ts diff --git a/examples/app-router/app/streaming/route.ts b/examples/app-router/app/streaming/route.ts deleted file mode 100644 index be75bdfd3..000000000 --- a/examples/app-router/app/streaming/route.ts +++ /dev/null @@ -1,42 +0,0 @@ -// https://developer.mozilla.org/docs/Web/API/ReadableStream#convert_async_iterator_to_stream -function iteratorToStream(iterator: any) { - return new ReadableStream({ - async pull(controller) { - const { value, done } = await iterator.next(); - - if (done) { - controller.close(); - } else { - controller.enqueue(value); - } - }, - }); -} - -function sleep(time: number) { - return new Promise((resolve) => { - setTimeout(resolve, time); - }); -} - -const encoder = new TextEncoder(); - -async function* makeIterator() { - for (let i = 1; i <= 10; i++) { - yield encoder.encode(`

${i}

`); - await sleep(1000); - } -} - -export async function GET() { - const iterator = makeIterator(); - const stream = iteratorToStream(iterator); - - return new Response(stream, { - headers: { - "Content-Type": "text/html; charset=utf-8", - Connection: "keep-alive", - "Cache-Control": "no-cache, no-transform", - }, - }); -} diff --git a/packages/tests-e2e/tests/appRouter/streaming.test.ts b/packages/tests-e2e/tests/appRouter/streaming.test.ts deleted file mode 100644 index 8aafabf51..000000000 --- a/packages/tests-e2e/tests/appRouter/streaming.test.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { expect, test } from "@playwright/test"; - -test("streaming should work in route handler", async ({ page }) => { - const ITERATOR_LENGTH = 10; - - const res = await page.goto("/streaming", { - // we set waitUntil: "commit" to ensure that the response is streamed - // without this option, the response would be buffered and sent all at once - // we could also drop the `await` aswell, but then we can't see the headers first. - waitUntil: "commit", - }); - - expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8"); - expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform"); - // AWS API Gateway remaps the connection header to `x-amzn-remapped-connection` - expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive"); - - // wait for first number to be present - await page.getByTestId("iteratorCount").first().waitFor(); - - const seenNumbers: Array<{ number: string; time: number }> = []; - const startTime = Date.now(); - - const initialParagraphs = await page.getByTestId("iteratorCount").count(); - // fail if all paragraphs appear at once - // this is a safeguard to ensure that the response is streamed and not buffered all at once - expect(initialParagraphs).toBe(1); - - while ( - seenNumbers.length < ITERATOR_LENGTH && - Date.now() - startTime < 11000 - ) { - const elements = await page.getByTestId("iteratorCount").all(); - if (elements.length > seenNumbers.length) { - expect(elements.length).toBe(seenNumbers.length + 1); - const newElement = elements[elements.length - 1]; - seenNumbers.push({ - number: await newElement.innerText(), - time: Date.now() - startTime, - }); - } - await page.waitForTimeout(100); - } - - expect(seenNumbers.map((n) => n.number)).toEqual( - [...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)), - ); - - // verify streaming timing - for (let i = 1; i < seenNumbers.length; i++) { - const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time; - expect(timeDiff).toBeGreaterThanOrEqual(100); - } -}); From 9d1fad896e667778cb94fc319479b15cd025e25e Mon Sep 17 00:00:00 2001 From: magnus Date: Mon, 24 Mar 2025 14:26:22 +0100 Subject: [PATCH 7/8] add comment --- examples/sst/stacks/PagesRouter.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/sst/stacks/PagesRouter.ts b/examples/sst/stacks/PagesRouter.ts index 00372d51e..8b699f9dc 100644 --- a/examples/sst/stacks/PagesRouter.ts +++ b/examples/sst/stacks/PagesRouter.ts @@ -3,6 +3,11 @@ import { OpenNextCdkReferenceImplementation } from "./OpenNextReferenceImplement export function PagesRouter({ stack }) { const site = new OpenNextCdkReferenceImplementation(stack, "pagesrouter", { path: "../pages-router", + /* + * We need to set this environment variable to not break other E2E tests that have an empty body. (i.e: /redirect) + * https://opennext.js.org/aws/common_issues#empty-body-in-response-when-streaming-in-aws-lambda + * + */ environment: { OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE: "true", }, From 68a0c5e1296be989f548976883ce937d9474d6de Mon Sep 17 00:00:00 2001 From: magnus Date: Mon, 24 Mar 2025 16:44:48 +0100 Subject: [PATCH 8/8] review fix --- .../src/pages/api/streaming/index.ts | 61 +++++++-------- examples/pages-router/src/pages/sse/index.tsx | 74 +++++++++++++++++++ .../tests/pagesRouter/streaming.test.ts | 74 +++++++++---------- 3 files changed, 137 insertions(+), 72 deletions(-) create mode 100644 examples/pages-router/src/pages/sse/index.tsx diff --git a/examples/pages-router/src/pages/api/streaming/index.ts b/examples/pages-router/src/pages/api/streaming/index.ts index 0184d958a..6a61fa590 100644 --- a/examples/pages-router/src/pages/api/streaming/index.ts +++ b/examples/pages-router/src/pages/api/streaming/index.ts @@ -1,36 +1,22 @@ -import { Readable } from "node:stream"; -import { ReadableStream } from "node:stream/web"; import type { NextApiRequest, NextApiResponse } from "next"; -function iteratorToStream(iterator: AsyncIterator) { - return new ReadableStream({ - async pull(controller) { - const { value, done } = await iterator.next(); - - if (done) { - controller.close(); - } else { - controller.enqueue(value); - } - }, - }); -} - -function sleep(time: number) { +const SADE_SMOOTH_OPERATOR_LYRIC = `Diamond life, lover boy +He move in space with minimum waste and maximum joy +City lights and business nights +When you require streetcar desire for higher heights +No place for beginners or sensitive hearts +When sentiment is left to chance +No place to be ending but somewhere to start +No need to ask, he's a smooth operator +Smooth operator, smooth operator +Smooth operator`; + +function sleep(ms: number) { return new Promise((resolve) => { - setTimeout(resolve, time); + setTimeout(resolve, ms); }); } -const encoder = new TextEncoder(); - -async function* makeIterator() { - for (let i = 1; i <= 10; i++) { - yield encoder.encode(`

${i}

`); - await sleep(1000); - } -} - export default async function handler( req: NextApiRequest, res: NextApiResponse, @@ -39,14 +25,23 @@ export default async function handler( return res.status(405).json({ message: "Method not allowed" }); } - res.setHeader("Content-Type", "text/html; charset=utf-8"); + res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Connection", "keep-alive"); res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("Transfer-Encoding", "chunked"); + + res.write( + `data: ${JSON.stringify({ type: "start", model: "ai-lyric-model" })}\n\n`, + ); + await sleep(1000); + + const lines = SADE_SMOOTH_OPERATOR_LYRIC.split("\n"); + for (const line of lines) { + res.write(`data: ${JSON.stringify({ type: "content", body: line })}\n\n`); + await sleep(1000); + } - // create and pipe the stream - const iterator = makeIterator(); - const stream = iteratorToStream(iterator); + res.write(`data: ${JSON.stringify({ type: "complete" })}\n\n`); - // we need to import ReadableStream from `node:stream/web` to make TypeScript happy - return Readable.fromWeb(stream).pipe(res); + res.end(); } diff --git a/examples/pages-router/src/pages/sse/index.tsx b/examples/pages-router/src/pages/sse/index.tsx new file mode 100644 index 000000000..dbc5f8ee9 --- /dev/null +++ b/examples/pages-router/src/pages/sse/index.tsx @@ -0,0 +1,74 @@ +"use client"; + +import { useEffect, useState } from "react"; + +type Event = { + type: "start" | "content" | "complete"; + model?: string; + body?: string; +}; + +export default function SSE() { + const [events, setEvents] = useState([]); + const [finished, setFinished] = useState(false); + + useEffect(() => { + const e = new EventSource("/api/streaming"); + + e.onmessage = (msg) => { + console.log(msg); + try { + const data = JSON.parse(msg.data) as Event; + if (data.type === "complete") { + e.close(); + setFinished(true); + } + if (data.type === "content") { + setEvents((prev) => prev.concat(data)); + } + } catch (err) { + console.error(err, msg); + } + }; + }, []); + + return ( +
+

+ Sade - Smooth Operator +

+
+ {events.map((e, i) => ( +

+ {e.body} +

+ ))} +
+ {finished && ( + + )} +
+ ); +} diff --git a/packages/tests-e2e/tests/pagesRouter/streaming.test.ts b/packages/tests-e2e/tests/pagesRouter/streaming.test.ts index e62150616..ebfe1a57d 100644 --- a/packages/tests-e2e/tests/pagesRouter/streaming.test.ts +++ b/packages/tests-e2e/tests/pagesRouter/streaming.test.ts @@ -1,54 +1,50 @@ import { expect, test } from "@playwright/test"; -test("streaming should work in api route", async ({ page }) => { - const ITERATOR_LENGTH = 10; - - const res = await page.goto("/api/streaming", { - // we set waitUntil: "commit" to ensure that the response is streamed - // without this option, the response would be buffered and sent all at once - // we could also drop the `await` aswell, but then we can't see the headers first. - waitUntil: "commit", - }); +const SADE_SMOOTH_OPERATOR_LYRIC = `Diamond life, lover boy +He move in space with minimum waste and maximum joy +City lights and business nights +When you require streetcar desire for higher heights +No place for beginners or sensitive hearts +When sentiment is left to chance +No place to be ending but somewhere to start +No need to ask, he's a smooth operator +Smooth operator, smooth operator +Smooth operator`; - expect(res?.headers()["content-type"]).toBe("text/html; charset=utf-8"); - expect(res?.headers()["cache-control"]).toBe("no-cache, no-transform"); - // AWS API Gateway remaps the connection header to `x-amzn-remapped-connection` - expect(res?.headers()["x-amzn-remapped-connection"]).toBe("keep-alive"); +test("streaming should work in api route", async ({ page }) => { + await page.goto("/sse"); - // wait for first number to be present - await page.getByTestId("iteratorCount").first().waitFor(); + // wait for first line to be present + await page.getByTestId("line").first().waitFor(); + const initialLines = await page.getByTestId("line").count(); + // fail if all lines appear at once + // this is a safeguard to ensure that the response is streamed and not buffered all at once + expect(initialLines).toBe(1); - const seenNumbers: Array<{ number: string; time: number }> = []; + const seenLines: Array<{ line: string; time: number }> = []; const startTime = Date.now(); - const initialParagraphs = await page.getByTestId("iteratorCount").count(); - // fail if all paragraphs appear at once - // this is a safeguard to ensure that the response is streamed and not buffered all at once - expect(initialParagraphs).toBe(1); - - while ( - seenNumbers.length < ITERATOR_LENGTH && - Date.now() - startTime < 11000 - ) { - const elements = await page.getByTestId("iteratorCount").all(); - if (elements.length > seenNumbers.length) { - expect(elements.length).toBe(seenNumbers.length + 1); - const newElement = elements[elements.length - 1]; - seenNumbers.push({ - number: await newElement.innerText(), + // we loop until we see all lines + while (seenLines.length < SADE_SMOOTH_OPERATOR_LYRIC.split("\n").length) { + const lines = await page.getByTestId("line").all(); + if (lines.length > seenLines.length) { + expect(lines.length).toBe(seenLines.length + 1); + const newLine = lines[lines.length - 1]; + seenLines.push({ + line: await newLine.innerText(), time: Date.now() - startTime, }); } - await page.waitForTimeout(100); + // wait for a bit before checking again + await page.waitForTimeout(200); } - expect(seenNumbers.map((n) => n.number)).toEqual( - [...Array(ITERATOR_LENGTH)].map((_, i) => String(i + 1)), + expect(seenLines.map((n) => n.line)).toEqual( + SADE_SMOOTH_OPERATOR_LYRIC.split("\n"), ); - - // verify streaming timing - for (let i = 1; i < seenNumbers.length; i++) { - const timeDiff = seenNumbers[i].time - seenNumbers[i - 1].time; - expect(timeDiff).toBeGreaterThanOrEqual(100); + for (let i = 1; i < seenLines.length; i++) { + expect(seenLines[i].time - seenLines[i - 1].time).toBeGreaterThan(500); } + + await expect(page.getByTestId("video")).toBeVisible(); });