Skip to content

Commit 7fc0510

Browse files
committed
feat: add step start and finish events
1 parent fa7d043 commit 7fc0510

File tree

29 files changed

+1338
-23
lines changed

29 files changed

+1338
-23
lines changed

docs/core-concepts/streaming-output.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ function ChatComponent() {
179179
setIsComplete(false);
180180
},
181181

182+
'.step_start': (data) => {
183+
console.log('Step started:', data);
184+
// A new generation cycle is beginning
185+
},
186+
182187
'.text_start': (data) => {
183188
console.log('Text start event received:', data);
184189
setCurrentMessage('');
@@ -202,6 +207,11 @@ function ChatComponent() {
202207
console.log('Tool result:', data.result);
203208
},
204209

210+
'.step_finish': (data) => {
211+
console.log('Step finished:', data);
212+
// Generation cycle complete, may be followed by another step
213+
},
214+
205215
'.stream_end': (data) => {
206216
console.log('Stream ended:', data.finish_reason);
207217
setIsComplete(true);
@@ -246,6 +256,7 @@ All streaming approaches emit the same core events with consistent data structur
246256
### Available Events
247257

248258
- **`stream_start`** - Stream initialization with model and provider info
259+
- **`step_start`** - Beginning of a generation step (emitted before each AI response cycle)
249260
- **`text_start`** - Beginning of a text message
250261
- **`text_delta`** - Incremental text chunks as they're generated
251262
- **`text_complete`** - End of a complete text message
@@ -257,9 +268,13 @@ All streaming approaches emit the same core events with consistent data structur
257268
- **`tool_call_delta`** - Incremental tool call params chunks as they're generated
258269
- **`artifact`** - Binary artifacts produced by tools (images, audio, files)
259270
- **`provider_tool_event`** - Provider-specific tool events (e.g., image generation, web search)
271+
- **`step_finish`** - End of a generation step (emitted after tool calls or before stream end)
260272
- **`error`** - Error handling with recovery information
261273
- **`stream_end`** - Stream completion with usage statistics
262274

275+
> [!TIP]
276+
> **Understanding Steps**: A "step" represents one cycle of AI generation. In a simple request without tools, there's typically one step. When using tools, each cycle of "AI generates → tools execute → AI continues" creates a new step. Use `step_start` and `step_finish` events to track these cycles in multi-turn tool interactions.
277+
263278
### Event Data Examples
264279

265280
Based on actual streaming output:
@@ -277,6 +292,12 @@ Based on actual streaming output:
277292
}
278293
}
279294

295+
// step_start event
296+
{
297+
"id": "anthropic_evt_abc123step",
298+
"timestamp": 1756412888
299+
}
300+
280301
// text_start event
281302
{
282303
"id": "anthropic_evt_8YI9ULcftpFtHzh3",
@@ -338,6 +359,12 @@ Based on actual streaming output:
338359
}
339360
}
340361

