Skip to content

Commit 6c57d2a

Browse files
committed
fix local codex review comment
- [P0] Delay streaming session persistence until after filter — packages/agents-core/src/run.ts:936-969 For streaming runs we call ensureStreamInputPersisted?.() on the first turn before #prepareModelCall runs (line 936). ensureStreamInputPersisted immediately persists sessionInputItemsToPersist and flips its internal persisted flag. Later, #prepareModelCall invokes applyCallModelInputFilter, which may redact or truncate the turn input and updates sessionInputItemsFiltered. However, the second ensureStreamInputPersisted?.() (line 968) is a no-op because the persisted flag is already true, so the session history now contains the unfiltered, unredacted input that the filter was supposed to scrub. This leaks exactly the data the caller asked us not to keep. We need to defer the first persistence until after the filter (or otherwise allow the post-filter call to write the sanitized items) so that streaming memory mirrors the filtered payload.
1 parent 0c5ba85 commit 6c57d2a

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

packages/agents-core/src/run.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,6 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
935935

936936
if (result.state._currentTurn === 1) {
937937
await this.#runInputGuardrails(result.state);
938-
await ensureStreamInputPersisted?.();
939938
}
940939

941940
const turnInput = serverConversationTracker

packages/agents-core/test/run.stream.test.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,66 @@ describe('Runner.run (streaming)', () => {
810810
});
811811
});
812812

813+
it('persists filtered streaming input instead of the raw turn payload', async () => {
814+
const saveInputSpy = vi
815+
.spyOn(runImplementation, 'saveStreamInputToSession')
816+
.mockResolvedValue();
817+
818+
const session = createSessionMock();
819+
820+
const agent = new Agent({
821+
name: 'StreamFiltered',
822+
model: new ImmediateStreamingModel({
823+
output: [fakeModelMessage('done')],
824+
usage: new Usage(),
825+
}),
826+
});
827+
828+
const runner = new Runner();
829+
830+
const secretInput = 'super secret';
831+
const redactedContent = '[filtered]';
832+
833+
const result = await runner.run(agent, secretInput, {
834+
stream: true,
835+
session,
836+
callModelInputFilter: ({ modelData }) => {
837+
const sanitizedInput = modelData.input.map((item) => {
838+
if (
839+
item.type === 'message' &&
840+
'role' in item &&
841+
item.role === 'user'
842+
) {
843+
return {
844+
...item,
845+
content: redactedContent,
846+
};
847+
}
848+
return item;
849+
});
850+
851+
return {
852+
...modelData,
853+
input: sanitizedInput,
854+
};
855+
},
856+
});
857+
858+
await result.completed;
859+
860+
expect(saveInputSpy).toHaveBeenCalledTimes(1);
861+
const [, persistedItems] = saveInputSpy.mock.calls[0];
862+
if (!Array.isArray(persistedItems)) {
863+
throw new Error('Expected persisted session items to be an array.');
864+
}
865+
expect(persistedItems).toHaveLength(1);
866+
expect(persistedItems[0]).toMatchObject({
867+
role: 'user',
868+
content: redactedContent,
869+
});
870+
expect(JSON.stringify(persistedItems)).not.toContain(secretInput);
871+
});
872+
813873
it('skips streaming session persistence when the server manages the conversation', async () => {
814874
const saveInputSpy = vi
815875
.spyOn(runImplementation, 'saveStreamInputToSession')

0 commit comments

Comments
 (0)