@@ -5,17 +5,38 @@ import type {
55} from "ai" ;
66import type { AgentContext } from "./" ;
77
8- // Type for SQL query function to avoid using 'any'
98type SqlQueryFunction = < T = Record < string , string | number | boolean | null > > (
109 strings : TemplateStringsArray ,
1110 ...values : ( string | number | boolean | null ) [ ]
1211) => T [ ] ;
1312
13+ interface StreamStateRow {
14+ stream_id : string ;
15+ seq : number ;
16+ fetching : number ;
17+ completed : number ;
18+ created_at ?: string ;
19+ updated_at ?: string ;
20+ headers ?: string ;
21+ }
22+
23+ interface ChunkRow {
24+ stream_id : string ;
25+ seq : number ;
26+ chunk : string ;
27+ created_at ?: string ;
28+ }
29+
30+ interface StreamStatusRow {
31+ content : string ;
32+ position : number ;
33+ completed : number ;
34+ created_at : string ;
35+ updated_at : string ;
36+ }
37+
1438const decoder = new TextDecoder ( ) ;
1539
16- /**
17- * Manages resumable streaming functionality with persistence and resumption capabilities
18- */
1940export class ResumableStreamManager < Message extends ChatMessage = ChatMessage > {
2041 /** Map of stream IDs to their current state for resumable streams */
2142 private _activeStreams : Map <
@@ -45,7 +66,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
4566 * Initialize database tables for resumable streaming
4667 */
4768 private _initializeTables ( ) : void {
48- // Initialize stream state table for resumable streams
4969 this . sql `create table if not exists cf_ai_http_chat_streams (
5070 stream_id text primary key,
5171 seq integer not null default 0,
@@ -63,15 +83,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
6383 created_at datetime default current_timestamp,
6484 primary key (stream_id, seq)
6585 )` ;
66-
67- // Initialize assistant messages table for accumulated text
68- this . sql `create table if not exists cf_ai_http_chat_assistant_messages (
69- stream_id text primary key,
70- content text not null,
71- message_id text not null,
72- created_at datetime default current_timestamp,
73- updated_at datetime default current_timestamp
74- )` ;
7586 }
7687
7788 /**
@@ -96,7 +107,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
96107 const dbState = this . sql `
97108 select * from cf_ai_http_chat_streams
98109 where stream_id = ${ actualStreamId }
99- ` [ 0 ] as { seq : number ; fetching : number ; completed : number } | undefined ;
110+ ` [ 0 ] as unknown as StreamStateRow | undefined ;
100111
101112 if ( dbState ) {
102113 console . log (
@@ -183,7 +194,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
183194 const streamState = this . sql `
184195 select * from cf_ai_http_chat_streams
185196 where stream_id = ${ streamId }
186- ` [ 0 ] as { seq : number ; fetching : number ; completed : number } | undefined ;
197+ ` [ 0 ] as unknown as StreamStateRow | undefined ;
187198
188199 if ( ! streamState ) {
189200 return new Response ( JSON . stringify ( { error : "Stream not found" } ) , {
@@ -222,15 +233,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
222233 const streamState = this . sql `
223234 select * from cf_ai_http_chat_streams
224235 where stream_id = ${ streamId }
225- ` [ 0 ] as
226- | {
227- content : string ;
228- position : number ;
229- completed : number ;
230- created_at : string ;
231- updated_at : string ;
232- }
233- | undefined ;
236+ ` [ 0 ] as unknown as StreamStatusRow | undefined ;
234237
235238 if ( ! streamState ) {
236239 return new Response ( JSON . stringify ( { error : "Stream not found" } ) , {
@@ -258,7 +261,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
258261 async clearStreams ( ) : Promise < void > {
259262 this . sql `delete from cf_ai_http_chat_streams` ;
260263 this . sql `delete from cf_ai_http_chat_chunks` ;
261- this . sql `delete from cf_ai_http_chat_assistant_messages` ;
262264 this . _activeStreams . clear ( ) ;
263265 }
264266
@@ -282,14 +284,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
282284 where completed = 1 and updated_at < ${ cutoffTime }
283285 )
284286 ` ;
285-
286- this . sql `
287- delete from cf_ai_http_chat_assistant_messages
288- where stream_id in (
289- select stream_id from cf_ai_http_chat_streams
290- where completed = 1 and updated_at < ${ cutoffTime }
291- )
292- ` ;
293287 }
294288
295289 /**
@@ -310,7 +304,7 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
310304 ) ;
311305
312306 const response = await onChatMessage (
313- async ( _finishResult ) => {
307+ async ( ) => {
314308 // Mark stream as completed
315309 console . log ( `[ResumableStreamManager] Stream ${ streamId } finished` ) ;
316310 this . _markStreamCompleted ( streamId ) ;
@@ -422,36 +416,36 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
422416 }
423417
424418 // Broadcast to all active readers (writers)
425- for ( const writer of streamState . readers ) {
419+ for ( const readerOrWriter of streamState . readers ) {
426420 try {
427- if ( writer instanceof WritableStreamDefaultWriter ) {
428- writer . write ( value ) ;
421+ if ( readerOrWriter instanceof WritableStreamDefaultWriter ) {
422+ readerOrWriter . write ( value ) ;
429423 } else {
430- // Legacy support for ReadableStreamDefaultController
431- ( writer as ReadableStreamDefaultController ) . enqueue ( value ) ;
424+ // Handle ReadableStreamDefaultController
425+ if (
426+ "enqueue" in readerOrWriter &&
427+ typeof readerOrWriter . enqueue === "function"
428+ ) {
429+ readerOrWriter . enqueue ( value ) ;
430+ }
432431 }
433432 } catch {
434433 // Reader might be closed
435- streamState . readers . delete ( writer ) ;
434+ streamState . readers . delete ( readerOrWriter ) ;
436435 }
437436 }
438437 }
439438
440439 // Save assistant message if we collected any text
441440 if ( assistantMessageText ) {
442- const assistantMessage : Message = {
441+ // Create assistant message with proper typing
442+ const assistantMessage = {
443443 id : assistantMessageId ,
444- role : "assistant" ,
445- parts : [ { type : "text" , text : assistantMessageText } ]
446- } as unknown as Message ;
444+ role : "assistant" as const ,
445+ parts : [ { type : "text" as const , text : assistantMessageText } ]
446+ } as Message ;
447447
448448 await persistMessages ( [ ...messages , assistantMessage ] ) ;
449-
450- // Store accumulated assistant message for quick resume
451- this . sql `
452- insert into cf_ai_http_chat_assistant_messages (stream_id, content, message_id)
453- values (${ streamId } , ${ assistantMessageText } , ${ assistantMessageId } )
454- ` ;
455449 }
456450 } finally {
457451 // Mark stream as completed
@@ -470,7 +464,13 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
470464 if ( readerOrWriter instanceof WritableStreamDefaultWriter ) {
471465 readerOrWriter . close ( ) ;
472466 } else {
473- ( readerOrWriter as ReadableStreamDefaultController ) . close ( ) ;
467+ // Handle ReadableStreamDefaultController
468+ if (
469+ "close" in readerOrWriter &&
470+ typeof readerOrWriter . close === "function"
471+ ) {
472+ readerOrWriter . close ( ) ;
473+ }
474474 }
475475 } catch { }
476476 }
@@ -490,11 +490,10 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
490490 `[ResumableStreamManager] Creating client stream for ${ streamId } `
491491 ) ;
492492
493- // Load from database (single source of truth)
494493 const dbState = this . sql `
495494 select * from cf_ai_http_chat_streams
496495 where stream_id = ${ streamId }
497- ` [ 0 ] as { seq : number ; fetching : number ; completed : number } | undefined ;
496+ ` [ 0 ] as unknown as StreamStateRow | undefined ;
498497
499498 console . log ( `[ResumableStreamManager] DB state for ${ streamId } :` , dbState ) ;
500499
@@ -503,7 +502,6 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
503502 return new Response ( "Stream not found" , { status : 404 } ) ;
504503 }
505504
506- // Get or create in-memory state for active readers tracking
507505 let streamState = this . _activeStreams . get ( streamId ) ;
508506 if ( ! streamState ) {
509507 console . log (
@@ -545,18 +543,18 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
545543 select seq, chunk from cf_ai_http_chat_chunks
546544 where stream_id = ${ streamId }
547545 order by seq asc
548- ` ;
546+ ` as unknown as Pick < ChunkRow , "seq" | "chunk" > [ ] ;
549547
550548 for ( const row of chunks ) {
551549 // Decode base64 back to Uint8Array
552- const chunkBase64 = row . chunk as string ;
550+ const chunkBase64 = row . chunk ;
553551 const binaryString = atob ( chunkBase64 ) ;
554552 const bytes = new Uint8Array ( binaryString . length ) ;
555553 for ( let i = 0 ; i < binaryString . length ; i ++ ) {
556554 bytes [ i ] = binaryString . charCodeAt ( i ) ;
557555 }
558556 await writer . write ( bytes ) ;
559- lastSeenSeq = row . seq as number ;
557+ lastSeenSeq = row . seq ;
560558 }
561559 } ) ;
562560
@@ -602,18 +600,8 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
602600 // Use base64 encoding to avoid header encoding issues
603601 headers [ "X-Messages" ] = encodeURIComponent ( JSON . stringify ( messages ) ) ;
604602
605- // Include accumulated assistant message if exists
606- const assistantMsg = this . sql `
607- select content, message_id from cf_ai_http_chat_assistant_messages
608- where stream_id = ${ streamId }
609- ` [ 0 ] as { content : string ; message_id : string } | undefined ;
610-
611- if ( assistantMsg ) {
612- headers [ "X-Assistant-Content" ] = encodeURIComponent (
613- assistantMsg . content
614- ) ;
615- headers [ "X-Assistant-Id" ] = assistantMsg . message_id ;
616- }
603+ // Note: Assistant message content is delivered through the stream itself
604+ // No need to duplicate it in headers since it's already available via persistMessages()
617605 } catch ( e ) {
618606 console . error ( "Failed to add messages to header:" , e ) ;
619607 }
@@ -639,18 +627,18 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
639627 select seq, chunk from cf_ai_http_chat_chunks
640628 where stream_id = ${ streamId } and seq >= ${ cursor } and seq < ${ streamState . seq }
641629 order by seq asc
642- ` ;
630+ ` as unknown as Pick < ChunkRow , "seq" | "chunk" > [ ] ;
643631
644632 for ( const row of gaps ) {
645633 try {
646- const chunkBase64 = row . chunk as string ;
634+ const chunkBase64 = row . chunk ;
647635 const binaryString = atob ( chunkBase64 ) ;
648636 const bytes = new Uint8Array ( binaryString . length ) ;
649637 for ( let i = 0 ; i < binaryString . length ; i ++ ) {
650638 bytes [ i ] = binaryString . charCodeAt ( i ) ;
651639 }
652640 await writer . write ( bytes ) ;
653- cursor = ( row . seq as number ) + 1 ;
641+ cursor = row . seq + 1 ;
654642 } catch {
655643 // Writer closed
656644 return ;
@@ -678,7 +666,13 @@ export class ResumableStreamManager<Message extends ChatMessage = ChatMessage> {
678666 if ( readerOrWriter instanceof WritableStreamDefaultWriter ) {
679667 readerOrWriter . close ( ) ;
680668 } else {
681- ( readerOrWriter as ReadableStreamDefaultController ) . close ( ) ;
669+ // Handle ReadableStreamDefaultController
670+ if (
671+ "close" in readerOrWriter &&
672+ typeof readerOrWriter . close === "function"
673+ ) {
674+ readerOrWriter . close ( ) ;
675+ }
682676 }
683677 } catch { }
684678 }
0 commit comments