@@ -116,8 +116,19 @@ export function useAgentStream(
116
116
textContentRef . current = textContent ;
117
117
} , [ textContent ] ) ;
118
118
119
- // Update refs if threadId changes (no persistence across navigation)
119
+ // On thread change, ensure any existing stream is cleaned up to avoid stale subscriptions
120
120
useEffect ( ( ) => {
121
+ const previousThreadId = threadIdRef . current ;
122
+ if ( previousThreadId && previousThreadId !== threadId && streamCleanupRef . current ) {
123
+ // Close the existing stream for the previous thread
124
+ streamCleanupRef . current ( ) ;
125
+ streamCleanupRef . current = null ;
126
+ setStatus ( 'idle' ) ;
127
+ setTextContent ( [ ] ) ;
128
+ setToolCall ( null ) ;
129
+ setAgentRunId ( null ) ;
130
+ currentRunIdRef . current = null ;
131
+ }
121
132
threadIdRef . current = threadId ;
122
133
} , [ threadId ] ) ;
123
134
@@ -547,12 +558,38 @@ export function useAgentStream(
547
558
`[useAgentStream] Agent run ${ runId } confirmed running. Setting up EventSource.` ,
548
559
) ;
549
560
const cleanup = streamAgent ( runId , {
550
- onMessage : handleStreamMessage ,
551
- onError : handleStreamError ,
552
- onClose : handleStreamClose ,
561
+ onMessage : ( data ) => {
562
+ // Ignore messages if threadId changed while the EventSource stayed open
563
+ if ( threadIdRef . current !== threadId ) return ;
564
+ handleStreamMessage ( data ) ;
565
+ } ,
566
+ onError : ( err ) => {
567
+ if ( threadIdRef . current !== threadId ) return ;
568
+ handleStreamError ( err ) ;
569
+ } ,
570
+ onClose : ( ) => {
571
+ if ( threadIdRef . current !== threadId ) return ;
572
+ handleStreamClose ( ) ;
573
+ } ,
553
574
} ) ;
554
575
streamCleanupRef . current = cleanup ;
555
576
// Status will be updated to 'streaming' by the first message received in handleStreamMessage
577
+ // If for some reason no message arrives shortly, verify liveness again to avoid zombie state
578
+ setTimeout ( async ( ) => {
579
+ if ( ! isMountedRef . current ) return ;
580
+ if ( currentRunIdRef . current !== runId ) return ; // Another run started
581
+ if ( statusRef . current === 'streaming' ) return ; // Already streaming
582
+ try {
583
+ const latest = await getAgentStatus ( runId ) ;
584
+ if ( ! isMountedRef . current ) return ;
585
+ if ( currentRunIdRef . current !== runId ) return ;
586
+ if ( latest . status !== 'running' ) {
587
+ finalizeStream ( mapAgentStatus ( latest . status ) || 'agent_not_running' , runId ) ;
588
+ }
589
+ } catch {
590
+ // ignore
591
+ }
592
+ } , 1500 ) ;
556
593
} catch ( err ) {
557
594
if ( ! isMountedRef . current ) return ; // Check mount status after async call
558
595
0 commit comments