Skip to content

Commit 2db3d00

Browse files
committed
fix: #1190 reconcile streamed function calls when server-managed runs abort
1 parent 06f425a commit 2db3d00

4 files changed

Lines changed: 289 additions & 0 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openai/agents-core': patch
3+
---
4+
5+
fix: #1190 reconcile streamed function calls when server-managed runs abort

packages/agents-core/src/run.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ import {
9393
prepareSandboxInterruptedTurnResume,
9494
type SandboxMemoryPersistenceContext,
9595
} from './runner/sandbox';
96+
import {
97+
buildAbortReconciliationInput,
98+
createStreamAbortReconciliationState,
99+
getAbortReconciliationPreviousResponseId,
100+
markAbortReconciliationComplete,
101+
recordStreamEventForAbortReconciliation,
102+
shouldReconcileStreamAbort,
103+
} from './runner/streamReconciliation';
96104

97105
export type {
98106
CallModelInputFilter,
@@ -1247,6 +1255,8 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
12471255
guardrailTracker.throwIfError();
12481256

12491257
let finalResponse: ModelResponse | undefined = undefined;
1258+
const abortReconciliationState =
1259+
createStreamAbortReconciliationState();
12501260
let inputMarked = false;
12511261
const markInputOnce = () => {
12521262
if (inputMarked || !serverConversationTracker) {
@@ -1266,6 +1276,63 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
12661276
);
12671277
inputMarked = true;
12681278
};
1279+
const reconcileStreamAbortIfNeeded = async () => {
1280+
if (
1281+
!serverConversationTracker ||
1282+
!shouldReconcileStreamAbort(abortReconciliationState)
1283+
) {
1284+
return;
1285+
}
1286+
1287+
const reconciliationInput = buildAbortReconciliationInput(
1288+
abortReconciliationState,
1289+
);
1290+
try {
1291+
const reconciliationResponse = await getResponseWithRetry(
1292+
preparedCall.model,
1293+
{
1294+
systemInstructions: preparedCall.modelInput.instructions,
1295+
prompt: preparedCall.prompt,
1296+
...(preparedCall.explictlyModelSet
1297+
? { overridePromptModel: true }
1298+
: {}),
1299+
input: reconciliationInput,
1300+
previousResponseId: getAbortReconciliationPreviousResponseId(
1301+
abortReconciliationState,
1302+
preparedCall,
1303+
),
1304+
conversationId: preparedCall.conversationId,
1305+
modelSettings: preparedCall.modelSettings,
1306+
tools: preparedCall.serializedTools,
1307+
toolsExplicitlyProvided: preparedCall.toolsExplicitlyProvided,
1308+
handoffs: preparedCall.serializedHandoffs,
1309+
outputType: convertAgentOutputTypeToSerializable(
1310+
currentAgent.outputType,
1311+
),
1312+
tracing: getTracing(
1313+
this.config.tracingDisabled,
1314+
this.config.traceIncludeSensitiveData,
1315+
),
1316+
},
1317+
);
1318+
markAbortReconciliationComplete(
1319+
abortReconciliationState,
1320+
reconciliationResponse,
1321+
);
1322+
serverConversationTracker.trackServerItems(
1323+
reconciliationResponse,
1324+
);
1325+
result.state.setConversationContext(
1326+
serverConversationTracker.conversationId,
1327+
serverConversationTracker.previousResponseId,
1328+
);
1329+
} catch (error) {
1330+
logger.debug(
1331+
'Failed to reconcile streamed function calls after abort.',
1332+
error,
1333+
);
1334+
}
1335+
};
12691336

12701337
sentInputToModel = true;
12711338
if (!delayStreamInputPersistence) {
@@ -1301,6 +1368,10 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
13011368
)) {
13021369
guardrailTracker.throwIfError();
13031370
markInputOnce();
1371+
recordStreamEventForAbortReconciliation(
1372+
abortReconciliationState,
1373+
event,
1374+
);
13041375
if (event.type === 'response_done') {
13051376
const parsed = StreamEventResponseCompleted.parse(event);
13061377
finalResponse = {
@@ -1315,6 +1386,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
13151386
// When the user's code exits a loop to consume the stream, we need to break
13161387
// this loop to prevent internal false errors and unnecessary processing
13171388
await awaitGuardrailsAndPersistInput();
1389+
await reconcileStreamAbortIfNeeded();
13181390
return;
13191391
}
13201392
result._addItem(new RunRawModelStreamEvent(event));
@@ -1325,6 +1397,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
13251397
markInputOnce();
13261398
}
13271399
await awaitGuardrailsAndPersistInput();
1400+
await reconcileStreamAbortIfNeeded();
13281401
return;
13291402
}
13301403
throw error;
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import type { ModelRequest, ModelResponse } from '../model';
2+
import type {
3+
FunctionCallItem,
4+
FunctionCallResultItem,
5+
StreamEvent,
6+
} from '../types/protocol';
7+
8+
type PendingStreamedFunctionCall = Pick<
9+
FunctionCallItem,
10+
'callId' | 'name' | 'namespace'
11+
>;
12+
13+
export type StreamAbortReconciliationState = {
14+
responseId?: string;
15+
pendingFunctionCalls: Map<string, PendingStreamedFunctionCall>;
16+
};
17+
18+
export function createStreamAbortReconciliationState(): StreamAbortReconciliationState {
19+
return {
20+
pendingFunctionCalls: new Map(),
21+
};
22+
}
23+
24+
export function recordStreamEventForAbortReconciliation(
25+
state: StreamAbortReconciliationState,
26+
event: StreamEvent,
27+
): void {
28+
if (event.type === 'response_done') {
29+
state.pendingFunctionCalls.clear();
30+
state.responseId = event.response.id;
31+
return;
32+
}
33+
34+
if (event.type !== 'model' || !isRecord(event.event)) {
35+
return;
36+
}
37+
38+
const rawEvent = event.event;
39+
if (
40+
rawEvent.type === 'response.created' &&
41+
isRecord(rawEvent.response) &&
42+
typeof rawEvent.response.id === 'string'
43+
) {
44+
state.responseId = rawEvent.response.id;
45+
return;
46+
}
47+
48+
if (
49+
rawEvent.type !== 'response.output_item.done' ||
50+
!isRecord(rawEvent.item)
51+
) {
52+
return;
53+
}
54+
55+
const item = rawEvent.item;
56+
if (item.type === 'function_call' && typeof item.call_id === 'string') {
57+
state.pendingFunctionCalls.set(item.call_id, {
58+
callId: item.call_id,
59+
name: typeof item.name === 'string' ? item.name : item.call_id,
60+
...(typeof item.namespace === 'string'
61+
? { namespace: item.namespace }
62+
: {}),
63+
});
64+
return;
65+
}
66+
67+
if (
68+
item.type === 'function_call_output' &&
69+
typeof item.call_id === 'string'
70+
) {
71+
state.pendingFunctionCalls.delete(item.call_id);
72+
}
73+
}
74+
75+
export function buildAbortReconciliationInput(
76+
state: StreamAbortReconciliationState,
77+
): FunctionCallResultItem[] {
78+
return Array.from(state.pendingFunctionCalls.values(), (toolCall) => ({
79+
type: 'function_call_result',
80+
name: toolCall.name,
81+
...(typeof toolCall.namespace === 'string'
82+
? { namespace: toolCall.namespace }
83+
: {}),
84+
callId: toolCall.callId,
85+
status: 'incomplete',
86+
output: { type: 'text', text: 'aborted' },
87+
}));
88+
}
89+
90+
export function getAbortReconciliationPreviousResponseId(
91+
state: StreamAbortReconciliationState,
92+
request: Pick<ModelRequest, 'conversationId' | 'previousResponseId'>,
93+
): string | undefined {
94+
if (request.conversationId) {
95+
return request.previousResponseId;
96+
}
97+
return state.responseId ?? request.previousResponseId;
98+
}
99+
100+
export function shouldReconcileStreamAbort(
101+
state: StreamAbortReconciliationState,
102+
): boolean {
103+
return state.pendingFunctionCalls.size > 0;
104+
}
105+
106+
export function markAbortReconciliationComplete(
107+
state: StreamAbortReconciliationState,
108+
response: ModelResponse | undefined,
109+
): void {
110+
state.pendingFunctionCalls.clear();
111+
if (response?.responseId) {
112+
state.responseId = response.responseId;
113+
}
114+
}
115+
116+
function isRecord(value: unknown): value is Record<string, unknown> {
117+
return typeof value === 'object' && value !== null;
118+
}

