Skip to content

Commit 235b3c6

Browse files
committed
fix persistance, cleanup streams manager
1 parent 623f942 commit 235b3c6

File tree

1 file changed

+68
-74
lines changed

1 file changed

+68
-74
lines changed

packages/agents/src/resumable-stream-manager.ts

Lines changed: 68 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,38 @@ import type {
55
} from "ai";
66
import type { AgentContext } from "./";
77

8-
// Type for SQL query function to avoid using 'any'
98
type SqlQueryFunction = <T = Record<string, string | number | boolean | null>>(
109
strings: TemplateStringsArray,
1110
...values: (string | number | boolean | null)[]
1211
) => T[];
1312

13+
interface StreamStateRow {
14+
stream_id: string;
15+
seq: number;
16+
fetching: number;
17+
completed: number;
18+
created_at?: string;
19+
updated_at?: string;
20+
headers?: string;
21+
}
22+
23+
interface ChunkRow {
24+
stream_id: string;
25+
seq: number;
26+
chunk: string;
27+
created_at?: string;
28+
}
29+
30+
interface StreamStatusRow {
31+
content: string;
32+
position: number;
33+
completed: number;
34+
created_at: string;
35+
updated_at: string;
36+
}
37+
1438
const decoder = new TextDecoder();
1539

16-
/**
17-
* Manages resumable streaming functionality with persistence and resumption capabilities
18-
*/
1940
export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
2041
/** Map of stream IDs to their current state for resumable streams */
2142
private _activeStreams: Map<
@@ -45,7 +66,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
4566
* Initialize database tables for resumable streaming
4667
*/
4768
private _initializeTables(): void {
48-
// Initialize stream state table for resumable streams
4969
this.sql`create table if not exists cf_ai_http_chat_streams (
5070
stream_id text primary key,
5171
seq integer not null default 0,
@@ -63,15 +83,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
6383
created_at datetime default current_timestamp,
6484
primary key (stream_id, seq)
6585
)`;
66-
67-
// Initialize assistant messages table for accumulated text
68-
this.sql`create table if not exists cf_ai_http_chat_assistant_messages (
69-
stream_id text primary key,
70-
content text not null,
71-
message_id text not null,
72-
created_at datetime default current_timestamp,
73-
updated_at datetime default current_timestamp
74-
)`;
7586
}
7687

