Skip to content

Commit ce85e41

Browse files
committed
fix local codex review
1 parent 8bfeeda commit ce85e41

File tree

5 files changed

+111
-27
lines changed

5 files changed

+111
-27
lines changed

packages/agents-core/src/run.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,33 +1348,37 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
13481348
};
13491349
const session = effectiveOptions.session;
13501350
const resumingFromState = input instanceof RunState;
1351-
let sessionOriginalInput: string | AgentInputItem[] | undefined;
1352-
if (resumingFromState) {
1353-
if (session) {
1354-
sessionOriginalInput = [] as AgentInputItem[];
1355-
}
1356-
} else {
1357-
sessionOriginalInput = input as string | AgentInputItem[];
1358-
}
1351+
let sessionInputItemsToPersist: AgentInputItem[] | undefined =
1352+
session && resumingFromState ? [] : undefined;
13591353

13601354
let preparedInput: typeof input = input;
13611355
if (!(preparedInput instanceof RunState)) {
1362-
preparedInput = await prepareInputItemsWithSession(
1356+
const prepared = await prepareInputItemsWithSession(
13631357
preparedInput,
13641358
session,
13651359
sessionInputCallback,
13661360
);
1361+
preparedInput = prepared.preparedInput;
1362+
if (session) {
1363+
const items = prepared.sessionItems ?? [];
1364+
// Clone the items that will be persisted so later mutations (filters, hooks) cannot desync history.
1365+
sessionInputItemsToPersist = items.map((item) => structuredClone(item));
1366+
}
13671367
}
13681368

13691369
let ensureStreamInputPersisted: (() => Promise<void>) | undefined;
1370-
if (session && typeof sessionOriginalInput !== 'undefined') {
1370+
if (
1371+
session &&
1372+
sessionInputItemsToPersist &&
1373+
sessionInputItemsToPersist.length > 0
1374+
) {
13711375
let persisted = false;
13721376
ensureStreamInputPersisted = async () => {
13731377
if (persisted) {
13741378
return;
13751379
}
13761380
persisted = true;
1377-
await saveStreamInputToSession(session, sessionOriginalInput);
1381+
await saveStreamInputToSession(session, sessionInputItemsToPersist);
13781382
};
13791383
}
13801384

@@ -1393,7 +1397,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
13931397
preparedInput,
13941398
effectiveOptions,
13951399
);
1396-
await saveToSession(session, sessionOriginalInput, runResult);
1400+
await saveToSession(session, sessionInputItemsToPersist, runResult);
13971401
return runResult;
13981402
};
13991403

