Skip to content

Commit 9c4d159

Browse files
authored
Fix(quality): Refine Stream Validation Logic (#9150)
1 parent fcffcfb commit 9c4d159

File tree

2 files changed

+192
-81
lines changed

2 files changed

+192
-81
lines changed

packages/core/src/core/geminiChat.test.ts

Lines changed: 160 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,12 @@ vi.mock('../fallback/handler.js', () => ({
6969
handleFallback: mockHandleFallback,
7070
}));
7171

72-
const { mockLogInvalidChunk, mockLogContentRetry, mockLogContentRetryFailure } =
73-
vi.hoisted(() => ({
74-
mockLogInvalidChunk: vi.fn(),
75-
mockLogContentRetry: vi.fn(),
76-
mockLogContentRetryFailure: vi.fn(),
77-
}));
72+
const { mockLogContentRetry, mockLogContentRetryFailure } = vi.hoisted(() => ({
73+
mockLogContentRetry: vi.fn(),
74+
mockLogContentRetryFailure: vi.fn(),
75+
}));
7876

7977
vi.mock('../telemetry/loggers.js', () => ({
80-
logInvalidChunk: mockLogInvalidChunk,
8178
logContentRetry: mockLogContentRetry,
8279
logContentRetryFailure: mockLogContentRetryFailure,
8380
}));
@@ -454,7 +451,7 @@ describe('GeminiChat', () => {
454451
'This is the visible text that should not be lost.',
455452
);
456453
});
457-
it('should add a placeholder model turn when a tool call is followed by an empty stream response', async () => {
454+
it('should throw an error when a tool call is followed by an empty stream response', async () => {
458455
// 1. Setup: A history where the model has just made a function call.
459456
const initialHistory: Content[] = [
460457
{
@@ -503,23 +500,164 @@ describe('GeminiChat', () => {
503500
},
504501
'prompt-id-stream-1',
505502
);
506-
for await (const _ of stream) {
507-
// This loop consumes the stream to trigger the internal logic.
508-
}
509503

510-
// 4. Assert: The history should now have four valid, alternating turns.
511-
const history = chat.getHistory();
512-
expect(history.length).toBe(4);
504+
// 4. Assert: The stream processing should throw an EmptyStreamError.
505+
await expect(
506+
(async () => {
507+
for await (const _ of stream) {
508+
// This loop consumes the stream to trigger the internal logic.
509+
}
510+
})(),
511+
).rejects.toThrow(EmptyStreamError);
512+
});
513+
514+
it('should succeed when there is a tool call without finish reason', async () => {
515+
// Setup: Stream with tool call but no finish reason
516+
const streamWithToolCall = (async function* () {
517+
yield {
518+
candidates: [
519+
{
520+
content: {
521+
role: 'model',
522+
parts: [
523+
{
524+
functionCall: {
525+
name: 'test_function',
526+
args: { param: 'value' },
527+
},
528+
},
529+
],
530+
},
531+
// No finishReason
532+
},
533+
],
534+
} as unknown as GenerateContentResponse;
535+
})();
536+
537+
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
538+
streamWithToolCall,
539+
);
540+
541+
const stream = await chat.sendMessageStream(
542+
'test-model',
543+
{ message: 'test' },
544+
'prompt-id-1',
545+
);
546+
547+
// Should not throw an error
548+
await expect(
549+
(async () => {
550+
for await (const _ of stream) {
551+
// consume stream
552+
}
553+
})(),
554+
).resolves.not.toThrow();
555+
});
556+
557+
it('should throw EmptyStreamError when no tool call and no finish reason', async () => {
558+
// Setup: Stream with text but no finish reason and no tool call
559+
const streamWithoutFinishReason = (async function* () {
560+
yield {
561+
candidates: [
562+
{
563+
content: {
564+
role: 'model',
565+
parts: [{ text: 'some response' }],
566+
},
567+
// No finishReason
568+
},
569+
],
570+
} as unknown as GenerateContentResponse;
571+
})();
572+
573+
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
574+
streamWithoutFinishReason,
575+
);
513576

514-
// The final turn must be the empty model placeholder.
515-
const lastTurn = history[3]!;
516-
expect(lastTurn.role).toBe('model');
517-
expect(lastTurn?.parts?.length).toBe(0);
577+
const stream = await chat.sendMessageStream(
578+
'test-model',
579+
{ message: 'test' },
580+
'prompt-id-1',
581+
);
518582

519-
// The second-to-last turn must be the function response we sent.
520-
const secondToLastTurn = history[2]!;
521-
expect(secondToLastTurn.role).toBe('user');
522-
expect(secondToLastTurn?.parts![0]!.functionResponse).toBeDefined();
583+
await expect(
584+
(async () => {
585+
for await (const _ of stream) {
586+
// consume stream
587+
}
588+
})(),
589+
).rejects.toThrow(EmptyStreamError);
590+
});
591+
592+
it('should throw EmptyStreamError when no tool call and empty response text', async () => {
593+
// Setup: Stream with finish reason but empty response (only thoughts)
594+
const streamWithEmptyResponse = (async function* () {
595+
yield {
596+
candidates: [
597+
{
598+
content: {
599+
role: 'model',
600+
parts: [{ thought: 'thinking...' }],
601+
},
602+
finishReason: 'STOP',
603+
},
604+
],
605+
} as unknown as GenerateContentResponse;
606+
})();
607+
608+
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
609+
streamWithEmptyResponse,
610+
);
611+
612+
const stream = await chat.sendMessageStream(
613+
'test-model',
614+
{ message: 'test' },
615+
'prompt-id-1',
616+
);
617+
618+
await expect(
619+
(async () => {
620+
for await (const _ of stream) {
621+
// consume stream
622+
}
623+
})(),
624+
).rejects.toThrow(EmptyStreamError);
625+
});
626+
627+
it('should succeed when there is finish reason and response text', async () => {
628+
// Setup: Stream with both finish reason and text content
629+
const validStream = (async function* () {
630+
yield {
631+
candidates: [
632+
{
633+
content: {
634+
role: 'model',
635+
parts: [{ text: 'valid response' }],
636+
},
637+
finishReason: 'STOP',
638+
},
639+
],
640+
} as unknown as GenerateContentResponse;
641+
})();
642+
643+
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
644+
validStream,
645+
);
646+
647+
const stream = await chat.sendMessageStream(
648+
'test-model',
649+
{ message: 'test' },
650+
'prompt-id-1',
651+
);
652+
653+
// Should not throw an error
654+
await expect(
655+
(async () => {
656+
for await (const _ of stream) {
657+
// consume stream
658+
}
659+
})(),
660+
).resolves.not.toThrow();
523661
});
524662