packages/agents-core/test/run.stream.test.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,53 @@ function getRequestInputItems(request: ModelRequest): AgentInputItem[] {
6060
return Array.isArray(request.input) ? request.input : [];
6161
}
6262

63+
class AbortAfterStreamedFunctionCallModel implements Model {
64+
public requests: ModelRequest[] = [];
65+
66+
constructor(private readonly responseId: string) {}
67+
68+
async getResponse(request: ModelRequest): Promise<ModelResponse> {
69+
this.requests.push(request);
70+
return {
71+
output: [fakeModelMessage('reconciled')],
72+
usage: new Usage(),
73+
responseId: 'resp-reconciled',
74+
};
75+
}
76+
77+
async *getStreamedResponse(
78+
request: ModelRequest,
79+
): AsyncIterable<StreamEvent> {
80+
this.requests.push(request);
81+
yield {
82+
type: 'model',
83+
event: {
84+
type: 'response.created',
85+
response: {
86+
id: this.responseId,
87+
},
88+
},
89+
};
90+
yield {
91+
type: 'model',
92+
event: {
93+
type: 'response.output_item.done',
94+
item: {
95+
type: 'function_call',
96+
id: 'fc_abort',
97+
call_id: 'call_abort',
98+
name: 'slow_tool',
99+
arguments: '{}',
100+
status: 'completed',
101+
},
102+
},
103+
};
104+
const error = new Error('aborted');
105+
error.name = 'AbortError';
106+
throw error;
107+
}
108+
}
109+
63110
// Test for unhandled rejection when stream loop throws
64111

