Skip to content

Commit cbca760

Browse files
committed
fix: stream fix again
1 parent 6347db1 commit cbca760

File tree

3 files changed

+180
-28
lines changed

3 files changed

+180
-28
lines changed

frontend/src/app/(dashboard)/projects/[projectId]/thread/[threadId]/page.tsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ export default function ThreadPage({
137137
const agent = threadAgentData?.agent;
138138
const workflowId = threadQuery.data?.metadata?.workflow_id;
139139

140+
// Invalidate cache when thread changes to get fresh data
141+
useEffect(() => {
142+
queryClient.invalidateQueries({ queryKey: threadKeys.agentRuns(threadId) });
143+
queryClient.invalidateQueries({ queryKey: threadKeys.messages(threadId) });
144+
}, [threadId, queryClient]);
145+
140146
// Set initial selected agent from thread data
141147
useEffect(() => {
142148
if (threadAgentData?.agent && !selectedAgentId) {

frontend/src/app/(dashboard)/projects/[projectId]/thread/_hooks/useThreadData.ts

Lines changed: 94 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,44 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
4040
const messagesLoadedRef = useRef(false);
4141
const agentRunsCheckedRef = useRef(false);
4242
const hasInitiallyScrolled = useRef<boolean>(false);
43+
44+
// Store messages by thread ID to preserve across navigation
45+
const threadMessagesRef = useRef<Map<string, UnifiedMessage[]>>(new Map());
46+
// Track the last active thread id to correctly cache on navigation
47+
const lastThreadIdRef = useRef<string>(threadId);
4348

4449
const threadQuery = useThreadQuery(threadId);
4550
const messagesQuery = useMessagesQuery(threadId);
4651
const projectQuery = useProjectQuery(projectId);
4752
const agentRunsQuery = useAgentRunsQuery(threadId);
48-
53+
54+
// (debug logs removed)
4955

5056
useEffect(() => {
5157
let isMounted = true;
5258

59+
// Store current messages before switching threads using the actual last thread id
60+
const previousThreadId = lastThreadIdRef.current;
61+
if (previousThreadId && previousThreadId !== threadId && messages.length > 0) {
62+
threadMessagesRef.current.set(previousThreadId, [...messages]);
63+
}
64+
5365
// Reset refs when thread changes
66+
// (debug logs removed)
5467
agentRunsCheckedRef.current = false;
5568
messagesLoadedRef.current = false;
5669
initialLoadCompleted.current = false;
70+
71+
// Restore cached messages for this thread if available
72+
const cachedMessages = threadMessagesRef.current.get(threadId);
73+
if (cachedMessages && cachedMessages.length > 0) {
74+
setMessages(cachedMessages);
75+
} else {
76+
setMessages([]);
77+
}
78+
79+
// Update last thread id tracker to the new thread
80+
lastThreadIdRef.current = threadId;
5781

5882
async function initializeData() {
5983
if (!initialLoadCompleted.current) setIsLoading(true);
@@ -78,7 +102,7 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
78102
}
79103

80104
if (messagesQuery.data && !messagesLoadedRef.current) {
81-
105+
// (debug logs removed)
82106

83107
const unifiedMessages = (messagesQuery.data || [])
84108
.filter((msg) => msg.type !== 'status')
@@ -91,9 +115,30 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
91115
metadata: msg.metadata || '{}',
92116
created_at: msg.created_at || new Date().toISOString(),
93117
updated_at: msg.updated_at || new Date().toISOString(),
118+
agent_id: (msg as any).agent_id,
119+
agents: (msg as any).agents,
94120
}));
95121

96-
setMessages(unifiedMessages);
122+
// Merge with any local messages that are not present in server data yet
123+
const serverIds = new Set(
124+
unifiedMessages.map((m) => m.message_id).filter(Boolean) as string[],
125+
);
126+
const localExtras = (messages || []).filter(
127+
(m) =>
128+
!m.message_id ||
129+
(typeof m.message_id === 'string' && m.message_id.startsWith('temp-')) ||
130+
!serverIds.has(m.message_id as string),
131+
);
132+
const mergedMessages = [...unifiedMessages, ...localExtras].sort((a, b) => {
133+
const aTime = a.created_at ? new Date(a.created_at).getTime() : 0;
134+
const bTime = b.created_at ? new Date(b.created_at).getTime() : 0;
135+
return aTime - bTime;
136+
});
137+
138+
setMessages(mergedMessages);
139+
// Update cache for this thread
140+
threadMessagesRef.current.set(threadId, [...mergedMessages]);
141+
// (debug logs removed)
97142
messagesLoadedRef.current = true;
98143

99144
if (!hasInitiallyScrolled.current) {
@@ -102,7 +147,7 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
102147
}
103148

104149
if (agentRunsQuery.data && !agentRunsCheckedRef.current && isMounted) {
105-
150+
// (debug logs removed)
106151

107152
agentRunsCheckedRef.current = true;
108153

@@ -169,12 +214,17 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
169214
agentRunsQuery.data
170215
]);
171216

172-
// Disabled automatic message replacement to prevent optimistic message deletion
173-
// Messages are now only loaded on initial page load and updated via streaming
217+
// Force message reload when thread changes or new data arrives
174218
useEffect(() => {
175-
if (messagesQuery.data && messagesQuery.status === 'success') {
176-
// Only load messages on initial load, not when agent status changes
177-
if (!isLoading && messages.length === 0) {
219+
if (messagesQuery.data && messagesQuery.status === 'success' && !isLoading) {
220+
// (debug logs removed)
221+
222+
// Always reload messages when thread data changes or we have more raw messages than processed
223+
const shouldReload = messages.length === 0 || messagesQuery.data.length > messages.length + 50; // Allow for status messages
224+
225+
if (shouldReload) {
226+
// (debug logs removed)
227+
178228
const unifiedMessages = (messagesQuery.data || [])
179229
.filter((msg) => msg.type !== 'status')
180230
.map((msg: ApiMessageType) => ({
@@ -190,14 +240,47 @@ export function useThreadData(threadId: string, projectId: string): UseThreadDat
190240
agents: (msg as any).agents,
191241
}));
192242

193-
setMessages(unifiedMessages);
243+
// Merge strategy: preserve any local (optimistic/streamed) messages not in server yet
244+
setMessages((prev) => {
245+
const serverIds = new Set(
246+
unifiedMessages.map((m) => m.message_id).filter(Boolean) as string[],
247+
);
248+
const localExtras = (prev || []).filter(
249+
(m) =>
250+
!m.message_id ||
251+
(typeof m.message_id === 'string' && m.message_id.startsWith('temp-')) ||
252+
!serverIds.has(m.message_id as string),
253+
);
254+
const merged = [...unifiedMessages, ...localExtras].sort((a, b) => {
255+
const aTime = a.created_at ? new Date(a.created_at).getTime() : 0;
256+
const bTime = b.created_at ? new Date(b.created_at).getTime() : 0;
257+
return aTime - bTime;
258+
});
259+
260+
// Update cache for this thread
261+
threadMessagesRef.current.set(threadId, [...merged]);
262+
// (debug logs removed)
263+
return merged;
264+
});
265+
} else {
266+
// (debug logs removed)
194267
}
195268
}
196269
}, [messagesQuery.data, messagesQuery.status, isLoading, messages.length, threadId]);
197270

271+
// Wrap setMessages to also update the cache
272+
const setMessagesWithCache = (messagesOrUpdater: UnifiedMessage[] | ((prev: UnifiedMessage[]) => UnifiedMessage[])) => {
273+
setMessages((prev) => {
274+
const newMessages = typeof messagesOrUpdater === 'function' ? messagesOrUpdater(prev) : messagesOrUpdater;
275+
// Update cache whenever messages change
276+
threadMessagesRef.current.set(threadId, [...newMessages]);
277+
return newMessages;
278+
});
279+
};
280+
198281
return {
199282
messages,
200-
setMessages,
283+
setMessages: setMessagesWithCache,
201284
project,
202285
sandboxId,
203286
projectName,

frontend/src/hooks/useAgentStream.ts

Lines changed: 80 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,77 @@ export function useAgentStream(
9898
.reduce((acc, curr) => acc + curr.content, '');
9999
}, [textContent]);
100100

101+
// Refs to capture current state for persistence
102+
const statusRef = useRef(status);
103+
const agentRunIdRef = useRef(agentRunId);
104+
const textContentRef = useRef(textContent);
105+
106+
// Update refs whenever state changes
107+
useEffect(() => {
108+
statusRef.current = status;
109+
}, [status]);
110+
111+
useEffect(() => {
112+
agentRunIdRef.current = agentRunId;
113+
}, [agentRunId]);
114+
115+
useEffect(() => {
116+
textContentRef.current = textContent;
117+
}, [textContent]);
118+
101119
// Update refs if threadId or setMessages changes
102120
useEffect(() => {
121+
const previousThreadId = threadIdRef.current;
122+
123+
// If we're switching threads and have an active stream, persist it and clean up
124+
if (previousThreadId && previousThreadId !== threadId && streamCleanupRef.current) {
125+
// Use refs to get current state values
126+
const currentState = {
127+
status: statusRef.current,
128+
agentRunId: agentRunIdRef.current,
129+
textContent: textContentRef.current,
130+
timestamp: Date.now()
131+
};
132+
sessionStorage.setItem(`stream_state_${previousThreadId}`, JSON.stringify(currentState));
133+
134+
// Clean up current stream
135+
streamCleanupRef.current();
136+
streamCleanupRef.current = null;
137+
setStatus('idle');
138+
setTextContent([]);
139+
setToolCall(null);
140+
setAgentRunId(null);
141+
currentRunIdRef.current = null;
142+
}
143+
103144
threadIdRef.current = threadId;
145+
146+
// Check if we have persisted stream state for this thread
147+
const persistedKey = `stream_state_${threadId}`;
148+
const persistedState = sessionStorage.getItem(persistedKey);
149+
150+
if (persistedState && previousThreadId !== threadId) {
151+
try {
152+
const parsed = JSON.parse(persistedState);
153+
const stateAge = Date.now() - (parsed.timestamp || 0);
154+
155+
// Only restore if state is recent (< 5 minutes) and was streaming
156+
if (parsed.status === 'streaming' && parsed.agentRunId && stateAge < 5 * 60 * 1000) {
157+
setStatus('streaming');
158+
setAgentRunId(parsed.agentRunId);
159+
currentRunIdRef.current = parsed.agentRunId;
160+
setTextContent(parsed.textContent || []);
161+
162+
// Clear the persisted state since we've restored it
163+
sessionStorage.removeItem(persistedKey);
164+
} else {
165+
sessionStorage.removeItem(persistedKey);
166+
}
167+
} catch (e) {
168+
console.warn('Failed to parse persisted stream state:', e);
169+
sessionStorage.removeItem(persistedKey);
170+
}
171+
}
104172
}, [threadId]);
105173

106174
useEffect(() => {
@@ -171,6 +239,10 @@ export function useAgentStream(
171239
updateStatus(finalStatus);
172240
setAgentRunId(null);
173241
currentRunIdRef.current = null;
242+
243+
// Clear persisted state when stream completes
244+
const persistedKey = `stream_state_${currentThreadId}`;
245+
sessionStorage.removeItem(persistedKey);
174246

175247
// Message refetch disabled - optimistic messages will handle updates
176248

@@ -470,23 +542,14 @@ export function useAgentStream(
470542
useEffect(() => {
471543
isMountedRef.current = true;
472544

473-
// Cleanup function for when the component unmounts or agentRunId changes
545+
// Cleanup function - be more conservative about stream cleanup
474546
return () => {
475547
isMountedRef.current = false;
476-
console.log(
477-
'[useAgentStream] Unmounting or agentRunId changing. Cleaning up stream.',
478-
);
479-
if (streamCleanupRef.current) {
480-
streamCleanupRef.current();
481-
streamCleanupRef.current = null;
482-
}
483-
// Reset state on unmount if needed, though finalizeStream should handle most cases
484-
setStatus('idle');
485-
setTextContent([]);
486-
setToolCall(null);
487-
setError(null);
488-
setAgentRunId(null);
489-
currentRunIdRef.current = null;
548+
console.log('[useAgentStream] Component unmounting/navigation.');
549+
550+
// Don't automatically cleanup streams on navigation
551+
// Only set mounted flag to false to prevent new operations
552+
// Streams will be cleaned up when they naturally complete or on explicit stop
490553
};
491554
}, []); // Empty dependency array for mount/unmount effect
492555

@@ -496,13 +559,13 @@ export function useAgentStream(
496559
async (runId: string) => {
497560
if (!isMountedRef.current) return;
498561
console.log(
499-
`[useAgentStream] Received request to start streaming for ${runId}`,
562+
`[STREAM_FIX] Thread ${threadId} received request to start streaming for ${runId}`,
500563
);
501564

502565
// Clean up any previous stream
503566
if (streamCleanupRef.current) {
504567
console.log(
505-
'[useAgentStream] Cleaning up existing stream before starting new one.',
568+
`[STREAM_FIX] Thread ${threadId} cleaning up existing stream before starting new one.`,
506569
);
507570
streamCleanupRef.current();
508571
streamCleanupRef.current = null;

0 commit comments

Comments
 (0)