7788
/**
@@ -96,7 +107,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
96107
const dbState = this.sql`
97108
select * from cf_ai_http_chat_streams
98109
where stream_id = ${actualStreamId}
99-
`[0] as { seq: number; fetching: number; completed: number } | undefined;
110+
`[0] as unknown as StreamStateRow | undefined;
100111

101112
if (dbState) {
102113
console.log(
@@ -183,7 +194,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
183194
const streamState = this.sql`
184195
select * from cf_ai_http_chat_streams
185196
where stream_id = ${streamId}
186-
`[0] as { seq: number; fetching: number; completed: number } | undefined;
197+
`[0] as unknown as StreamStateRow | undefined;
187198

188199
if (!streamState) {
189200
return new Response(JSON.stringify({ error: "Stream not found" }), {
@@ -222,15 +233,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
222233
const streamState = this.sql`
223234
select * from cf_ai_http_chat_streams
224235
where stream_id = ${streamId}
225-
`[0] as
226-
| {
227-
content: string;
228-
position: number;
229-
completed: number;
230-
created_at: string;
231-
updated_at: string;
232-
}
233-
| undefined;
236+
`[0] as unknown as StreamStatusRow | undefined;
234237

235238
if (!streamState) {
236239
return new Response(JSON.stringify({ error: "Stream not found" }), {
@@ -258,7 +261,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
258261
async clearStreams(): Promise<void> {
259262
this.sql`delete from cf_ai_http_chat_streams`;
260263
this.sql`delete from cf_ai_http_chat_chunks`;
261-
this.sql`delete from cf_ai_http_chat_assistant_messages`;
262264
this._activeStreams.clear();
263265
}
264266

@@ -282,14 +284,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
282284
where completed = 1 and updated_at < ${cutoffTime}
283285
)
284286
`;
285-
286-
this.sql`
287-
delete from cf_ai_http_chat_assistant_messages
288-
where stream_id in (
289-
select stream_id from cf_ai_http_chat_streams
290-
where completed = 1 and updated_at < ${cutoffTime}
291-
)
292-
`;
293287
}
294288

295289
/**
@@ -310,7 +304,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
310304
);
311305

312306
const response = await onChatMessage(
313-
async (_finishResult) => {
307+
async () => {
314308
// Mark stream as completed
315309
console.log(`[ResumableStreamManager] Stream ${streamId} finished`);
316310
this._markStreamCompleted(streamId);
@@ -422,36 +416,36 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
422416
}
423417

424418
// Broadcast to all active readers (writers)
425-
for (const writer of streamState.readers) {
419+
for (const readerOrWriter of streamState.readers) {
426420
try {
427-
if (writer instanceof WritableStreamDefaultWriter) {
428-
writer.write(value);
421+
if (readerOrWriter instanceof WritableStreamDefaultWriter) {
422+
readerOrWriter.write(value);
429423
} else {
430-
// Legacy support for ReadableStreamDefaultController
431-
(writer as ReadableStreamDefaultController).enqueue(value);
424+
// Handle ReadableStreamDefaultController
425+
if (
426+
"enqueue" in readerOrWriter &&
427+
typeof readerOrWriter.enqueue === "function"
428+
) {
429+
readerOrWriter.enqueue(value);
430+
}
432431
}
433432
} catch {
434433
// Reader might be closed
435-
streamState.readers.delete(writer);
434+
streamState.readers.delete(readerOrWriter);
436435
}
437436
}
438437
}
439438

440439
// Save assistant message if we collected any text
441440
if (assistantMessageText) {
442-
const assistantMessage: Message = {
441+
// Create assistant message with proper typing
442+
const assistantMessage = {
443443
id: assistantMessageId,
444-
role: "assistant",
445-
parts: [{ type: "text", text: assistantMessageText }]
446-
} as unknown as Message;
444+
role: "assistant" as const,
445+
parts: [{ type: "text" as const, text: assistantMessageText }]
446+
} as Message;
447447

448448
await persistMessages([...messages, assistantMessage]);
449-
450-
// Store accumulated assistant message for quick resume
451-
this.sql`
452-
insert into cf_ai_http_chat_assistant_messages (stream_id, content, message_id)
453-
values (${streamId}, ${assistantMessageText}, ${assistantMessageId})
454-
`;
455449
}
456450
} finally {
457451
// Mark stream as completed
@@ -470,7 +464,13 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
470464
if (readerOrWriter instanceof WritableStreamDefaultWriter) {
471465
readerOrWriter.close();
472466
} else {
473-
(readerOrWriter as ReadableStreamDefaultController).close();
467+
// Handle ReadableStreamDefaultController
468+
if (
469+
"close" in readerOrWriter &&
470+
typeof readerOrWriter.close === "function"
471+
) {
472+
readerOrWriter.close();
473+
}
474474
}
475475
} catch {}
476476
}
@@ -490,11 +490,10 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
490490
`[ResumableStreamManager] Creating client stream for ${streamId}`
491491
);
492492

493-
// Load from database (single source of truth)
494493
const dbState = this.sql`
495494
select * from cf_ai_http_chat_streams
496495
where stream_id = ${streamId}
497-
`[0] as { seq: number; fetching: number; completed: number } | undefined;
496+
`[0] as unknown as StreamStateRow | undefined;
498497

499498
console.log(`[ResumableStreamManager] DB state for ${streamId}:`, dbState);
500499

@@ -503,7 +502,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
503502
return new Response("Stream not found", { status: 404 });
504503
}
505504

506-
// Get or create in-memory state for active readers tracking
507505
let streamState = this._activeStreams.get(streamId);
508506
if (!streamState) {
509507
console.log(
@@ -545,18 +543,18 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
545543
select seq, chunk from cf_ai_http_chat_chunks
546544
where stream_id = ${streamId}
547545
order by seq asc
548-
`;
546+
` as unknown as Pick<ChunkRow, "seq" | "chunk">[];
549547

550548
for (const row of chunks) {
551549
// Decode base64 back to Uint8Array
552-
const chunkBase64 = row.chunk as string;
550+
const chunkBase64 = row.chunk;
553551
const binaryString = atob(chunkBase64);
554552
const bytes = new Uint8Array(binaryString.length);
555553
for (let i = 0; i < binaryString.length; i++) {
556554
bytes[i] = binaryString.charCodeAt(i);
557555
}
558556
await writer.write(bytes);
559-
lastSeenSeq = row.seq as number;
557+
lastSeenSeq = row.seq;
560558
}
561559
});
562560

@@ -602,18 +600,8 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
602600
// Use base64 encoding to avoid header encoding issues
603601
headers["X-Messages"] = encodeURIComponent(JSON.stringify(messages));
604602

605-
// Include accumulated assistant message if exists
606-
const assistantMsg = this.sql`
607-
select content, message_id from cf_ai_http_chat_assistant_messages
608-
where stream_id = ${streamId}
609-
`[0] as { content: string; message_id: string } | undefined;
610-
611-
if (assistantMsg) {
612-
headers["X-Assistant-Content"] = encodeURIComponent(
613-
assistantMsg.content
614-
);
615-
headers["X-Assistant-Id"] = assistantMsg.message_id;
616-
}
603+
// Note: Assistant message content is delivered through the stream itself
604+
// No need to duplicate it in headers since it's already available via persistMessages()
617605
} catch (e) {
618606
console.error("Failed to add messages to header:", e);
619607
}
@@ -639,18 +627,18 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
639627
select seq, chunk from cf_ai_http_chat_chunks
640628
where stream_id = ${streamId} and seq >= ${cursor} and seq < ${streamState.seq}
641629
order by seq asc
642-
`;
630+
` as unknown as Pick<ChunkRow, "seq" | "chunk">[];
643631

644632
for (const row of gaps) {
645633
try {
646-
const chunkBase64 = row.chunk as string;
634+
const chunkBase64 = row.chunk;
647635
const binaryString = atob(chunkBase64);
648636
const bytes = new Uint8Array(binaryString.length);
649637
for (let i = 0; i < binaryString.length; i++) {
650638
bytes[i] = binaryString.charCodeAt(i);
651639
}
652640
await writer.write(bytes);
653-
cursor = (row.seq as number) + 1;
641+
cursor = row.seq + 1;
654642
} catch {
655643
// Writer closed
656644
return;
@@ -678,7 +666,13 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
678666
if (readerOrWriter instanceof WritableStreamDefaultWriter) {
679667
readerOrWriter.close();
680668
} else {
681-
(readerOrWriter as ReadableStreamDefaultController).close();
669+
// Handle ReadableStreamDefaultController
670+
if (
671+
"close" in readerOrWriter &&
672+
typeof readerOrWriter.close === "function"
673+
) {
674+
readerOrWriter.close();
675+
}
682676
}
683677
} catch {}
684678
}

0 commit comments

Comments
 (0)