From fcca8390f04908d7879686ef31a90ecf8181ef22 Mon Sep 17 00:00:00 2001 From: Jun Lee Date: Mon, 7 Jul 2025 13:32:17 +0100 Subject: [PATCH 1/4] Adding that ReadableStream cancellation propagates from Worker to DO --- .../docs/workers/runtime-apis/streams/readablestream.mdx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/content/docs/workers/runtime-apis/streams/readablestream.mdx b/src/content/docs/workers/runtime-apis/streams/readablestream.mdx index be5ca91bd196291..e958fa26e50795c 100644 --- a/src/content/docs/workers/runtime-apis/streams/readablestream.mdx +++ b/src/content/docs/workers/runtime-apis/streams/readablestream.mdx @@ -47,7 +47,9 @@ let reader = readable.getReader({ mode: 'byob' }); * When `true`, errors in the source `ReadableStream` will no longer abort the destination `WritableStream`. `pipeTo` will return a rejected promise with the error from the source or any error that occurred while aborting the destination. +## ReadableStream with Durable Objects +In a setup where a Durable Object returns a readable stream to a Worker, if the Worker cancels the Durable Object's readable stream, the cancellation propagates to the Durable Object. *** From 6e9fc496f5ed3231cb48ce78734a80c9f412c418 Mon Sep 17 00:00:00 2001 From: Jun Lee Date: Mon, 7 Jul 2025 15:30:18 +0100 Subject: [PATCH 2/4] Adding DO example --- .../examples/readable-stream.mdx | 99 +++++++++++++++++++ .../runtime-apis/streams/readablestream.mdx | 4 - 2 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 src/content/docs/durable-objects/examples/readable-stream.mdx diff --git a/src/content/docs/durable-objects/examples/readable-stream.mdx b/src/content/docs/durable-objects/examples/readable-stream.mdx new file mode 100644 index 000000000000000..b26eeb8afd36e82 --- /dev/null +++ b/src/content/docs/durable-objects/examples/readable-stream.mdx @@ -0,0 +1,99 @@ +--- +type: example +summary: Stream ReadableStream from Durable Objects. +pcx_content_type: example +title: Use ReadableStream with Durable Object and Workers +sidebar: + order: 3 +description: Stream ReadableStream from Durable Objects. +--- + +import { GlossaryTooltip, WranglerConfig } from "~/components"; + +This example demonstrates: + +- A Worker receives a request, and forwards it to a Durable Object `my-id`. +- The Durable Object streams an incrementing number every second, until it receives `AbortSignal`. +- The Worker reads and logs the values from the stream. +- The Worker then cancels the stream after 5 values. + +```ts +import { DurableObject } from 'cloudflare:workers'; + +async function* dataSource(signal: AbortSignal) { + let counter = 0; + while (!signal.aborted) { + yield counter++; + await new Promise((resolve) => setTimeout(resolve, 1_000)); + } + + console.log('Data source cancelled'); +} + +export class MyDurableObject extends DurableObject { + async fetch(request: Request): Promise { + const abortController = new AbortController(); + + const stream = new ReadableStream({ + start(controller) { + if (request.signal.aborted) { + controller.close(); + return; + } + + (async () => { + for await (const value of dataSource(AbortSignal.any([request.signal, abortController.signal]))) { + controller.enqueue(new TextEncoder().encode(String(value))); + } + })(); + }, + cancel() { + console.log('Stream cancelled'); + abortController.abort(); + }, + }); + + const headers = new Headers({ + 'Content-Type': 'application/octet-stream', + }); + + return new Response(stream, { headers }); + } +} + +export default { + async fetch(request, env, ctx): Promise { + const id: DurableObjectId = env.MY_DURABLE_OBJECT.idFromName('my-id'); + const stub = env.MY_DURABLE_OBJECT.get(id); + const response = await stub.fetch(request); + if (!response.ok || !response.body) { + return new Response('Invalid response', { status: 500 }); + } + + const reader = response.body.getReader(); + + // Cancel the stream after 5 messages + let i = 0; + while (true) { + if (i > 5) { + reader.cancel(); + break; + } + const { value, done } = await reader.read(); + console.log(`Got value ${new TextDecoder().decode(value)}`); + if (done) { + break; + } + i++; + } + + return new Response('Done', { status: 200 }); + }, +} satisfies ExportedHandler; +``` + +:::note + +In a setup where a Durable Object returns a readable stream to a Worker, if the Worker cancels the Durable Object's readable stream, the cancellation propagates to the Durable Object. + +::: \ No newline at end of file diff --git a/src/content/docs/workers/runtime-apis/streams/readablestream.mdx b/src/content/docs/workers/runtime-apis/streams/readablestream.mdx index e958fa26e50795c..e3082ff1255db99 100644 --- a/src/content/docs/workers/runtime-apis/streams/readablestream.mdx +++ b/src/content/docs/workers/runtime-apis/streams/readablestream.mdx @@ -47,10 +47,6 @@ let reader = readable.getReader({ mode: 'byob' }); * When `true`, errors in the source `ReadableStream` will no longer abort the destination `WritableStream`. `pipeTo` will return a rejected promise with the error from the source or any error that occurred while aborting the destination. -## ReadableStream with Durable Objects - -In a setup where a Durable Object returns a readable stream to a Worker, if the Worker cancels the Durable Object's readable stream, the cancellation propagates to the Durable Object. - *** ## Related resources From 95d971f93aa3b203797226fd61c75ac102d47d4e Mon Sep 17 00:00:00 2001 From: Jun Lee Date: Fri, 11 Jul 2025 08:22:10 +0100 Subject: [PATCH 3/4] Updating stream section of the code --- .../durable-objects/examples/readable-stream.mdx | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/content/docs/durable-objects/examples/readable-stream.mdx b/src/content/docs/durable-objects/examples/readable-stream.mdx index b26eeb8afd36e82..9fc7c45cb7bba79 100644 --- a/src/content/docs/durable-objects/examples/readable-stream.mdx +++ b/src/content/docs/durable-objects/examples/readable-stream.mdx @@ -8,7 +8,7 @@ sidebar: description: Stream ReadableStream from Durable Objects. --- -import { GlossaryTooltip, WranglerConfig } from "~/components"; +import { GlossaryTooltip, WranglerConfig, TypeScriptExample } from "~/components"; This example demonstrates: @@ -17,9 +17,11 @@ This example demonstrates: - The Worker reads and logs the values from the stream. - The Worker then cancels the stream after 5 values. + ```ts import { DurableObject } from 'cloudflare:workers'; +// Send incremented counter value every second async function* dataSource(signal: AbortSignal) { let counter = 0; while (!signal.aborted) { @@ -35,17 +37,15 @@ export class MyDurableObject extends DurableObject { const abortController = new AbortController(); const stream = new ReadableStream({ - start(controller) { + async start(controller) { if (request.signal.aborted) { controller.close(); return; } - (async () => { - for await (const value of dataSource(AbortSignal.any([request.signal, abortController.signal]))) { - controller.enqueue(new TextEncoder().encode(String(value))); - } - })(); + for await (const value of dataSource(AbortSignal.any([request.signal, abortController.signal]))) { + controller.enqueue(new TextEncoder().encode(String(value))); + } }, cancel() { console.log('Stream cancelled'); @@ -91,6 +91,7 @@ export default { }, } satisfies ExportedHandler; ``` + :::note From 232812538f66bde579a653770d75200f87a6810b Mon Sep 17 00:00:00 2001 From: Jun Lee Date: Fri, 11 Jul 2025 09:31:26 +0100 Subject: [PATCH 4/4] Updating code --- .../examples/readable-stream.mdx | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/content/docs/durable-objects/examples/readable-stream.mdx b/src/content/docs/durable-objects/examples/readable-stream.mdx index 9fc7c45cb7bba79..7d76451db7d0b4a 100644 --- a/src/content/docs/durable-objects/examples/readable-stream.mdx +++ b/src/content/docs/durable-objects/examples/readable-stream.mdx @@ -40,10 +40,11 @@ export class MyDurableObject extends DurableObject { async start(controller) { if (request.signal.aborted) { controller.close(); + abortController.abort(); return; } - for await (const value of dataSource(AbortSignal.any([request.signal, abortController.signal]))) { + for await (const value of dataSource(abortController.signal)) { controller.enqueue(new TextEncoder().encode(String(value))); } }, @@ -65,29 +66,35 @@ export default { async fetch(request, env, ctx): Promise { const id: DurableObjectId = env.MY_DURABLE_OBJECT.idFromName('my-id'); const stub = env.MY_DURABLE_OBJECT.get(id); - const response = await stub.fetch(request); + const response = await stub.fetch(request, { ...request }); if (!response.ok || !response.body) { return new Response('Invalid response', { status: 500 }); } - const reader = response.body.getReader(); + const reader = response.body.pipeThrough(new TextDecoderStream()).getReader(); - // Cancel the stream after 5 messages + let data = [] as string[]; let i = 0; while (true) { + // Cancel the stream after 5 messages if (i > 5) { reader.cancel(); break; } const { value, done } = await reader.read(); - console.log(`Got value ${new TextDecoder().decode(value)}`); + + if (value) { + console.log(`Got value ${value}`); + data = [...data, value]; + } + if (done) { break; } i++; } - return new Response('Done', { status: 200 }); + return Response.json(data); }, } satisfies ExportedHandler; ```