Skip to content

Commit a20aaf6

Browse files
whoiskatrinthreepointone
authored andcommitted
more refactorting / more tests
1 parent a66c0c8 commit a20aaf6

File tree

5 files changed

+191
-163
lines changed

5 files changed

+191
-163
lines changed

packages/agents/package.json

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@
4141
"devDependencies": {
4242
"@cloudflare/workers-oauth-provider": "^0.1.0",
4343
"@types/yargs": "^17.0.34",
44+
"@testing-library/react": "^16.3.0",
45+
"@testing-library/react-hooks": "^8.0.1",
46+
"msw": "^2.11.3",
4447
"react": "*",
4548
"vitest-browser-react": "^1.0.1",
4649
"x402": "^0.7.1"
@@ -139,16 +142,6 @@
139142
"types": "./dist/use-agent-chat-http.d.ts",
140143
"import": "./dist/use-agent-chat-http.js",
141144
"require": "./dist/use-agent-chat-http.js"
142-
},
143-
"./ai-chat-agent-resumable-http": {
144-
"types": "./dist/ai-chat-agent-resumable-http.d.ts",
145-
"import": "./dist/ai-chat-agent-resumable-http.js",
146-
"require": "./dist/ai-chat-agent-resumable-http.js"
147-
},
148-
"./use-agent-chat-resumable-http": {
149-
"types": "./dist/use-agent-chat-resumable-http.d.ts",
150-
"import": "./dist/use-agent-chat-resumable-http.js",
151-
"require": "./dist/use-agent-chat-resumable-http.js"
152145
}
153146
},
154147
"publishConfig": {

packages/agents/src/ai-chat-agent-http.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@ export class AIHttpChatAgent<
5959
return this._handlePostChat(request);
6060
}
6161

62-
// GET /stream/{streamId} - Resume interrupted stream
63-
if (pathname.includes("/stream/") && request.method === "GET") {
64-
const streamId = pathname.split("/stream/")[1];
65-
return this._handleResumeStream(streamId);
66-
}
67-
6862
// POST /stream/{streamId}/cancel - Cancel active stream
6963
if (
7064
pathname.includes("/stream/") &&
@@ -85,6 +79,12 @@ export class AIHttpChatAgent<
8579
return this._handleStreamStatus(streamId);
8680
}
8781