65112
describe('Runner.run (streaming)', () => {
@@ -212,6 +259,52 @@ describe('Runner.run (streaming)', () => {
212259
expect(getEventListeners(retainedSignals[0], 'abort').length).toBe(0);
213260
});
214261

262+
it('reconciles streamed function calls on abort with conversationId', async () => {
263+
const model = new AbortAfterStreamedFunctionCallModel('resp-aborted');
264+
const agent = new Agent({ name: 'AbortReconcile', model });
265+
266+
const result = await run(agent, 'hi', {
267+
stream: true,
268+
conversationId: 'conv-abort',
269+
});
270+
271+
await result.completed;
272+
273+
expect(model.requests).toHaveLength(2);
274+
expect(model.requests[1].conversationId).toBe('conv-abort');
275+
expect(model.requests[1].signal).toBeUndefined();
276+
expect(getRequestInputItems(model.requests[1])).toEqual([
277+
expect.objectContaining({
278+
type: 'function_call_result',
279+
callId: 'call_abort',
280+
name: 'slow_tool',
281+
status: 'incomplete',
282+
output: { type: 'text', text: 'aborted' },
283+
}),
284+
]);
285+
});
286+
287+
it('uses the streamed response id when reconciling previousResponseId-only aborts', async () => {
288+
const model = new AbortAfterStreamedFunctionCallModel('resp-aborted');
289+
const agent = new Agent({ name: 'AbortPreviousResponse', model });
290+
291+
const result = await run(agent, 'hi', {
292+
stream: true,
293+
previousResponseId: 'resp-before-abort',
294+
});
295+
296+
await result.completed;
297+
298+
expect(model.requests).toHaveLength(2);
299+
expect(model.requests[1].conversationId).toBeUndefined();
300+
expect(model.requests[1].previousResponseId).toBe('resp-aborted');
301+
expect(getRequestInputItems(model.requests[1])[0]).toMatchObject({
302+
type: 'function_call_result',
303+
callId: 'call_abort',
304+
status: 'incomplete',
305+
});
306+
});
307+
215308
it('emits agent_updated_stream_event with new agent on handoff', async () => {
216309
class SimpleStreamingModel implements Model {
217310
constructor(private resp: ModelResponse) {}

0 commit comments

Comments
 (0)