diff --git a/src/content/docs/durable-objects/examples/readable-stream.mdx b/src/content/docs/durable-objects/examples/readable-stream.mdx new file mode 100644 index 00000000000000..7d76451db7d0b4 --- /dev/null +++ b/src/content/docs/durable-objects/examples/readable-stream.mdx @@ -0,0 +1,107 @@ +--- +type: example +summary: Stream ReadableStream from Durable Objects. +pcx_content_type: example +title: Use ReadableStream with Durable Object and Workers +sidebar: + order: 3 +description: Stream ReadableStream from Durable Objects. +--- + +import { GlossaryTooltip, WranglerConfig, TypeScriptExample } from "~/components"; + +This example demonstrates: + +- A Worker receives a request, and forwards it to a Durable Object `my-id`. +- The Durable Object streams an incrementing number every second, until it receives `AbortSignal`. +- The Worker reads and logs the values from the stream. +- The Worker then cancels the stream after 5 values. + + +```ts +import { DurableObject } from 'cloudflare:workers'; + +// Send incremented counter value every second +async function* dataSource(signal: AbortSignal) { + let counter = 0; + while (!signal.aborted) { + yield counter++; + await new Promise((resolve) => setTimeout(resolve, 1_000)); + } + + console.log('Data source cancelled'); +} + +export class MyDurableObject extends DurableObject { + async fetch(request: Request): Promise { + const abortController = new AbortController(); + + const stream = new ReadableStream({ + async start(controller) { + if (request.signal.aborted) { + controller.close(); + abortController.abort(); + return; + } + + for await (const value of dataSource(abortController.signal)) { + controller.enqueue(new TextEncoder().encode(String(value))); + } + }, + cancel() { + console.log('Stream cancelled'); + abortController.abort(); + }, + }); + + const headers = new Headers({ + 'Content-Type': 'application/octet-stream', + }); + + return new Response(stream, { headers }); + } +} + +export default { + async fetch(request, env, ctx): Promise { + const id: DurableObjectId = env.MY_DURABLE_OBJECT.idFromName('my-id'); + const stub = env.MY_DURABLE_OBJECT.get(id); + const response = await stub.fetch(request, { ...request }); + if (!response.ok || !response.body) { + return new Response('Invalid response', { status: 500 }); + } + + const reader = response.body.pipeThrough(new TextDecoderStream()).getReader(); + + let data = [] as string[]; + let i = 0; + while (true) { + // Cancel the stream after 5 messages + if (i > 5) { + reader.cancel(); + break; + } + const { value, done } = await reader.read(); + + if (value) { + console.log(`Got value ${value}`); + data = [...data, value]; + } + + if (done) { + break; + } + i++; + } + + return Response.json(data); + }, +} satisfies ExportedHandler; +``` + + +:::note + +In a setup where a Durable Object returns a readable stream to a Worker, if the Worker cancels the Durable Object's readable stream, the cancellation propagates to the Durable Object. + +::: \ No newline at end of file diff --git a/src/content/docs/workers/runtime-apis/streams/readablestream.mdx b/src/content/docs/workers/runtime-apis/streams/readablestream.mdx index be5ca91bd19629..e3082ff1255db9 100644 --- a/src/content/docs/workers/runtime-apis/streams/readablestream.mdx +++ b/src/content/docs/workers/runtime-apis/streams/readablestream.mdx @@ -47,8 +47,6 @@ let reader = readable.getReader({ mode: 'byob' }); * When `true`, errors in the source `ReadableStream` will no longer abort the destination `WritableStream`. `pipeTo` will return a rejected promise with the error from the source or any error that occurred while aborting the destination. - - *** ## Related resources