Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wild-chicken-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/ai-chat": patch
---

Fix resumable streaming to avoid delivering live chunks before resume ACK
41 changes: 40 additions & 1 deletion packages/ai-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ export class AIChatAgent<
*/
private _lastCleanupTime = 0;

/**
* Set of connection IDs that are pending stream resume.
* These connections have received CF_AGENT_STREAM_RESUMING but haven't sent ACK yet.
* They should be excluded from live stream broadcasts until they ACK.
* @internal
*/
private _pendingResumeConnections: Set<string> = new Set();

/** Array of chat messages for the current conversation */
messages: ChatMessage[];

Expand Down Expand Up @@ -286,6 +294,20 @@ export class AIChatAgent<
return _onConnect(connection, ctx);
};

// Wrap onClose to clean up pending resume connections
const _onClose = this.onClose.bind(this);
this.onClose = async (
connection: Connection,
code: number,
reason: string,
wasClean: boolean
) => {
// Clean up pending resume state for this connection
this._pendingResumeConnections.delete(connection.id);
// Call consumer's onClose
return _onClose(connection, code, reason, wasClean);
};

// Wrap onMessage
const _onMessage = this.onMessage.bind(this);
this.onMessage = async (connection: Connection, message: WSMessage) => {
Expand Down Expand Up @@ -395,6 +417,7 @@ export class AIChatAgent<
this._activeStreamId = null;
this._activeRequestId = null;
this._streamChunkIndex = 0;
this._pendingResumeConnections.clear();
this.messages = [];
this._broadcastChatMessage(
{ type: MessageType.CF_AGENT_CHAT_CLEAR },
Expand All @@ -418,6 +441,8 @@ export class AIChatAgent<

// Handle stream resume acknowledgment
if (data.type === MessageType.CF_AGENT_STREAM_RESUME_ACK) {
this._pendingResumeConnections.delete(connection.id);

if (
this._activeStreamId &&
this._activeRequestId &&
Expand Down Expand Up @@ -574,6 +599,10 @@ export class AIChatAgent<
return;
}

// Add connection to pending set - they'll be excluded from live broadcasts
// until they send ACK to receive the full stream replay
this._pendingResumeConnections.add(connection.id);

// Notify client - they will send ACK when ready
connection.send(
JSON.stringify({
Expand Down Expand Up @@ -726,6 +755,9 @@ export class AIChatAgent<
this._activeRequestId = null;
this._streamChunkIndex = 0;

// Clear pending resume connections - no active stream to resume
this._pendingResumeConnections.clear();

// Periodically clean up old streams (not on every completion)
this._maybeCleanupOldStreams();
}
Expand Down Expand Up @@ -756,7 +788,14 @@ export class AIChatAgent<
}

private _broadcastChatMessage(message: OutgoingMessage, exclude?: string[]) {
this.broadcast(JSON.stringify(message), exclude);
// Combine explicit exclusions with connections pending stream resume.
// Pending connections should not receive live stream chunks until they ACK,
// at which point they'll receive the full replay via _sendStreamChunks.
const allExclusions = [
...(exclude || []),
...this._pendingResumeConnections
];
this.broadcast(JSON.stringify(message), allExclusions);
}

/**
Expand Down
78 changes: 78 additions & 0 deletions packages/ai-chat/src/tests/resumable-streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,84 @@ describe("Resumable Streaming", () => {
ws2.close();
});

it("does not deliver live chunks before ACK to resuming connections", async () => {
const room = crypto.randomUUID();

// First connection - start a stream
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages1 = collectMessages(ws1);
await new Promise((r) => setTimeout(r, 50));

const agentStub = env.TestChatAgent.get(
env.TestChatAgent.idFromName(room)
);
const streamId = await agentStub.testStartStream("req-live");

// Second connection - will be notified to resume
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);

await new Promise((r) => setTimeout(r, 100));

// Broadcast a live chunk while ws2 is pending resume (no ACK yet)
await agentStub.testBroadcastLiveChunk(
"req-live",
streamId,
'{"type":"text-delta","id":"0","delta":"A"}'
);

await new Promise((r) => setTimeout(r, 100));

// ws2 should NOT receive live chunks before ACK
const preAckChunks = messages2.filter(isUseChatResponseMessage);
expect(preAckChunks.length).toBe(0);

// ws1 should receive the live chunk
const ws1Chunks = messages1.filter(isUseChatResponseMessage);
expect(ws1Chunks.length).toBe(1);
expect(ws1Chunks[0].body).toBe(
'{"type":"text-delta","id":"0","delta":"A"}'
);

// Send ACK to resume
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: "req-live"
})
);

await new Promise((r) => setTimeout(r, 100));

// After ACK, ws2 should receive the replayed chunk
const postAckChunks = messages2.filter(isUseChatResponseMessage);
expect(postAckChunks.length).toBeGreaterThanOrEqual(1);
expect(postAckChunks[0].body).toBe(
'{"type":"text-delta","id":"0","delta":"A"}'
);

// Live chunks after ACK should be delivered
await agentStub.testBroadcastLiveChunk(
"req-live",
streamId,
'{"type":"text-delta","id":"0","delta":"B"}'
);

await new Promise((r) => setTimeout(r, 100));

const finalChunks = messages2.filter(isUseChatResponseMessage);
expect(finalChunks.some((m) => m.body?.includes('"delta":"B"'))).toBe(
true
);

ws1.close();
ws2.close();
});

it("ignores ACK with wrong request ID", async () => {
const room = crypto.randomUUID();

Expand Down
24 changes: 24 additions & 0 deletions packages/ai-chat/src/tests/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AIChatAgent } from "../";
import type { UIMessage as ChatMessage } from "ai";
import { callable, getCurrentAgent, routeAgentRequest } from "agents";
import { MessageType, type OutgoingMessage } from "../types";

// Type helper for tool call parts - extracts from ChatMessage parts
type TestToolCallPart = Extract<
Expand Down Expand Up @@ -148,6 +149,29 @@ export class TestChatAgent extends AIChatAgent<Env> {
this._storeStreamChunk(streamId, body);
}

@callable()
testBroadcastLiveChunk(
requestId: string,
streamId: string,
body: string
): void {
this._storeStreamChunk(streamId, body);
const message: OutgoingMessage = {
body,
done: false,
id: requestId,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
};
(
this as unknown as {
_broadcastChatMessage: (
msg: OutgoingMessage,
exclude?: string[]
) => void;
}
)._broadcastChatMessage(message);
}

@callable()
testFlushChunkBuffer(): void {
this._flushChunkBuffer();
Expand Down
Loading