packages/agents-core/src/runImplementation.ts

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1737,13 +1737,13 @@ export function extractOutputItemsFromRunItems(
17371737
*/
17381738
export async function saveToSession(
17391739
session: Session | undefined,
1740-
originalInput: string | AgentInputItem[] | undefined,
1740+
sessionInputItems: AgentInputItem[] | undefined,
17411741
result: RunResult<any, any>,
17421742
): Promise<void> {
17431743
if (!session) {
17441744
return;
17451745
}
1746-
const inputItems = originalInput ? toInputItemList(originalInput) : [];
1746+
const inputItems = sessionInputItems ?? [];
17471747
const state = result.state;
17481748
const alreadyPersisted = state._currentTurnPersistedItemCount ?? 0;
17491749
// Persist only the portion of _generatedItems that has not yet been stored for this turn.
@@ -1771,19 +1771,15 @@ export async function saveToSession(
17711771
*/
17721772
export async function saveStreamInputToSession(
17731773
session: Session | undefined,
1774-
originalInput: string | AgentInputItem[] | undefined,
1774+
sessionInputItems: AgentInputItem[] | undefined,
17751775
): Promise<void> {
17761776
if (!session) {
17771777
return;
17781778
}
1779-
if (!originalInput) {
1779+
if (!sessionInputItems || sessionInputItems.length === 0) {
17801780
return;
17811781
}
1782-
const itemsToSave = toInputItemList(originalInput);
1783-
if (itemsToSave.length === 0) {
1784-
return;
1785-
}
1786-
await session.addItems(itemsToSave);
1782+
await session.addItems(sessionInputItems);
17871783
}
17881784

17891785
/**
@@ -1814,13 +1810,21 @@ export async function saveStreamResultToSession(
18141810
* @internal
18151811
* If a session is provided, expands the input with session history; otherwise returns the input.
18161812
*/
1813+
export type PreparedInputWithSessionResult = {
1814+
preparedInput: string | AgentInputItem[];
1815+
sessionItems?: AgentInputItem[];
1816+
};
1817+
18171818
export async function prepareInputItemsWithSession(
18181819
input: string | AgentInputItem[],
18191820
session?: Session,
18201821
sessionInputCallback?: SessionInputCallback,
1821-
): Promise<string | AgentInputItem[]> {
1822+
): Promise<PreparedInputWithSessionResult> {
18221823
if (!session) {
1823-
return input;
1824+
return {
1825+
preparedInput: input,
1826+
sessionItems: undefined,
1827+
};
18241828
}
18251829

18261830
const history = await session.getItems();
@@ -1839,7 +1843,10 @@ export async function prepareInputItemsWithSession(
18391843
}
18401844

18411845
if (!sessionInputCallback) {
1842-
return [...history, ...newInputItems];
1846+
return {
1847+
preparedInput: [...history, ...newInputItems],
1848+
sessionItems: newInputItems,
1849+
};
18431850
}
18441851

18451852
// Delegate history reconciliation to the user-supplied callback. It must return a concrete list
@@ -1851,5 +1858,26 @@ export async function prepareInputItemsWithSession(
18511858
);
18521859
}
18531860

1854-
return combined;
1861+
const historySet = new Set(history);
1862+
const newInputSet = new Set(newInputItems);
1863+
const appended: AgentInputItem[] = [];
1864+
for (const item of combined) {
1865+
if (historySet.has(item) || !newInputSet.has(item)) {
1866+
if (!historySet.has(item) && !newInputSet.has(item)) {
1867+
appended.push(item);
1868+
}
1869+
continue;
1870+
}
1871+
appended.push(item);
1872+
}
1873+
1874+
if (appended.length === 0 && combined.length > history.length) {
1875+
// When callbacks replace every new item with fresh objects, fall back to the tail slice.
1876+
appended.push(...combined.slice(history.length));
1877+
}
1878+
1879+
return {
1880+
preparedInput: combined,
1881+
sessionItems: appended.length > 0 ? appended : [],
1882+
};
18551883
}

packages/agents-core/test/run.stream.test.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,16 @@ describe('Runner.run (streaming)', () => {
714714
await result.completed;
715715

716716
expect(saveInputSpy).toHaveBeenCalledTimes(1);
717-
expect(saveInputSpy).toHaveBeenCalledWith(session, 'hello world');
717+
const [sessionArg, persistedItems] = saveInputSpy.mock.calls[0];
718+
expect(sessionArg).toBe(session);
719+
if (!Array.isArray(persistedItems)) {
720+
throw new Error('Expected persisted session items to be an array.');
721+
}
722+
expect(persistedItems).toHaveLength(1);
723+
expect(persistedItems[0]).toMatchObject({
724+
role: 'user',
725+
content: 'hello world',
726+
});
718727
});
719728

720729
it('skips persisting streaming input when an input guardrail triggers', async () => {

packages/agents-core/test/run.test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,44 @@ describe('Runner.run', () => {
775775
expect(getFirstTextContent(recordedInput[1])).toBe('Fresh input');
776776
});
777777

778+
it('persists transformed session input from callback', async () => {
779+
const model = new RecordingModel([
780+
{
781+
...TEST_MODEL_RESPONSE_BASIC,
782+
output: [fakeModelMessage('session response')],
783+
},
784+
]);
785+
const agent = new Agent({ name: 'SessionTransform', model });
786+
const session = new MemorySession();
787+
const runner = new Runner();
788+
const original = 'Sensitive payload';
789+
const redacted = '[redacted]';
790+
791+
await runner.run(agent, original, {
792+
session,
793+
sessionInputCallback: (history, newItems) => {
794+
expect(history).toHaveLength(0);
795+
if (newItems[0] && typeof newItems[0] === 'object') {
796+
(newItems[0] as protocol.UserMessageItem).content = redacted;
797+
}
798+
return history.concat(newItems);
799+
},
800+
});
801+
802+
const recordedInput = model.lastRequest?.input as AgentInputItem[];
803+
expect(recordedInput[recordedInput.length - 1]).toMatchObject({
804+
role: 'user',
805+
content: redacted,
806+
});
807+
808+
expect(session.added).toHaveLength(1);
809+
const persistedTurn = session.added[0];
810+
expect(persistedTurn[0]).toMatchObject({
811+
role: 'user',
812+
content: redacted,
813+
});
814+
});
815+
778816
it('throws when session input callback returns invalid data', async () => {
779817
const model = new RecordingModel([
780818
{

packages/agents-core/test/runImplementation.test.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
streamStepItemsToRunResult,
3333
saveToSession,
3434
executeInterruptedToolsAndSideEffects,
35+
toInputItemList,
3536
} from '../src/runImplementation';
3637
import {
3738
FunctionTool,
@@ -195,7 +196,11 @@ describe('saveToSession', () => {
195196
state._generatedItems = [new ToolApprovalItem(functionCall, textAgent)];
196197

197198
const preApprovalResult = new RunResult(state);
198-
await saveToSession(session, state._originalInput, preApprovalResult);
199+
await saveToSession(
200+
session,
201+
toInputItemList(state._originalInput),
202+
preApprovalResult,
203+
);
199204

200205
expect(session.items).toEqual([
201206
{

0 commit comments

Comments
 (0)