Skip to content

Commit 12fe46c

Browse files
committed
fix local codex review comment
- [P0] Persist filtered session input — packages/agents-core/src/run.ts:1544-1580 Because sessionInputItemsToPersist is captured and cloned here before we call #applyCallModelInputFilter, we end up saving the raw, pre-filter items to session memory. In the streaming path this is even worse: ensureStreamInputPersisted runs in #runStreamLoop immediately after guardrails and before the filter executes, so the unredacted text is written to the session even if the filter later strips or sanitizes it. A caller who uses callModelInputFilter to redact PII (e.g. replacing a user SSN with [redacted]) will still have the original SSN stored in the session. We need to persist the filtered sourceItems (or otherwise defer persisting until after the filter returns) so session history reflects exactly what was allowed to reach the model.
1 parent e594dfe commit 12fe46c

File tree

1 file changed

+71
-7
lines changed
  • packages/agents-core/src

1 file changed

+71
-7
lines changed

packages/agents-core/src/run.ts

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
663663
startingAgent: TAgent,
664664
input: string | AgentInputItem[] | RunState<TContext, TAgent>,
665665
options: NonStreamRunOptions<TContext>,
666+
// sessionInputUpdate lets the caller adjust queued session items after filters run so we
667+
// persist exactly what we send to the model (e.g., after redactions or truncation).
668+
sessionInputUpdate?: (items: (AgentInputItem | undefined)[]) => void,
666669
): Promise<RunResult<TContext, TAgent>> {
667670
return withNewSpanContext(async () => {
668671
// if we have a saved state we use that one, otherwise we create a new one
@@ -828,6 +831,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
828831
);
829832

830833
serverConversationTracker?.markInputAsSent(filteredSourceItems);
834+
sessionInputUpdate?.(filteredSourceItems);
831835

832836
let modelSettings = {
833837
...this.config.modelSettings,
@@ -1095,6 +1099,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
10951099
options: StreamRunOptions<TContext>,
10961100
isResumedState: boolean,
10971101
ensureStreamInputPersisted?: () => Promise<void>,
1102+
sessionInputUpdate?: (items: (AgentInputItem | undefined)[]) => void,
10981103
): Promise<void> {
10991104
const serverConversationTracker =
11001105
options.conversationId || options.previousResponseId
@@ -1270,6 +1275,8 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
12701275
);
12711276

12721277
serverConversationTracker?.markInputAsSent(filteredSourceItems);
1278+
sessionInputUpdate?.(filteredSourceItems);
1279+
await ensureStreamInputPersisted?.();
12731280

12741281
for await (const event of model.getStreamedResponse({
12751282
systemInstructions: filteredModelInput.instructions,
@@ -1437,6 +1444,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
14371444
input: string | AgentInputItem[] | RunState<TContext, TAgent>,
14381445
options?: StreamRunOptions<TContext>,
14391446
ensureStreamInputPersisted?: () => Promise<void>,
1447+
sessionInputUpdate?: (items: (AgentInputItem | undefined)[]) => void,
14401448
): Promise<StreamedRunResult<TContext, TAgent>> {
14411449
options = options ?? ({} as StreamRunOptions<TContext>);
14421450
return withNewSpanContext(async () => {
@@ -1468,6 +1476,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
14681476
options,
14691477
isResumedState,
14701478
ensureStreamInputPersisted,
1479+
sessionInputUpdate,
14711480
).then(
14721481
() => {
14731482
result._done();
@@ -1533,6 +1542,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
15331542
// Likewise allow callers to override callModelInputFilter on individual runs.
15341543
const callModelInputFilter =
15351544
resolvedOptions.callModelInputFilter ?? this.config.callModelInputFilter;
1545+
const hasCallModelInputFilter = Boolean(callModelInputFilter);
15361546
const effectiveOptions = {
15371547
...resolvedOptions,
15381548
sessionInputCallback,
@@ -1542,6 +1552,46 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
15421552
const resumingFromState = input instanceof RunState;
15431553
let sessionInputItemsToPersist: AgentInputItem[] | undefined =
15441554
session && resumingFromState ? [] : undefined;
1555+
let sessionInputItemsFiltered: AgentInputItem[] | undefined = undefined;
1556+
let sessionNewInputCounts: Map<string, number> | undefined =
1557+
session && resumingFromState ? new Map() : undefined;
1558+
const updateSessionInputItems = (
1559+
sourceItems: (AgentInputItem | undefined)[],
1560+
) => {
1561+
const filtered: AgentInputItem[] = [];
1562+
const counts = sessionNewInputCounts;
1563+
for (const item of sourceItems) {
1564+
if (!item) {
1565+
continue;
1566+
}
1567+
if (!counts) {
1568+
filtered.push(structuredClone(item));
1569+
continue;
1570+
}
1571+
const key = getAgentInputItemKey(item);
1572+
const remaining = counts.get(key) ?? 0;
1573+
if (remaining <= 0) {
1574+
continue;
1575+
}
1576+
counts.set(key, remaining - 1);
1577+
filtered.push(structuredClone(item));
1578+
}
1579+
if (filtered.length > 0) {
1580+
sessionInputItemsFiltered = filtered;
1581+
} else if (sessionInputItemsFiltered === undefined) {
1582+
sessionInputItemsFiltered = [];
1583+
}
1584+
};
1585+
1586+
const resolveSessionItemsForPersistence = () => {
1587+
if (sessionInputItemsFiltered !== undefined) {
1588+
return sessionInputItemsFiltered;
1589+
}
1590+
if (hasCallModelInputFilter) {
1591+
return undefined;
1592+
}
1593+
return sessionInputItemsToPersist;
1594+
};
15451595

15461596
let preparedInput: typeof input = input;
15471597
if (!(preparedInput instanceof RunState)) {
@@ -1555,22 +1605,30 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
15551605
const items = prepared.sessionItems ?? [];
15561606
// Clone the items that will be persisted so later mutations (filters, hooks) cannot desync history.
15571607
sessionInputItemsToPersist = items.map((item) => structuredClone(item));
1608+
sessionNewInputCounts = new Map();
1609+
for (const item of items) {
1610+
const key = getAgentInputItemKey(item);
1611+
sessionNewInputCounts.set(
1612+
key,
1613+
(sessionNewInputCounts.get(key) ?? 0) + 1,
1614+
);
1615+
}
15581616
}
15591617
}
15601618

15611619
let ensureStreamInputPersisted: (() => Promise<void>) | undefined;
1562-
if (
1563-
session &&
1564-
sessionInputItemsToPersist &&
1565-
sessionInputItemsToPersist.length > 0
1566-
) {
1620+
if (session) {
15671621
let persisted = false;
15681622
ensureStreamInputPersisted = async () => {
15691623
if (persisted) {
15701624
return;
15711625
}
1626+
const itemsToPersist = resolveSessionItemsForPersistence();
1627+
if (!itemsToPersist || itemsToPersist.length === 0) {
1628+
return;
1629+
}
15721630
persisted = true;
1573-
await saveStreamInputToSession(session, sessionInputItemsToPersist);
1631+
await saveStreamInputToSession(session, itemsToPersist);
15741632
};
15751633
}
15761634

@@ -1581,15 +1639,21 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
15811639
preparedInput,
15821640
effectiveOptions,
15831641
ensureStreamInputPersisted,
1642+
updateSessionInputItems,
15841643
);
15851644
return streamResult;
15861645
}
15871646
const runResult = await this.#runIndividualNonStream(
15881647
agent,
15891648
preparedInput,
15901649
effectiveOptions,
1650+
updateSessionInputItems,
1651+
);
1652+
await saveToSession(
1653+
session,
1654+
resolveSessionItemsForPersistence(),
1655+
runResult,
15911656
);
1592-
await saveToSession(session, sessionInputItemsToPersist, runResult);
15931657
return runResult;
15941658
};
15951659

0 commit comments

Comments
 (0)