Skip to content

Commit 95d3dfa

Browse files
committed
fix: emit agent_end lifecycle event for streaming agents
- Fix streaming agents not calling agent_end lifecycle hook when completing execution - Add agent_end event emissions in streaming loop to match non-streaming behavior - Add tests to verify agent_end events are emitted for both streaming and non-streaming agents - Resolves issue where users could not collect usage information from streaming agent runs Fixes #371
1 parent 6f1677c commit 95d3dfa

File tree

4 files changed

+113
-0
lines changed

4 files changed

+113
-0
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@openai/agents-core': patch
3+
---
4+
5+
Fix streaming agents not calling agent_end lifecycle hook
6+
7+
Streaming agents were not emitting the `agent_end` lifecycle event when completing execution, while non-streaming agents were correctly emitting this event. This fix ensures that both the agent instance and the runner emit the `agent_end` event for streaming agents when they produce a final output, maintaining consistency with the non-streaming behavior.
8+
9+
This resolves the issue where users could not collect usage information or perform cleanup tasks at the end of streaming agent runs using the `agent_end` event handler.

packages/agents-core/src/run.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,17 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
832832
result.state,
833833
result.state._currentStep.output,
834834
);
835+
this.emit(
836+
'agent_end',
837+
result.state._context,
838+
currentAgent,
839+
result.state._currentStep.output,
840+
);
841+
currentAgent.emit(
842+
'agent_end',
843+
result.state._context,
844+
result.state._currentStep.output,
845+
);
835846
return;
836847
} else if (
837848
result.state._currentStep.type === 'next_step_interruption'

packages/agents-core/test/run.stream.test.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it, expect, beforeAll } from 'vitest';
22
import {
33
Agent,
44
run,
5+
Runner,
56
setDefaultModelProvider,
67
setTracingDisabled,
78
Usage,
@@ -115,4 +116,67 @@ describe('Runner.run (streaming)', () => {
115116
);
116117
expect(update?.agent).toBe(agentB);
117118
});
119+
120+
it('emits agent_end lifecycle event for streaming agents', async () => {
121+
class SimpleStreamingModel implements Model {
122+
constructor(private resp: ModelResponse) {}
123+
async getResponse(_req: ModelRequest): Promise<ModelResponse> {
124+
return this.resp;
125+
}
126+
async *getStreamedResponse(): AsyncIterable<StreamEvent> {
127+
yield {
128+
type: 'response_done',
129+
response: {
130+
id: 'r',
131+
usage: {
132+
requests: 1,
133+
inputTokens: 0,
134+
outputTokens: 0,
135+
totalTokens: 0,
136+
},
137+
output: this.resp.output,
138+
},
139+
} as any;
140+
}
141+
}
142+
143+
const agent = new Agent({
144+
name: 'TestAgent',
145+
model: new SimpleStreamingModel({
146+
output: [fakeModelMessage('Final output')],
147+
usage: new Usage(),
148+
}),
149+
});
150+
151+
// Track agent_end events on both the agent and runner
152+
const agentEndEvents: Array<{ context: any; output: string }> = [];
153+
const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = [];
154+
155+
agent.on('agent_end', (context, output) => {
156+
agentEndEvents.push({ context, output });
157+
});
158+
159+
// Create a runner instance to listen for events
160+
const runner = new Runner();
161+
runner.on('agent_end', (context, agent, output) => {
162+
runnerEndEvents.push({ context, agent, output });
163+
});
164+
165+
const result = await runner.run(agent, 'test input', { stream: true });
166+
167+
// Consume the stream
168+
const events: RunStreamEvent[] = [];
169+
for await (const e of result.toStream()) {
170+
events.push(e);
171+
}
172+
await result.completed;
173+
174+
// Verify agent_end was called on both agent and runner
175+
expect(agentEndEvents).toHaveLength(1);
176+
expect(agentEndEvents[0].output).toBe('Final output');
177+
178+
expect(runnerEndEvents).toHaveLength(1);
179+
expect(runnerEndEvents[0].agent).toBe(agent);
180+
expect(runnerEndEvents[0].output).toBe('Final output');
181+
});
118182
});

packages/agents-core/test/run.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,35 @@ describe('Runner.run', () => {
140140

141141
await expect(run(agent, 'fail')).rejects.toThrow('No response found');
142142
});
143+
144+
it('emits agent_end lifecycle event for non-streaming agents', async () => {
145+
const agent = new Agent({
146+
name: 'TestAgent',
147+
});
148+
149+
// Track agent_end events on both the agent and runner
150+
const agentEndEvents: Array<{ context: any; output: string }> = [];
151+
const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = [];
152+
153+
agent.on('agent_end', (context, output) => {
154+
agentEndEvents.push({ context, output });
155+
});
156+
157+
const runner = new Runner();
158+
runner.on('agent_end', (context, agent, output) => {
159+
runnerEndEvents.push({ context, agent, output });
160+
});
161+
162+
const result = await runner.run(agent, 'test input');
163+
164+
// Verify agent_end was called on both agent and runner
165+
expect(agentEndEvents).toHaveLength(1);
166+
expect(agentEndEvents[0].output).toBe('Hello World');
167+
168+
expect(runnerEndEvents).toHaveLength(1);
169+
expect(runnerEndEvents[0].agent).toBe(agent);
170+
expect(runnerEndEvents[0].output).toBe('Hello World');
171+
});
143172
});
144173

145174
describe('additional scenarios', () => {

0 commit comments

Comments
 (0)