Skip to content

Commit c95f197

Browse files
committed
fix local codex review comment:
- [P1] Persist streaming session input on model errors — packages/agents-core/src/run.ts:1045-1086 When we stream a run with a session, ensureStreamInputPersisted is only invoked in the final_output and interruption branches. If the model stream rejects before either branch is reached (for example a transient provider/network error, an abort signal, or a thrown tool error), we exit the loop via the catch path without ever calling ensureStreamInputPersisted, so saveStreamInputToSession never runs. That drops the user’s latest turn from the session, which prevents resuming the conversation precisely when we need memory most. We should persist the input as soon as we hand it to the model (or at least in a finally block) so streaming sessions remain consistent even when the stream fails early.
1 parent 7a489f4 commit c95f197

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

packages/agents-core/src/run.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,16 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
877877
})
878878
: undefined;
879879

880+
let handedInputToModel = false;
881+
let streamInputPersisted = false;
882+
const persistStreamInputIfNeeded = async () => {
883+
if (streamInputPersisted || !ensureStreamInputPersisted) {
884+
return;
885+
}
886+
await ensureStreamInputPersisted();
887+
streamInputPersisted = true;
888+
};
889+
880890
if (serverConversationTracker && isResumedState) {
881891
serverConversationTracker.primeFromState({
882892
originalInput: result.state._originalInput,
@@ -983,7 +993,8 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
983993
sessionInputUpdate,
984994
);
985995

986-
await ensureStreamInputPersisted?.();
996+
handedInputToModel = true;
997+
await persistStreamInputIfNeeded();
987998

988999
for await (const event of preparedCall.model.getStreamedResponse({
9891000
systemInstructions: preparedCall.modelInput.instructions,
@@ -1081,7 +1092,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
10811092
result.state,
10821093
result.state._currentStep.output,
10831094
);
1084-
await ensureStreamInputPersisted?.();
1095+
await persistStreamInputIfNeeded();
10851096
// Guardrails must succeed before persisting session memory to avoid storing blocked outputs.
10861097
if (!serverManagesConversation) {
10871098
await saveStreamResultToSession(options.session, result);
@@ -1102,7 +1113,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
11021113
result.state._currentStep.type === 'next_step_interruption'
11031114
) {
11041115
// we are done for now. Don't run any output guardrails
1105-
await ensureStreamInputPersisted?.();
1116+
await persistStreamInputIfNeeded();
11061117
if (!serverManagesConversation) {
11071118
await saveStreamResultToSession(options.session, result);
11081119
}
@@ -1129,6 +1140,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
11291140
}
11301141
}
11311142
} catch (error) {
1143+
if (handedInputToModel && !streamInputPersisted) {
1144+
await persistStreamInputIfNeeded();
1145+
}
11321146
if (result.state._currentAgentSpan) {
11331147
result.state._currentAgentSpan.setError({
11341148
message: 'Error in agent run',

packages/agents-core/test/run.stream.test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,40 @@ describe('Runner.run (streaming)', () => {
810810
});
811811
});
812812

813+
it('persists streaming input when the model stream rejects before completion', async () => {
814+
const saveInputSpy = vi
815+
.spyOn(runImplementation, 'saveStreamInputToSession')
816+
.mockResolvedValue();
817+
818+
const session = createSessionMock();
819+
const streamError = new Error('model stream failed');
820+
821+
const agent = new Agent({
822+
name: 'StreamFailurePersistsInput',
823+
model: new RejectingStreamingModel(streamError),
824+
});
825+
826+
const runner = new Runner();
827+
828+
const result = await runner.run(agent, 'save me please', {
829+
stream: true,
830+
session,
831+
});
832+
833+
await expect(result.completed).rejects.toThrow('model stream failed');
834+
835+
expect(saveInputSpy).toHaveBeenCalledTimes(1);
836+
const [, persistedItems] = saveInputSpy.mock.calls[0];
837+
if (!Array.isArray(persistedItems)) {
838+
throw new Error('Expected persisted session items to be an array.');
839+
}
840+
expect(persistedItems).toHaveLength(1);
841+
expect(persistedItems[0]).toMatchObject({
842+
role: 'user',
843+
content: 'save me please',
844+
});
845+
});
846+
813847
it('persists filtered streaming input instead of the raw turn payload', async () => {
814848
const saveInputSpy = vi
815849
.spyOn(runImplementation, 'saveStreamInputToSession')
@@ -971,6 +1005,27 @@ class ImmediateStreamingModel implements Model {
9711005
}
9721006
}
9731007

1008+
class RejectingStreamingModel implements Model {
1009+
constructor(private readonly error: Error) {}
1010+
1011+
async getResponse(_request: ModelRequest): Promise<ModelResponse> {
1012+
throw this.error;
1013+
}
1014+
1015+
getStreamedResponse(_request: ModelRequest): AsyncIterable<StreamEvent> {
1016+
const error = this.error;
1017+
return {
1018+
[Symbol.asyncIterator]() {
1019+
return {
1020+
async next() {
1021+
throw error;
1022+
},
1023+
} satisfies AsyncIterator<StreamEvent>;
1024+
},
1025+
} satisfies AsyncIterable<StreamEvent>;
1026+
}
1027+
}
1028+
9741029
function createSessionMock(): Session {
9751030
return {
9761031
getSessionId: vi.fn().mockResolvedValue('session-id'),

0 commit comments

Comments
 (0)