Skip to content

Commit d0812f7

Browse files
Send RESUME_NONE when no active stream (#1059)
When a client sends CF_AGENT_STREAM_RESUME_REQUEST and no active stream exists, the agent now responds with CF_AGENT_STREAM_RESUME_NONE instead of waiting for a 5s timeout. Added MessageType.CF_AGENT_STREAM_RESUME_NONE and outgoing message typing, server-side send in index.ts, and client handling in react.tsx to call handleStreamResumeNone. The WebSocket transport gains a _resumeNoneResolver, handleStreamResumeNone(), and wiring in reconnectToStream to resolve immediately with null; timeouts are preserved as a safety net. Tests updated/added to assert the RESUME_NONE message and transport behavior. This eliminates the UI stall on open/switch/refresh when there's no active stream by collapsing the previous 5-second wait to a single WS round-trip.
1 parent 054e65d commit d0812f7

File tree

7 files changed

+96
-3
lines changed

7 files changed

+96
-3
lines changed

.changeset/resume-none-response.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@cloudflare/ai-chat": patch
3+
---
4+
5+
Server now responds with `CF_AGENT_STREAM_RESUME_NONE` when a client sends `CF_AGENT_STREAM_RESUME_REQUEST` and no active stream exists. This collapses the previous 5-second timeout to a single WebSocket round-trip, fixing the UI stall on every conversation open/switch/refresh when there is no active stream.

packages/ai-chat/src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,12 @@ export class AIChatAgent<
539539
if (data.type === MessageType.CF_AGENT_STREAM_RESUME_REQUEST) {
540540
if (this._resumableStream.hasActiveStream()) {
541541
this._notifyStreamResuming(connection);
542+
} else {
543+
connection.send(
544+
JSON.stringify({
545+
type: MessageType.CF_AGENT_STREAM_RESUME_NONE
546+
})
547+
);
542548
}
543549
return;
544550
}

packages/ai-chat/src/react.tsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,12 @@ export function useAgentChat<
10521052
});
10531053
break;
10541054

1055+
case MessageType.CF_AGENT_STREAM_RESUME_NONE:
1056+
// Server confirmed no active stream — let the transport
1057+
// resolve reconnectToStream immediately with null.
1058+
customTransport.handleStreamResumeNone();
1059+
break;
1060+
10551061
case MessageType.CF_AGENT_STREAM_RESUMING:
10561062
if (!resume) return;
10571063
// Let the transport handle it if reconnectToStream is waiting.

packages/ai-chat/src/tests/resumable-streaming.test.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ describe("Resumable Streaming", () => {
601601
ws2.close(1000);
602602
});
603603