362+
// step_finish event
363+
{
364+
"id": "anthropic_evt_def456step",
365+
"timestamp": 1756412895
366+
}
367+
341368
// stream_end event
342369
{
343370
"id": "anthropic_evt_BZ3rqDYyprnywNyL",
@@ -673,12 +700,16 @@ The Vercel AI SDK format provides structured streaming data:
673700
```
674701
data: {"type":"start","messageId":"anthropic_evt_NPbGJs7D0oQhvz2K"}
675702
703+
data: {"type":"start-step"}
704+
676705
data: {"type":"text-start","id":"msg_013P3F8KkVG3Qasjeay3NUmY"}
677706
678707
data: {"type":"text-delta","id":"msg_013P3F8KkVG3Qasjeay3NUmY","delta":"Hello"}
679708
680709
data: {"type":"text-end","id":"msg_013P3F8KkVG3Qasjeay3NUmY"}
681710
711+
data: {"type":"finish-step"}
712+
682713
data: {"type":"finish","messageMetadata":{"finishReason":"stop","usage":{"promptTokens":1998,"completionTokens":288}}}
683714
684715
data: [DONE]

src/Enums/StreamEventType.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ enum StreamEventType: string
2121
case Artifact = 'artifact';
2222
case Error = 'error';
2323
case StreamEnd = 'stream_end';
24+
case StepStart = 'step_start';
25+
case StepFinish = 'step_finish';
2426
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Prism\Prism\Events\Broadcasting;
6+
7+
class StepFinishBroadcast extends StreamEventBroadcast {}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Prism\Prism\Events\Broadcasting;
6+
7+
class StepStartBroadcast extends StreamEventBroadcast {}

src/Providers/Anthropic/Handlers/Stream.php

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use Prism\Prism\Streaming\Events\CitationEvent;
1919
use Prism\Prism\Streaming\Events\ErrorEvent;
2020
use Prism\Prism\Streaming\Events\ProviderToolEvent;
21+
use Prism\Prism\Streaming\Events\StepFinishEvent;
22+
use Prism\Prism\Streaming\Events\StepStartEvent;
2123
use Prism\Prism\Streaming\Events\StreamEndEvent;
2224
use Prism\Prism\Streaming\Events\StreamEvent;
2325
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -79,18 +81,28 @@ protected function processStream(Response $response, Request $request, int $dept
7981
$streamEvent = $this->processEvent($event);
8082

8183
if ($streamEvent instanceof Generator) {
82-
yield from $streamEvent;
84+
foreach ($streamEvent as $event) {
85+
yield $event;
86+
}
8387
} elseif ($streamEvent instanceof StreamEvent) {
8488
yield $streamEvent;
8589
}
8690
}
8791

8892
if ($this->state->hasToolCalls()) {
89-
yield from $this->handleToolCalls($request, $depth);
93+
foreach ($this->handleToolCalls($request, $depth) as $item) {
94+
yield $item;
95+
}
9096

9197
return;
9298
}
9399

100+
$this->state->markStepFinished();
101+
yield new StepFinishEvent(
102+
id: EventID::generate(),
103+
timestamp: time()
104+
);
105+
94106
yield $this->emitStreamEndEvent();
95107
}
96108

@@ -115,8 +127,9 @@ protected function processEvent(array $event): StreamEvent|Generator|null
115127

116128
/**
117129
* @param array<string, mixed> $event
130+
* @return Generator<StreamEvent>
118131
*/
119-
protected function handleMessageStart(array $event): ?StreamStartEvent
132+
protected function handleMessageStart(array $event): Generator
120133
{
121134
$message = $event['message'] ?? [];
122135
$this->state->withMessageId($message['id'] ?? EventID::generate());
@@ -132,18 +145,25 @@ protected function handleMessageStart(array $event): ?StreamStartEvent
132145
}
133146

134147
// Only emit StreamStartEvent once per streaming session
135-
if (! $this->state->shouldEmitStreamStart()) {
136-
return null;
148+
if ($this->state->shouldEmitStreamStart()) {
149+
$this->state->markStreamStarted();
150+
151+
yield new StreamStartEvent(
152+
id: EventID::generate(),
153+
timestamp: time(),
154+
model: $message['model'] ?? 'unknown',
155+
provider: 'anthropic'
156+
);
137157
}
138158

139-
$this->state->markStreamStarted();
159+
if ($this->state->shouldEmitStepStart()) {
160+
$this->state->markStepStarted();
140161

141-
return new StreamStartEvent(
142-
id: EventID::generate(),
143-
timestamp: time(),
144-
model: $message['model'] ?? 'unknown',
145-
provider: 'anthropic'
146-
);
162+
yield new StepStartEvent(
163+
id: EventID::generate(),
164+
timestamp: time()
165+
);
166+
}
147167
}
148168

149169
/**
@@ -544,6 +564,13 @@ protected function handleToolCalls(Request $request, int $depth): Generator
544564

545565
$request->addMessage(new ToolResultMessage($toolResults));
546566

567+
// Emit step finish after tool calls
568+
$this->state->markStepFinished();
569+
yield new StepFinishEvent(
570+
id: EventID::generate(),
571+
timestamp: time()
572+
);
573+
547574
// Continue streaming if within step limit
548575
$depth++;
549576
if ($depth < $request->maxSteps()) {

src/Providers/DeepSeek/Handlers/Stream.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Prism\Prism\Providers\DeepSeek\Maps\ToolMap;
2222
use Prism\Prism\Streaming\EventID;
2323
use Prism\Prism\Streaming\Events\ArtifactEvent;
24+
use Prism\Prism\Streaming\Events\StepFinishEvent;
25+
use Prism\Prism\Streaming\Events\StepStartEvent;
2426
use Prism\Prism\Streaming\Events\StreamEndEvent;
2527
use Prism\Prism\Streaming\Events\StreamEvent;
2628
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -96,6 +98,15 @@ protected function processStream(Response $response, Request $request, int $dept
9698
);
9799
}
98100

101+
if ($this->state->shouldEmitStepStart()) {
102+
$this->state->markStepStarted();
103+
104+
yield new StepStartEvent(
105+
id: EventID::generate(),
106+
timestamp: time()
107+
);
108+
}
109+
99110
if ($this->hasToolCalls($data)) {
100111
$toolCalls = $this->extractToolCalls($data, $toolCalls);
101112

@@ -214,6 +225,12 @@ protected function processStream(Response $response, Request $request, int $dept
214225
return;
215226
}
216227

228+
$this->state->markStepFinished();
229+
yield new StepFinishEvent(
230+
id: EventID::generate(),
231+
timestamp: time()
232+
);
233+
217234
yield new StreamEndEvent(
218235
id: EventID::generate(),
219236
timestamp: time(),
@@ -381,6 +398,12 @@ protected function handleToolCalls(Request $request, string $text, array $toolCa
381398
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
382399
$request->addMessage(new ToolResultMessage($toolResults));
383400

401+
$this->state->markStepFinished();
402+
yield new StepFinishEvent(
403+
id: EventID::generate(),
404+
timestamp: time()
405+
);
406+
384407
$this->state->resetTextState();
385408
$this->state->withMessageId(EventID::generate());
386409

src/Providers/Gemini/Handlers/Stream.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use Prism\Prism\Providers\Gemini\Maps\ToolMap;
1919
use Prism\Prism\Streaming\EventID;
2020
use Prism\Prism\Streaming\Events\ArtifactEvent;
21+
use Prism\Prism\Streaming\Events\StepFinishEvent;
22+
use Prism\Prism\Streaming\Events\StepStartEvent;
2123
use Prism\Prism\Streaming\Events\StreamEndEvent;
2224
use Prism\Prism\Streaming\Events\StreamEvent;
2325
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -100,6 +102,16 @@ protected function processStream(Response $response, Request $request, int $dept
100102
$this->state->markStreamStarted();
101103
}
102104

105+
// Emit step start event once per step
106+
if ($this->state->shouldEmitStepStart()) {
107+
$this->state->markStepStarted();
108+
109+
yield new StepStartEvent(
110+
id: EventID::generate(),
111+
timestamp: time()
112+
);
113+
}
114+
103115
// Update usage data from each chunk
104116
$this->state->withUsage($this->extractUsage($data, $request));
105117

@@ -219,6 +231,13 @@ protected function processStream(Response $response, Request $request, int $dept
219231
return;
220232
}
221233

234+
// Emit step finish before stream end
235+
$this->state->markStepFinished();
236+
yield new StepFinishEvent(
237+
id: EventID::generate(),
238+
timestamp: time()
239+
);
240+
222241
yield new StreamEndEvent(
223242
id: EventID::generate(),
224243
timestamp: time(),
@@ -356,6 +375,13 @@ protected function handleToolCalls(
356375
$request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls));
357376
$request->addMessage(new ToolResultMessage($toolResults));
358377

378+
// Emit step finish after tool calls
379+
$this->state->markStepFinished();
380+
yield new StepFinishEvent(
381+
id: EventID::generate(),
382+
timestamp: time()
383+
);
384+
359385
$depth++;
360386
if ($depth < $request->maxSteps()) {
361387
$previousUsage = $this->state->usage();

src/Providers/Groq/Handlers/Stream.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
use Prism\Prism\Streaming\EventID;
2323
use Prism\Prism\Streaming\Events\ArtifactEvent;
2424
use Prism\Prism\Streaming\Events\ErrorEvent;
25+
use Prism\Prism\Streaming\Events\StepFinishEvent;
26+
use Prism\Prism\Streaming\Events\StepStartEvent;
2527
use Prism\Prism\Streaming\Events\StreamEndEvent;
2628
use Prism\Prism\Streaming\Events\StreamEvent;
2729
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -96,6 +98,16 @@ protected function processStream(Response $response, Request $request, int $dept
9698
);
9799
}
98100

101+
// Emit step start event once per step
102+
if ($this->state->shouldEmitStepStart()) {
103+
$this->state->markStepStarted();
104+
105+
yield new StepStartEvent(
106+
id: EventID::generate(),
107+
timestamp: time()
108+
);
109+
}
110+
99111
if ($this->hasError($data)) {
100112
yield from $this->handleErrors($data, $request);
101113

@@ -168,6 +180,13 @@ protected function processStream(Response $response, Request $request, int $dept
168180
}
169181
}
170182

183+
// Emit step finish before stream end
184+
$this->state->markStepFinished();
185+
yield new StepFinishEvent(
186+
id: EventID::generate(),
187+
timestamp: time()
188+
);
189+
171190
yield new StreamEndEvent(
172191
id: EventID::generate(),
173192
timestamp: time(),
@@ -275,6 +294,13 @@ protected function handleToolCalls(
275294
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
276295
$request->addMessage(new ToolResultMessage($toolResults));
277296

297+
// Emit step finish after tool calls
298+
$this->state->markStepFinished();
299+
yield new StepFinishEvent(
300+
id: EventID::generate(),
301+
timestamp: time()
302+
);
303+
278304
// Reset text state for next response
279305
$this->state->resetTextState();
280306
$this->state->withMessageId(EventID::generate());

0 commit comments

Comments
 (0)