82+
// GET /stream/{streamId} - Resume interrupted stream
83+
if (pathname.includes("/stream/") && request.method === "GET") {
84+
const streamId = pathname.split("/stream/")[1];
85+
return this._handleResumeStream(streamId);
86+
}
87+
8888
// DELETE /messages - Clear message history
8989
if (pathname.endsWith("/messages") && request.method === "DELETE") {
9090
return this._handleClearMessages();

packages/agents/src/resumable-stream-manager.ts

Lines changed: 83 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ interface StreamStateRow {
1616
headers?: string;
1717
}
1818

19-
interface ChunkRow {
19+
interface TextDeltaRow {
2020
stream_id: string;
2121
seq: number;
22-
chunk: string;
22+
text_delta: string;
2323
created_at?: string;
2424
}
2525

2626
interface StreamStatusRow {
27-
content: string;
28-
position: number;
27+
seq: number;
28+
fetching: number;
2929
completed: number;
3030
created_at: string;
3131
updated_at: string;
@@ -41,6 +41,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
4141
seq: number; // Current chunk sequence number
4242
fetching: boolean; // Is upstream still fetching?
4343
completed: boolean;
44+
upstreamReader?: ReadableStreamDefaultReader<Uint8Array>; // Reader for upstream response
4445
timestamp: number;
4546
readers: Set<
4647
WritableStreamDefaultWriter | ReadableStreamDefaultController
@@ -80,11 +81,11 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
8081
updated_at datetime default current_timestamp
8182
)`;
8283

83-
// Initialize stream chunks table
84-
this.sql`create table if not exists cf_ai_http_chat_chunks (
84+
// Initialize stream text deltas table
85+
this.sql`create table if not exists cf_ai_http_chat_text_deltas (
8586
stream_id text not null,
8687
seq integer not null,
87-
chunk blob not null,
88+
text_delta text not null,
8889
created_at datetime default current_timestamp,
8990
primary key (stream_id, seq)
9091
)`;
@@ -216,14 +217,14 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
216217
* Cancel an active stream
217218
*/
218219
async cancelStream(streamId: string): Promise<Response> {
219-
// Mark stream as completed to stop further processing
220-
this.sql`
221-
update cf_ai_http_chat_streams
222-
set completed = 1, updated_at = current_timestamp
223-
where stream_id = ${streamId}
224-
`;
220+
const state = this._activeStreams.get(streamId);
221+
if (state) {
222+
try {
223+
await state.upstreamReader?.cancel();
224+
} catch {}
225+
}
225226

226-
this._activeStreams.delete(streamId);
227+
this._markStreamCompleted(streamId);
227228

228229
return new Response(
229230
JSON.stringify({ success: true, message: "Stream cancelled" }),
@@ -236,7 +237,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
236237
*/
237238
async getStreamStatus(streamId: string): Promise<Response> {
238239
const streamState = this.sql`
239-
select * from cf_ai_http_chat_streams
240+
select seq, fetching, completed, created_at, updated_at from cf_ai_http_chat_streams
240241
where stream_id = ${streamId}
241242
`[0] as unknown as StreamStatusRow | undefined;
242243

@@ -250,8 +251,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
250251
return new Response(
251252
JSON.stringify({
252253
streamId,
253-
position: streamState.position,
254-
contentLength: streamState.content.length,
254+
position: streamState.seq,
255255
completed: Boolean(streamState.completed),
256256
createdAt: streamState.created_at,
257257
updatedAt: streamState.updated_at
@@ -261,11 +261,11 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
261261
}
262262

263263
/**
264-
* Clear all streams and chunks
264+
* Clear all streams and text deltas
265265
*/
266266
async clearStreams(): Promise<void> {
267267
this.sql`delete from cf_ai_http_chat_streams`;
268-
this.sql`delete from cf_ai_http_chat_chunks`;
268+
this.sql`delete from cf_ai_http_chat_text_deltas`;
269269
this._activeStreams.clear();
270270
}
271271

@@ -279,7 +279,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
279279

280280
// Drop all tables
281281
this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_streams`;
282-
this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_chunks`;
282+
this.sql`DROP TABLE IF EXISTS cf_ai_http_chat_text_deltas`;
283283
}
284284

285285
/**
@@ -296,7 +296,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
296296
`;
297297

298298
this.sql`
299-
delete from cf_ai_http_chat_chunks
299+
delete from cf_ai_http_chat_text_deltas
300300
where stream_id in (
301301
select stream_id from cf_ai_http_chat_streams
302302
where completed = 1 and updated_at < ${cutoffTime}
@@ -376,6 +376,8 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
376376
const streamState = this._activeStreams.get(streamId);
377377
if (!streamState) return;
378378

379+
streamState.upstreamReader = reader;
380+
379381
let assistantMessageText = "";
380382
const assistantMessageId = `assistant_${nanoid()}`;
381383
let buffer = "";
@@ -410,49 +412,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
410412
break;
411413
}
412414

413-
// Store raw chunk with sequence number atomically
414-
const chunkBase64 = btoa(String.fromCharCode(...value));
415-
416-
try {
417-
// Atomically get next sequence number and insert chunk
418-
const seqResult = this.sql`
419-
update cf_ai_http_chat_streams
420-
set seq = seq + 1, updated_at = current_timestamp
421-
where stream_id = ${streamId}
422-
returning seq
423-
`;
424-
425-
const seq = Number(seqResult[0]?.seq) || streamState.seq++;
426-
427-
this.sql`
428-
insert into cf_ai_http_chat_chunks (stream_id, seq, chunk)
429-
values (${streamId}, ${seq}, ${chunkBase64})
430-
`;
431-
432-
// Update in-memory state to match database
433-
streamState.seq = seq + 1;
434-
} catch (sqlError) {
435-
console.error(
436-
`[ResumableStreamManager] SQL error for stream ${streamId}:`,
437-
sqlError
438-
);
439-
// Fall back to in-memory sequence if SQL fails
440-
const seq = streamState.seq++;
441-
try {
442-
this.sql`
443-
insert into cf_ai_http_chat_chunks (stream_id, seq, chunk)
444-
values (${streamId}, ${seq}, ${chunkBase64})
445-
`;
446-
} catch (fallbackError) {
447-
console.error(
448-
`[ResumableStreamManager] Fallback SQL error for stream ${streamId}:`,
449-
fallbackError
450-
);
451-
// Continue processing even if storage fails
452-
}
453-
}
454-
455-
// Parse for assistant message content
415+
// Parse SSE chunk for text content first
456416
const chunk = decoder.decode(value, { stream: true });
457417
buffer += chunk;
458418

@@ -468,6 +428,30 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
468428
const data = JSON.parse(dataStr);
469429
if (data.type === "text-delta" && data.delta) {
470430
assistantMessageText += data.delta;
431+
432+
// Store the parsed text delta
433+
try {
434+
const seqResult = this.sql`
435+
update cf_ai_http_chat_streams
436+
set seq = seq + 1, updated_at = current_timestamp
437+
where stream_id = ${streamId}
438+
returning seq
439+
`;
440+
441+
const seq = Number(seqResult[0]?.seq) || streamState.seq++;
442+
443+
this.sql`
444+
insert into cf_ai_http_chat_text_deltas (stream_id, seq, text_delta)
445+
values (${streamId}, ${seq}, ${data.delta})
446+
`;
447+
448+
streamState.seq = seq + 1;
449+
} catch (sqlError) {
450+
console.error(
451+
`[ResumableStreamManager] SQL error storing text delta for ${streamId}:`,
452+
sqlError
453+
);
454+
}
471455
}
472456
} catch {
473457
// Ignore parse errors
@@ -508,13 +492,17 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
508492
await persistMessages([...messages, assistantMessage]);
509493
}
510494
} finally {
495+
// Clear the upstream reader reference
496+
const currentState = this._activeStreams.get(streamId);
497+
if (currentState) {
498+
currentState.upstreamReader = undefined;
499+
}
511500
// Only mark as completed if stream finished naturally, not if interrupted
512501
if (completedNaturally) {
513502
this._markStreamCompleted(streamId);
514503
} else {
515504
// Stream was interrupted - update fetching state but don't mark as completed
516-
const currentState = this._activeStreams.get(streamId);
517-
if (currentState) {
505+
if (currentState && !currentState.completed) {
518506
currentState.fetching = false;
519507
}
520508
try {
@@ -554,7 +542,10 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
554542

555543
if (!dbState) {
556544
console.log(`[ResumableStreamManager] No DB state found for ${streamId}`);
557-
return new Response("Stream not found", { status: 404 });
545+
return new Response(JSON.stringify({ error: "Stream not found" }), {
546+
status: 404,
547+
headers: { "Content-Type": "application/json" }
548+
});
558549
}
559550

560551
let streamState = this._activeStreams.get(streamId);
@@ -592,22 +583,22 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
592583
// Replay stored chunks and setup live streaming
593584
(async () => {
594585
try {
595-
// 1. Replay stored chunks with concurrency control
586+
// 1. Replay stored text deltas
596587
await this.ctx.blockConcurrencyWhile(async () => {
597-
const chunks = this.sql`
598-
select seq, chunk from cf_ai_http_chat_chunks
588+
const textDeltas = this.sql`
589+
select seq, text_delta from cf_ai_http_chat_text_deltas
599590
where stream_id = ${streamId}
600591
order by seq asc
601-
` as unknown as Pick<ChunkRow, "seq" | "chunk">[];
602-
603-
for (const row of chunks) {
604-
// Decode base64 back to Uint8Array
605-
const chunkBase64 = row.chunk;
606-
const binaryString = atob(chunkBase64);
607-
const bytes = new Uint8Array(binaryString.length);
608-
for (let i = 0; i < binaryString.length; i++) {
609-
bytes[i] = binaryString.charCodeAt(i);
610-
}
592+
` as unknown as Pick<TextDeltaRow, "seq" | "text_delta">[];
593+
594+
for (const row of textDeltas) {
595+
// Reconstruct SSE format from stored text delta
596+
const sseData = {
597+
type: "text-delta",
598+
delta: row.text_delta
599+
};
600+
const sseChunk = `data: ${JSON.stringify(sseData)}\n\n`;
601+
const bytes = new TextEncoder().encode(sseChunk);
611602
await writer.write(bytes);
612603
lastSeenSeq = row.seq;
613604
}
@@ -664,15 +655,12 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
664655
// Add stream metadata headers
665656
headers["X-Stream-Id"] = streamId;
666657
headers["X-Resumable"] = "true";
658+
headers["X-Stream-Complete"] = String(Boolean(dbState?.completed));
667659

668660
// Include messages in header if requested
669661
if (includeMessages) {
670662
try {
671-
// Use base64 encoding to avoid header encoding issues
672663
headers["X-Messages"] = encodeURIComponent(JSON.stringify(messages));
673-
674-
// Note: Assistant message content is delivered through the stream itself
675-
// No need to duplicate it in headers since it's already available via persistMessages()
676664
} catch (e) {
677665
console.error("Failed to add messages to header:", e);
678666
}
@@ -682,7 +670,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
682670
}
683671

684672
/**
685-
* Backfill any chunks that were written while this writer was joining
673+
* Backfill any text deltas that were written while this writer was joining
686674
*/
687675
private async _backfillGaps(
688676
streamId: string,
@@ -695,19 +683,20 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
695683
let cursor = startSeq;
696684
while (cursor < streamState.seq) {
697685
const gaps = this.sql`
698-
select seq, chunk from cf_ai_http_chat_chunks
686+
select seq, text_delta from cf_ai_http_chat_text_deltas
699687
where stream_id = ${streamId} and seq >= ${cursor} and seq < ${streamState.seq}
700688
order by seq asc
701-
` as unknown as Pick<ChunkRow, "seq" | "chunk">[];
689+
` as unknown as Pick<TextDeltaRow, "seq" | "text_delta">[];
702690

703691
for (const row of gaps) {
704692
try {
705-
const chunkBase64 = row.chunk;
706-
const binaryString = atob(chunkBase64);
707-
const bytes = new Uint8Array(binaryString.length);
708-
for (let i = 0; i < binaryString.length; i++) {
709-
bytes[i] = binaryString.charCodeAt(i);
710-
}
693+
// Reconstruct SSE format from stored text delta
694+
const sseData = {
695+
type: "text-delta",
696+
delta: row.text_delta
697+
};
698+
const sseChunk = `data: ${JSON.stringify(sseData)}\n\n`;
699+
const bytes = new TextEncoder().encode(sseChunk);
711700
await writer.write(bytes);
712701
cursor = row.seq + 1;
713702
} catch {
@@ -716,7 +705,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
716705
}
717706
}
718707

719-
// Check if more chunks arrived while we were backfilling
708+
// Check if more text deltas arrived while we were backfilling
720709
if (cursor >= streamState.seq) break;
721710
}
722711
}

0 commit comments

Comments
 (0)