Skip to content

Commit c767837

Browse files
committed
fix local codex review comment
- [P0] Track session diff deterministically — packages/agents-core/src/runImplementation.ts:1870-1884 In the session merge logic we decrement historyCounts before newInputCounts. If a sessionInputCallback reorders the combined list so that a newly supplied item (e.g. a repeated user message) appears before the matching historical entry, the new turn is treated as history and filtered out of appended. The run will therefore skip persisting that user input (sessionItems becomes empty), causing the session store to silently drop real customer messages whenever they match the shape of an older entry. This is a blocking data-loss bug for any callback that returns new items first or otherwise reorders items. We need to differentiate originals by identity (or at least consult newInputCounts
1 parent 252c919 commit c767837

File tree

2 files changed

+90
-3
lines changed

2 files changed

+90
-3
lines changed

packages/agents-core/src/runImplementation.ts

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1865,13 +1865,15 @@ export async function prepareInputItemsWithSession(
18651865

18661866
const historyCounts = buildItemFrequencyMap(history);
18671867
const newInputCounts = buildItemFrequencyMap(newInputItems);
1868+
const historyRefs = buildItemReferenceMap(history);
1869+
const newInputRefs = buildItemReferenceMap(newInputItems);
18681870

18691871
const appended: AgentInputItem[] = [];
18701872
for (const item of combined) {
18711873
const key = sessionItemKey(item);
1872-
const historyRemaining = historyCounts.get(key) ?? 0;
1873-
if (historyRemaining > 0) {
1874-
historyCounts.set(key, historyRemaining - 1);
1874+
if (consumeReference(newInputRefs, key, item)) {
1875+
decrementCount(newInputCounts, key);
1876+
appended.push(item);
18751877
continue;
18761878
}
18771879

@@ -1882,6 +1884,17 @@ export async function prepareInputItemsWithSession(
18821884
continue;
18831885
}
18841886

1887+
if (consumeReference(historyRefs, key, item)) {
1888+
decrementCount(historyCounts, key);
1889+
continue;
1890+
}
1891+
1892+
const historyRemaining = historyCounts.get(key) ?? 0;
1893+
if (historyRemaining > 0) {
1894+
historyCounts.set(key, historyRemaining - 1);
1895+
continue;
1896+
}
1897+
18851898
appended.push(item);
18861899
}
18871900

@@ -1900,6 +1913,51 @@ function buildItemFrequencyMap(items: AgentInputItem[]): Map<string, number> {
19001913
return counts;
19011914
}
19021915

1916+
function buildItemReferenceMap(
1917+
items: AgentInputItem[],
1918+
): Map<string, AgentInputItem[]> {
1919+
const refs = new Map<string, AgentInputItem[]>();
1920+
for (const item of items) {
1921+
const key = sessionItemKey(item);
1922+
const list = refs.get(key);
1923+
if (list) {
1924+
list.push(item);
1925+
} else {
1926+
refs.set(key, [item]);
1927+
}
1928+
}
1929+
return refs;
1930+
}
1931+
1932+
function consumeReference(
1933+
refs: Map<string, AgentInputItem[]>,
1934+
key: string,
1935+
target: AgentInputItem,
1936+
): boolean {
1937+
const candidates = refs.get(key);
1938+
if (!candidates || candidates.length === 0) {
1939+
return false;
1940+
}
1941+
const index = candidates.findIndex((candidate) => candidate === target);
1942+
if (index === -1) {
1943+
return false;
1944+
}
1945+
candidates.splice(index, 1);
1946+
if (candidates.length === 0) {
1947+
refs.delete(key);
1948+
}
1949+
return true;
1950+
}
1951+
1952+
function decrementCount(map: Map<string, number>, key: string) {
1953+
const remaining = (map.get(key) ?? 0) - 1;
1954+
if (remaining <= 0) {
1955+
map.delete(key);
1956+
} else {
1957+
map.set(key, remaining);
1958+
}
1959+
}
1960+
19031961
function sessionItemKey(item: AgentInputItem): string {
19041962
return JSON.stringify(item, sessionSerializationReplacer);
19051963
}

packages/agents-core/test/run.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,35 @@ describe('Runner.run', () => {
848848
expect(getFirstTextContent(persistedUsers[0])).toBe('Fresh input');
849849
});
850850

851+
it('persists reordered new items ahead of matching history', async () => {
852+
const model = new RecordingModel([
853+
{
854+
...TEST_MODEL_RESPONSE_BASIC,
855+
output: [fakeModelMessage('reordered response')],
856+
},
857+
]);
858+
const historyMessage = user('Repeatable message');
859+
const newMessage = user('Repeatable message');
860+
const session = new MemorySession([historyMessage]);
861+
const agent = new Agent({ name: 'ReorderedSession', model });
862+
const runner = new Runner({
863+
sessionInputCallback: (history, newItems) => newItems.concat(history),
864+
});
865+
866+
await runner.run(agent, [newMessage], { session });
867+
868+
expect(session.added).toHaveLength(1);
869+
const [persisted] = session.added;
870+
const persistedUsers = persisted.filter(
871+
(item): item is protocol.UserMessageItem =>
872+
item.type === 'message' && 'role' in item && item.role === 'user',
873+
);
874+
expect(persistedUsers).toHaveLength(1);
875+
expect(getFirstTextContent(persistedUsers[0])).toBe(
876+
'Repeatable message',
877+
);
878+
});
879+
851880
it('persists binary payloads that share prefixes with history', async () => {
852881
const model = new RecordingModel([
853882
{

0 commit comments

Comments
 (0)