Skip to content

Commit 49d9883

Browse files
committed
fix: correctly abort chat streaming
Improves the chat stream abort process by ensuring that partial responses are saved before the abort signal is sent. This avoids a race condition in which the onError callback could clear the streaming state before the partial response was saved. Additionally, the stream-reading loop and its callbacks now check the abort signal, preventing further processing after the stream has been aborted.
1 parent 55ecb9b commit 49d9883

File tree

2 files changed

+73
-17
lines changed

2 files changed

+73
-17
lines changed

tools/server/webui/src/lib/services/chat.ts

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,14 +194,16 @@ export class ChatService {
194194
}
195195

196196
if (stream) {
197-
return this.handleStreamResponse(
197+
await this.handleStreamResponse(
198198
response,
199199
onChunk,
200200
onComplete,
201201
onError,
202202
options.onReasoningChunk,
203-
conversationId
203+
conversationId,
204+
abortController.signal
204205
);
206+
return;
205207
} else {
206208
return this.handleNonStreamResponse(response, onComplete, onError);
207209
}
@@ -264,7 +266,8 @@ export class ChatService {
264266
) => void,
265267
onError?: (error: Error) => void,
266268
onReasoningChunk?: (chunk: string) => void,
267-
conversationId?: string
269+
conversationId?: string,
270+
abortSignal?: AbortSignal
268271
): Promise<void> {
269272
const reader = response.body?.getReader();
270273

@@ -282,14 +285,29 @@ export class ChatService {
282285
try {
283286
let chunk = '';
284287
while (true) {
288+
// Check if we've been aborted before reading more data
289+
if (abortSignal?.aborted) {
290+
break;
291+
}
292+
285293
const { done, value } = await reader.read();
286294
if (done) break;
287295

296+
// Check again after async read
297+
if (abortSignal?.aborted) {
298+
break;
299+
}
300+
288301
chunk += decoder.decode(value, { stream: true });
289302
const lines = chunk.split('\n');
290303
chunk = lines.pop() || ''; // Save incomplete line for next read
291304

292305
for (const line of lines) {
306+
// Check abort signal before processing each line
307+
if (abortSignal?.aborted) {
308+
break;
309+
}
310+
293311
if (line.startsWith('data: ')) {
294312
const data = line.slice(6);
295313
if (data === '[DONE]') {
@@ -317,19 +335,35 @@ export class ChatService {
317335
if (content) {
318336
hasReceivedData = true;
319337
aggregatedContent += content;
320-
onChunk?.(content);
338+
// Only call callback if not aborted
339+
if (!abortSignal?.aborted) {
340+
onChunk?.(content);
341+
}
321342
}
322343

323344
if (reasoningContent) {
324345
hasReceivedData = true;
325346
fullReasoningContent += reasoningContent;
326-
onReasoningChunk?.(reasoningContent);
347+
// Only call callback if not aborted
348+
if (!abortSignal?.aborted) {
349+
onReasoningChunk?.(reasoningContent);
350+
}
327351
}
328352
} catch (e) {
329353
console.error('Error parsing JSON chunk:', e);
330354
}
331355
}
332356
}
357+
358+
// Break outer loop if aborted during line processing
359+
if (abortSignal?.aborted) {
360+
break;
361+
}
362+
}
363+
364+
// Don't call onComplete if we've been aborted
365+
if (abortSignal?.aborted) {
366+
return;
333367
}
334368

335369
if (streamFinished) {

tools/server/webui/src/lib/stores/chat.svelte.ts

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -685,16 +685,24 @@ class ChatStore {
685685
* Stops the current message generation
686686
* Aborts ongoing requests and saves partial response if available
687687
*/
688-
stopGeneration(): void {
688+
async stopGeneration(): Promise<void> {
689+
if (!this.activeConversation) return;
690+
691+
const convId = this.activeConversation.id;
692+
693+
// Save partial response BEFORE aborting to avoid race condition
694+
// where onError callback clears the streaming state
695+
await this.savePartialResponseIfNeeded(convId);
696+
697+
// Stop streaming and abort the request
689698
slotsService.stopStreaming();
690-
chatService.abort();
691-
this.savePartialResponseIfNeeded();
699+
chatService.abort(convId);
692700

693-
// Clear all conversation loading and streaming states
694-
this.conversationLoadingStates.clear();
695-
this.conversationStreamingStates.clear();
696-
this.isLoading = false;
697-
this.currentResponse = '';
701+
// Clear loading and streaming states for the active conversation only
702+
// Note: onError callback will also clear these, but that's okay - idempotent operations
703+
this.setConversationLoading(convId, false);
704+
this.clearConversationStreaming(convId);
705+
slotsService.clearConversationState(convId);
698706
}
699707

700708
/**
@@ -718,12 +726,26 @@ class ChatStore {
718726
* Saves partial response if generation was interrupted
719727
* Preserves user's partial content and timing data when generation is stopped early
720728
*/
721-
private async savePartialResponseIfNeeded(): Promise<void> {
722-
if (!this.currentResponse.trim() || !this.activeMessages.length) {
729+
private async savePartialResponseIfNeeded(convId?: string): Promise<void> {
730+
// Use provided conversation ID or active conversation
731+
const conversationId = convId || this.activeConversation?.id;
732+
if (!conversationId) return;
733+
734+
// Get the streaming state for this conversation
735+
const streamingState = this.conversationStreamingStates.get(conversationId);
736+
if (!streamingState || !streamingState.response.trim()) {
723737
return;
724738
}
725739

726-
const lastMessage = this.activeMessages[this.activeMessages.length - 1];
740+
// Get messages for this conversation
741+
const messages =
742+
conversationId === this.activeConversation?.id
743+
? this.activeMessages
744+
: await DatabaseStore.getConversationMessages(conversationId);
745+
746+
if (!messages.length) return;
747+
748+
const lastMessage = messages[messages.length - 1];
727749

728750
if (lastMessage && lastMessage.role === 'assistant') {
729751
try {
@@ -732,7 +754,7 @@ class ChatStore {
732754
thinking?: string;
733755
timings?: ChatMessageTimings;
734756
} = {
735-
content: this.currentResponse
757+
content: streamingState.response
736758
};
737759

738760
if (lastMessage.thinking?.trim()) {

0 commit comments

Comments (0)