Skip to content

Commit f99a46d

Browse files
authored
[DO] Adding that ReadableStream cancellation propagates from Worker to DO (#23478)
* Adding that ReadableStream cancellation propagates from Worker to DO * Adding DO example * Updating stream section of the code * Updating code
1 parent 37d460f commit f99a46d

File tree

2 files changed

+107
-2
lines changed

2 files changed

+107
-2
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
---
2+
type: example
3+
summary: Stream ReadableStream from Durable Objects.
4+
pcx_content_type: example
5+
title: Use ReadableStream with Durable Object and Workers
6+
sidebar:
7+
order: 3
8+
description: Stream ReadableStream from Durable Objects.
9+
---
10+
11+
import { GlossaryTooltip, WranglerConfig, TypeScriptExample } from "~/components";
12+
13+
This example demonstrates:
14+
15+
- A Worker receives a request, and forwards it to a Durable Object `my-id`.
16+
- The Durable Object streams an incrementing number every second, until it receives `AbortSignal`.
17+
- The Worker reads and logs the values from the stream.
18+
- The Worker then cancels the stream after 5 values.
19+
20+
<TypeScriptExample>
21+
```ts
22+
import { DurableObject } from 'cloudflare:workers';
23+
24+
// Send incremented counter value every second
25+
async function* dataSource(signal: AbortSignal) {
26+
let counter = 0;
27+
while (!signal.aborted) {
28+
yield counter++;
29+
await new Promise((resolve) => setTimeout(resolve, 1_000));
30+
}
31+
32+
console.log('Data source cancelled');
33+
}
34+
35+
export class MyDurableObject extends DurableObject<Env> {
36+
async fetch(request: Request): Promise<Response> {
37+
const abortController = new AbortController();
38+
39+
const stream = new ReadableStream({
40+
async start(controller) {
41+
if (request.signal.aborted) {
42+
controller.close();
43+
abortController.abort();
44+
return;
45+
}
46+
47+
for await (const value of dataSource(abortController.signal)) {
48+
controller.enqueue(new TextEncoder().encode(String(value)));
49+
}
50+
},
51+
cancel() {
52+
console.log('Stream cancelled');
53+
abortController.abort();
54+
},
55+
});
56+
57+
const headers = new Headers({
58+
'Content-Type': 'application/octet-stream',
59+
});
60+
61+
return new Response(stream, { headers });
62+
}
63+
}
64+
65+
export default {
66+
async fetch(request, env, ctx): Promise<Response> {
67+
const id: DurableObjectId = env.MY_DURABLE_OBJECT.idFromName('my-id');
68+
const stub = env.MY_DURABLE_OBJECT.get(id);
69+
const response = await stub.fetch(request, { ...request });
70+
if (!response.ok || !response.body) {
71+
return new Response('Invalid response', { status: 500 });
72+
}
73+
74+
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
75+
76+
let data = [] as string[];
77+
let i = 0;
78+
while (true) {
79+
// Cancel the stream after 5 messages
80+
if (i > 5) {
81+
reader.cancel();
82+
break;
83+
}
84+
const { value, done } = await reader.read();
85+
86+
if (value) {
87+
console.log(`Got value ${value}`);
88+
data = [...data, value];
89+
}
90+
91+
if (done) {
92+
break;
93+
}
94+
i++;
95+
}
96+
97+
return Response.json(data);
98+
},
99+
} satisfies ExportedHandler<Env>;
100+
```
101+
</TypeScriptExample>
102+
103+
:::note
104+
105+
In a setup where a Durable Object returns a readable stream to a Worker, if the Worker cancels the Durable Object's readable stream, the cancellation propagates to the Durable Object.
106+
107+
:::

src/content/docs/workers/runtime-apis/streams/readablestream.mdx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ let reader = readable.getReader({ mode: 'byob' });
4747

4848
* When `true`, errors in the source `ReadableStream` will no longer abort the destination `WritableStream`. `pipeTo` will return a rejected promise with the error from the source or any error that occurred while aborting the destination.
4949

50-
51-
5250
***
5351

5452
## Related resources

0 commit comments

Comments
 (0)