Skip to content

Commit 4bb4595

Browse files
authored
Merge pull request #1260 from kubet/fix/stream-restore-on-thread-navigation
Fix/stream restore on thread navigation
2 parents a9087fd + 42c656d commit 4bb4595

File tree

3 files changed

+94
-29
lines changed

3 files changed

+94
-29
lines changed

frontend/src/app/(dashboard)/projects/[projectId]/thread/[threadId]/page.tsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ export default function ThreadPage({
137137
const agent = threadAgentData?.agent;
138138
const workflowId = threadQuery.data?.metadata?.workflow_id;
139139

140+
// Invalidate cache when thread changes to get fresh data
141+
useEffect(() => {
142+
queryClient.invalidateQueries({ queryKey: threadKeys.agentRuns(threadId) });
143+
queryClient.invalidateQueries({ queryKey: threadKeys.messages(threadId) });
144+
}, [threadId, queryClient]);
145+
140146
// Set initial selected agent from thread data
141147
useEffect(() => {
142148
if (threadAgentData?.agent && !selectedAgentId) {

frontend/src/app/(dashboard)/projects/[projectId]/thread/_hooks/useThreadData.ts

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
4040
const messagesLoadedRef = useRef(false);
4141
const agentRunsCheckedRef = useRef(false);
4242
const hasInitiallyScrolled = useRef<boolean>(false);
43+
4344

4445
const threadQuery = useThreadQuery(threadId);
4546
const messagesQuery = useMessagesQuery(threadId);
4647
const projectQuery = useProjectQuery(projectId);
4748
const agentRunsQuery = useAgentRunsQuery(threadId);
48-
49+
50+
// (debug logs removed)
4951

5052
useEffect(() => {
5153
let isMounted = true;
@@ -54,6 +56,9 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
5456
agentRunsCheckedRef.current = false;
5557
messagesLoadedRef.current = false;
5658
initialLoadCompleted.current = false;
59+
60+
// Clear messages on thread change; fresh data will set messages
61+
setMessages([]);
5762

5863
async function initializeData() {
5964
if (!initialLoadCompleted.current) setIsLoading(true);
@@ -78,7 +83,7 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
7883
}
7984

8085
if (messagesQuery.data && !messagesLoadedRef.current) {
81-
86+
// (debug logs removed)
8287

8388
const unifiedMessages = (messagesQuery.data || [])
8489
.filter((msg) => msg.type !== 'status')
@@ -91,9 +96,28 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
9196
metadata: msg.metadata || '{}',
9297
created_at: msg.created_at || new Date().toISOString(),
9398
updated_at: msg.updated_at || new Date().toISOString(),
99+
agent_id: (msg as any).agent_id,
100+
agents: (msg as any).agents,
94101
}));
95102

96-
setMessages(unifiedMessages);
103+
// Merge with any local messages that are not present in server data yet
104+
const serverIds = new Set(
105+
unifiedMessages.map((m) => m.message_id).filter(Boolean) as string[],
106+
);
107+
const localExtras = (messages || []).filter(
108+
(m) =>
109+
!m.message_id ||
110+
(typeof m.message_id === 'string' && m.message_id.startsWith('temp-')) ||
111+
!serverIds.has(m.message_id as string),
112+
);
113+
const mergedMessages = [...unifiedMessages, ...localExtras].sort((a, b) => {
114+
const aTime = a.created_at ? new Date(a.created_at).getTime() : 0;
115+
const bTime = b.created_at ? new Date(b.created_at).getTime() : 0;
116+
return aTime - bTime;
117+
});
118+
119+
setMessages(mergedMessages);
120+
// Messages set only from server merge; no cross-thread cache
97121
messagesLoadedRef.current = true;
98122

99123
if (!hasInitiallyScrolled.current) {
@@ -102,7 +126,7 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
102126
}
103127

104128
if (agentRunsQuery.data && !agentRunsCheckedRef.current && isMounted) {
105-
129+
// (debug logs removed)
106130

107131
agentRunsCheckedRef.current = true;
108132

@@ -169,12 +193,17 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
169193
agentRunsQuery.data
170194
]);
171195

172-
// Disabled automatic message replacement to prevent optimistic message deletion
173-
// Messages are now only loaded on initial page load and updated via streaming
196+
// Force message reload when thread changes or new data arrives
174197
useEffect(() => {
175-
if (messagesQuery.data && messagesQuery.status === 'success') {
176-
// Only load messages on initial load, not when agent status changes
177-
if (!isLoading && messages.length === 0) {
198+
if (messagesQuery.data && messagesQuery.status === 'success' && !isLoading) {
199+
// (debug logs removed)
200+
201+
// Always reload messages when thread data changes or we have more raw messages than processed
202+
const shouldReload = messages.length === 0 || messagesQuery.data.length > messages.length + 50; // Allow for status messages
203+
204+
if (shouldReload) {
205+
// (debug logs removed)
206+
178207
const unifiedMessages = (messagesQuery.data || [])
179208
.filter((msg) => msg.type !== 'status')
180209
.map((msg: ApiMessageType) => ({
@@ -190,7 +219,28 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
190219
agents: (msg as any).agents,
191220
}));
192221

193-
setMessages(unifiedMessages);
222+
// Merge strategy: preserve any local (optimistic/streamed) messages not in server yet
223+
setMessages((prev) => {
224+
const serverIds = new Set(
225+
unifiedMessages.map((m) => m.message_id).filter(Boolean) as string[],
226+
);
227+
const localExtras = (prev || []).filter(
228+
(m) =>
229+
!m.message_id ||
230+
(typeof m.message_id === 'string' && m.message_id.startsWith('temp-')) ||
231+
!serverIds.has(m.message_id as string),
232+
);
233+
const merged = [...unifiedMessages, ...localExtras].sort((a, b) => {
234+
const aTime = a.created_at ? new Date(a.created_at).getTime() : 0;
235+
const bTime = b.created_at ? new Date(b.created_at).getTime() : 0;
236+
return aTime - bTime;
237+
});
238+
239+
// Messages set only from server merge; no cross-thread cache
240+
return merged;
241+
});
242+
} else {
243+
// (debug logs removed)
194244
}
195245
}
196246
}, [messagesQuery.data, messagesQuery.status, isLoading, messages.length, threadId]);

frontend/src/hooks/useAgentStream.ts

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,25 @@ export function useAgentStream(
9898
.reduce((acc, curr) => acc + curr.content, '');
9999
}, [textContent]);
100100

101-
// Update refs if threadId or setMessages changes
101+
// Refs to capture current state for persistence
102+
const statusRef = useRef(status);
103+
const agentRunIdRef = useRef(agentRunId);
104+
const textContentRef = useRef(textContent);
105+
106+
// Update refs whenever state changes
107+
useEffect(() => {
108+
statusRef.current = status;
109+
}, [status]);
110+
111+
useEffect(() => {
112+
agentRunIdRef.current = agentRunId;
113+
}, [agentRunId]);
114+
115+
useEffect(() => {
116+
textContentRef.current = textContent;
117+
}, [textContent]);
118+
119+
// Update refs if threadId changes (no persistence across navigation)
102120
useEffect(() => {
103121
threadIdRef.current = threadId;
104122
}, [threadId]);
@@ -171,7 +189,7 @@ export function useAgentStream(
171189
updateStatus(finalStatus);
172190
setAgentRunId(null);
173191
currentRunIdRef.current = null;
174-
192+
175193
// Message refetch disabled - optimistic messages will handle updates
176194

177195
// If the run was stopped or completed, try to get final status to update nonRunning set (keep this)
@@ -470,23 +488,14 @@ export function useAgentStream(
470488
useEffect(() => {
471489
isMountedRef.current = true;
472490

473-
// Cleanup function for when the component unmounts or agentRunId changes
491+
// Cleanup function - be more conservative about stream cleanup
474492
return () => {
475493
isMountedRef.current = false;
476-
console.log(
477-
'[useAgentStream] Unmounting or agentRunId changing. Cleaning up stream.',
478-
);
479-
if (streamCleanupRef.current) {
480-
streamCleanupRef.current();
481-
streamCleanupRef.current = null;
482-
}
483-
// Reset state on unmount if needed, though finalizeStream should handle most cases
484-
setStatus('idle');
485-
setTextContent([]);
486-
setToolCall(null);
487-
setError(null);
488-
setAgentRunId(null);
489-
currentRunIdRef.current = null;
494+
console.log('[useAgentStream] Component unmounting/navigation.');
495+
496+
// Don't automatically cleanup streams on navigation
497+
// Only set mounted flag to false to prevent new operations
498+
// Streams will be cleaned up when they naturally complete or on explicit stop
490499
};
491500
}, []); // Empty dependency array for mount/unmount effect
492501

@@ -496,13 +505,13 @@ export function useAgentStream(
496505
async (runId: string) => {
497506
if (!isMountedRef.current) return;
498507
console.log(
499-
`[useAgentStream] Received request to start streaming for ${runId}`,
508+
`[STREAM_FIX] Thread ${threadId} received request to start streaming for ${runId}`,
500509
);
501510

502511
// Clean up any previous stream
503512
if (streamCleanupRef.current) {
504513
console.log(
505-
'[useAgentStream] Cleaning up existing stream before starting new one.',
514+
`[STREAM_FIX] Thread ${threadId} cleaning up existing stream before starting new one.`,
506515
);
507516
streamCleanupRef.current();
508517
streamCleanupRef.current = null;

0 commit comments

Comments
 (0)