-
Notifications
You must be signed in to change notification settings - Fork 10k
[DO] Adding that ReadableStream cancellation propagates from Worker to DO #23478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Howdy and thanks for contributing to our repo. The Cloudflare team reviews new, external PRs within two (2) weeks. If it's been two weeks or longer without any movement, please tag the PR Assignees in a comment. We review internal PRs within 1 week. If it's something urgent or has been sitting without a comment, start a thread in the Developer Docs space internally. PR Change SummaryEnhanced documentation to clarify the propagation of ReadableStream cancellation from Workers to Durable Objects.
Modified Files
How can I customize these reviews?Check out the Hyperlint AI Reviewer docs for more information on how to customize the review. If you just want to ignore it on this PR, you can add the Note specifically for link checks, we only check the first 30 links in a file and we cache the results for several hours (for instance, if you just added a page, you might experience this). Our recommendation is to add |
|
This pull request requires reviews from CODEOWNERS as it changes files that match the following patterns:
|
TimoWilhelm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should return the values and simplify the cancellation a bit
import { DurableObject } from 'cloudflare:workers';
async function* dataSource(signal: AbortSignal) {
let counter = 0;
while (!signal.aborted) {
yield counter++;
await new Promise((resolve) => setTimeout(resolve, 1_000));
}
console.log('Data source cancelled');
}
export class MyDurableObject extends DurableObject<Env> {
async fetch(request: Request): Promise<Response> {
const abortController = new AbortController();
const stream = new ReadableStream({
async start(controller) {
if (request.signal.aborted) {
controller.close();
abortController.abort();
return;
}
for await (const value of dataSource(abortController.signal)) {
controller.enqueue(new TextEncoder().encode(String(value)));
}
},
cancel() {
console.log('Stream cancelled');
abortController.abort();
},
});
const headers = new Headers({
'Content-Type': 'application/octet-stream',
});
return new Response(stream, { headers });
}
}
export default {
async fetch(request, env, ctx): Promise<Response> {
const id: DurableObjectId = env.MY_DURABLE_OBJECT.idFromName('my-id');
const stub = env.MY_DURABLE_OBJECT.get(id);
const response = await stub.fetch(request, { ...request });
if (!response.ok || !response.body) {
return new Response('Invalid response', { status: 500 });
}
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
let data = [] as string[];
let i = 0;
while (true) {
// Cancel the stream after 5 messages
if (i > 5) {
reader.cancel();
break;
}
const { value, done } = await reader.read();
if (value) {
console.log(`Got value ${value}`);
data = [...data, value];
}
if (done) {
break;
}
i++;
}
return Response.json(data);
},
} satisfies ExportedHandler<Env>;…o DO (cloudflare#23478) * Adding that ReadableStream cancellation propagates from Worker to DO * Adding DO example * Updating stream section of the code * Updating code
…o DO (#23478) * Adding that ReadableStream cancellation propagates from Worker to DO * Adding DO example * Updating stream section of the code * Updating code
Summary
Screenshots (optional)
Documentation checklist