525663
it('should call generateContentStream with the correct parameters', async () => {
@@ -690,7 +828,6 @@ describe('GeminiChat', () => {
690828
}
691829

692830
// Assertions
693-
expect(mockLogInvalidChunk).toHaveBeenCalledTimes(1);
694831
expect(mockLogContentRetry).toHaveBeenCalledTimes(1);
695832
expect(mockLogContentRetryFailure).not.toHaveBeenCalled();
696833
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
@@ -758,7 +895,6 @@ describe('GeminiChat', () => {
758895
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
759896
3,
760897
);
761-
expect(mockLogInvalidChunk).toHaveBeenCalledTimes(3);
762898
expect(mockLogContentRetry).toHaveBeenCalledTimes(2);
763899
expect(mockLogContentRetryFailure).toHaveBeenCalledTimes(1);
764900

packages/core/src/core/geminiChat.ts

Lines changed: 32 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ import type { StructuredError } from './turn.js';
2929
import {
3030
logContentRetry,
3131
logContentRetryFailure,
32-
logInvalidChunk,
3332
} from '../telemetry/loggers.js';
3433
import { ChatRecordingService } from '../services/chatRecordingService.js';
3534
import {
3635
ContentRetryEvent,
3736
ContentRetryFailureEvent,
38-
InvalidChunkEvent,
3937
} from '../telemetry/types.js';
4038
import { handleFallback } from '../fallback/handler.js';
4139
import { isFunctionResponse } from '../utils/messageInspectors.js';
@@ -491,19 +489,14 @@ export class GeminiChat {
491489
streamResponse: AsyncGenerator<GenerateContentResponse>,
492490
): AsyncGenerator<GenerateContentResponse> {
493491
const modelResponseParts: Part[] = [];
494-
let hasReceivedAnyChunk = false;
495-
let hasReceivedValidChunk = false;
492+
496493
let hasToolCall = false;
497-
let lastChunk: GenerateContentResponse | null = null;
498-
let lastChunkIsInvalid = false;
494+
let hasFinishReason = false;
499495

500496
for await (const chunk of this.stopBeforeSecondMutator(streamResponse)) {
501-
hasReceivedAnyChunk = true;
502-
lastChunk = chunk;
503-
497+
hasFinishReason =
498+
chunk?.candidates?.some((candidate) => candidate.finishReason) ?? false;
504499
if (isValidResponse(chunk)) {
505-
hasReceivedValidChunk = true;
506-
lastChunkIsInvalid = false;
507500
const content = chunk.candidates?.[0]?.content;
508501
if (content?.parts) {
509502
if (content.parts.some((part) => part.thought)) {
@@ -518,12 +511,6 @@ export class GeminiChat {
518511
...content.parts.filter((part) => !part.thought),
519512
);
520513
}
521-
} else {
522-
logInvalidChunk(
523-
this.config,
524-
new InvalidChunkEvent('Invalid chunk received from stream.'),
525-
);
526-
lastChunkIsInvalid = true;
527514
}
528515

529516
// Record token usage if this chunk has usageMetadata
@@ -539,46 +526,6 @@ export class GeminiChat {
539526
yield chunk; // Yield every chunk to the UI immediately.
540527
}
541528

542-
if (!hasReceivedAnyChunk) {
543-
throw new EmptyStreamError('Model stream completed without any chunks.');
544-
}
545-
546-
const hasFinishReason = lastChunk?.candidates?.some(
547-
(candidate) => candidate.finishReason,
548-
);
549-
550-
// Stream validation logic: A stream is considered successful if:
551-
// 1. There's a tool call (tool calls can end without explicit finish reasons), OR
552-
// 2. There's a finish reason AND the last chunk is valid (or we haven't received any valid chunks)
553-
//
554-
// We throw an error only when there's no tool call AND:
555-
// - No finish reason, OR
556-
// - Last chunk is invalid after receiving valid content
557-
if (
558-
!hasToolCall &&
559-
(!hasFinishReason || (lastChunkIsInvalid && !hasReceivedValidChunk))
560-
) {
561-
throw new EmptyStreamError(
562-
'Model stream ended with an invalid chunk or missing finish reason.',
563-
);
564-
}
565-
566-
// Record model response text from the collected parts
567-
if (modelResponseParts.length > 0) {
568-
const responseText = modelResponseParts
569-
.filter((part) => part.text)
570-
.map((part) => part.text)
571-
.join('');
572-
573-
if (responseText.trim()) {
574-
this.chatRecordingService.recordMessage({
575-
model,
576-
type: 'gemini',
577-
content: responseText,
578-
});
579-
}
580-
}
581-
582529
// String thoughts and consolidate text parts.
583530
const consolidatedParts: Part[] = [];
584531
for (const part of modelResponseParts) {
@@ -594,6 +541,34 @@ export class GeminiChat {
594541
}
595542
}
596543

544+
const responseText = consolidatedParts
545+
.filter((part) => part.text)
546+
.map((part) => part.text)
547+
.join('')
548+
.trim();
549+
550+
// Record model response text from the collected parts
551+
if (responseText) {
552+
this.chatRecordingService.recordMessage({
553+
model,
554+
type: 'gemini',
555+
content: responseText,
556+
});
557+
}
558+
559+
// Stream validation logic: A stream is considered successful if:
560+
// 1. There's a tool call (tool calls can end without explicit finish reasons), OR
561+
// 2. There's a finish reason AND we have non-empty response text
562+
//
563+
// We throw an error only when there's no tool call AND:
564+
// - No finish reason, OR
565+
// - Empty response text (e.g., only thoughts with no actual content)
566+
if (!hasToolCall && (!hasFinishReason || !responseText)) {
567+
throw new EmptyStreamError(
568+
'Model stream ended with an invalid chunk or missing finish reason.',
569+
);
570+
}
571+
597572
this.history.push({ role: 'model', parts: consolidatedParts });
598573
}
599574

0 commit comments

Comments
 (0)