604-
it("CF_AGENT_STREAM_RESUME_REQUEST with no active stream is a no-op", async () => {
604+
it("CF_AGENT_STREAM_RESUME_REQUEST with no active stream sends RESUME_NONE", async () => {
605605
const room = crypto.randomUUID();
606606
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
607607
const messages = collectMessages(ws);
@@ -621,6 +621,16 @@ describe("Resumable Streaming", () => {
621621
const resumeMsg = messages.find(isStreamResumingMessage);
622622
expect(resumeMsg).toBeUndefined();
623623

624+
// Should get CF_AGENT_STREAM_RESUME_NONE
625+
const noneMsg = messages.find(
626+
(m) =>
627+
typeof m === "object" &&
628+
m !== null &&
629+
"type" in m &&
630+
m.type === MessageType.CF_AGENT_STREAM_RESUME_NONE
631+
);
632+
expect(noneMsg).toBeDefined();
633+
624634
ws.close(1000);
625635
});
626636

packages/ai-chat/src/tests/ws-transport-resume.test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,44 @@ describe("WebSocketChatTransport reconnectToStream + handleStreamResuming", () =
109109
expect(activeRequestIds.has("req-tracked")).toBe(true);
110110
});
111111

112+
// ── handleStreamResumeNone basics ────────────────────────────────────
113+
114+
it("handleStreamResumeNone returns false when no reconnectToStream is pending", () => {
115+
expect(transport.handleStreamResumeNone()).toBe(false);
116+
});
117+
118+
it("handleStreamResumeNone resolves reconnectToStream with null immediately", async () => {
119+
const promise = transport.reconnectToStream({ chatId: "chat-1" });
120+
121+
const handled = transport.handleStreamResumeNone();
122+
expect(handled).toBe(true);
123+
124+
const result = await promise;
125+
expect(result).toBeNull();
126+
});
127+
128+
it("handleStreamResumeNone clears both resolvers so subsequent calls return false", async () => {
129+
const promise = transport.reconnectToStream({ chatId: "chat-1" });
130+
131+
transport.handleStreamResumeNone();
132+
await promise;
133+
134+
expect(transport.handleStreamResumeNone()).toBe(false);
135+
expect(transport.handleStreamResuming({ id: "late" })).toBe(false);
136+
});
137+
138+
it("handleStreamResuming after handleStreamResumeNone does not double-resolve", async () => {
139+
const promise = transport.reconnectToStream({ chatId: "chat-1" });
140+
141+
// RESUME_NONE arrives first
142+
transport.handleStreamResumeNone();
143+
const result = await promise;
144+
expect(result).toBeNull();
145+
146+
// Late STREAM_RESUMING should be ignored
147+
expect(transport.handleStreamResuming({ id: "req-late" })).toBe(false);
148+
});
149+
112150
// ── Timeout behavior ─────────────────────────────────────────────────
113151

114152
it("resolves null after timeout when no handleStreamResuming is called", async () => {

packages/ai-chat/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ export enum MessageType {
1616
CF_AGENT_STREAM_RESUME_ACK = "cf_agent_stream_resume_ack",
1717
/** Sent by client after message handler is ready, requesting stream resume check */
1818
CF_AGENT_STREAM_RESUME_REQUEST = "cf_agent_stream_resume_request",
19+
/** Sent by server when client requests resume but no active stream exists */
20+
CF_AGENT_STREAM_RESUME_NONE = "cf_agent_stream_resume_none",
1921

2022
/** Client sends tool result to server (for client-side tools) */
2123
CF_AGENT_TOOL_RESULT = "cf_agent_tool_result",
@@ -68,6 +70,10 @@ export type OutgoingMessage<ChatMessage extends UIMessage = UIMessage> =
6870
type: MessageType.CF_AGENT_MESSAGE_UPDATED;
6971
/** The updated message */
7072
message: ChatMessage;
73+
}
74+
| {
75+
/** Server responds to resume request when no active stream exists */
76+
type: MessageType.CF_AGENT_STREAM_RESUME_NONE;
7177
};
7278

7379
/**

packages/ai-chat/src/ws-chat-transport.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ export class WebSocketChatTransport<
6666
// Pending resume resolver — set by reconnectToStream, called by
6767
// handleStreamResuming when onAgentMessage sees CF_AGENT_STREAM_RESUMING.
6868
private _resumeResolver: ((data: { id: string }) => void) | null = null;
69+
// Pending "no stream" resolver — called by handleStreamResumeNone
70+
// when onAgentMessage sees CF_AGENT_STREAM_RESUME_NONE.
71+
private _resumeNoneResolver: (() => void) | null = null;
6972

7073
constructor(options: WebSocketChatTransportOptions<ChatMessage>) {
7174
this.agent = options.agent;
@@ -85,6 +88,17 @@ export class WebSocketChatTransport<
8588
return true;
8689
}
8790

91+
/**
92+
* Called by onAgentMessage when it receives CF_AGENT_STREAM_RESUME_NONE.
93+
* If reconnectToStream is waiting, resolves the promise with null
94+
* immediately (no 5-second timeout). Returns true if handled.
95+
*/
96+
handleStreamResumeNone(): boolean {
97+
if (!this._resumeNoneResolver) return false;
98+
this._resumeNoneResolver();
99+
return true;
100+
}
101+
88102
async sendMessages(options: {
89103
chatId: string;
90104
messages: ChatMessage[];
@@ -253,10 +267,16 @@ export class WebSocketChatTransport<
253267
if (resolved) return;
254268
resolved = true;
255269
this._resumeResolver = null;
270+
this._resumeNoneResolver = null;
256271
if (timeout) clearTimeout(timeout);
257272
resolve(value);
258273
};
259274

275+
// Set the "no stream" resolver that handleStreamResumeNone() will call.
276+
// When onAgentMessage sees CF_AGENT_STREAM_RESUME_NONE, it calls
277+
// handleStreamResumeNone() which resolves immediately with null.
278+
this._resumeNoneResolver = () => done(null);
279+
260280
// Set the resolver that handleStreamResuming() will call.
261281
// When onAgentMessage sees CF_AGENT_STREAM_RESUMING, it calls
262282
// handleStreamResuming() which invokes this callback.
@@ -292,8 +312,10 @@ export class WebSocketChatTransport<
292312
// WebSocket may already be closed
293313
}
294314

295-
// Timeout: if no CF_AGENT_STREAM_RESUMING arrives (no active
296-
// stream, or WebSocket never connects), resolve null.
315+
// Safety-net timeout: if the WebSocket never connects or the
316+
// server is unreachable, resolve null. Under normal operation
317+
// the server responds with STREAM_RESUMING or STREAM_RESUME_NONE
318+
// well before this fires.
297319
timeout = setTimeout(() => done(null), 5000);
298320
});
299321
}

0 commit comments

Comments
 (0)