Skip to content

Commit bf825c2

Browse files
committed
feat: add step start and finish events
1 parent d52939e commit bf825c2

File tree

28 files changed

+1307
-23
lines changed

28 files changed

+1307
-23
lines changed

src/Enums/StreamEventType.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,6 @@ enum StreamEventType: string
2020
case Citation = 'citation';
2121
case Error = 'error';
2222
case StreamEnd = 'stream_end';
23+
case StepStart = 'step_start';
24+
case StepFinish = 'step_finish';
2325
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Prism\Prism\Events\Broadcasting;
6+
7+
class StepFinishBroadcast extends StreamEventBroadcast {}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Prism\Prism\Events\Broadcasting;
6+
7+
class StepStartBroadcast extends StreamEventBroadcast {}

src/Providers/Anthropic/Handlers/Stream.php

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
use Prism\Prism\Streaming\Events\CitationEvent;
1818
use Prism\Prism\Streaming\Events\ErrorEvent;
1919
use Prism\Prism\Streaming\Events\ProviderToolEvent;
20+
use Prism\Prism\Streaming\Events\StepFinishEvent;
21+
use Prism\Prism\Streaming\Events\StepStartEvent;
2022
use Prism\Prism\Streaming\Events\StreamEndEvent;
2123
use Prism\Prism\Streaming\Events\StreamEvent;
2224
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -77,18 +79,28 @@ protected function processStream(Response $response, Request $request, int $dept
7779
$streamEvent = $this->processEvent($event);
7880

7981
if ($streamEvent instanceof Generator) {
80-
yield from $streamEvent;
82+
foreach ($streamEvent as $event) {
83+
yield $event;
84+
}
8185
} elseif ($streamEvent instanceof StreamEvent) {
8286
yield $streamEvent;
8387
}
8488
}
8589

8690
if ($this->state->hasToolCalls()) {
87-
yield from $this->handleToolCalls($request, $depth);
91+
foreach ($this->handleToolCalls($request, $depth) as $item) {
92+
yield $item;
93+
}
8894

8995
return;
9096
}
9197

98+
$this->state->markStepFinished();
99+
yield new StepFinishEvent(
100+
id: EventID::generate(),
101+
timestamp: time()
102+
);
103+
92104
yield $this->emitStreamEndEvent();
93105
}
94106

@@ -113,8 +125,9 @@ protected function processEvent(array $event): StreamEvent|Generator|null
113125

114126
/**
115127
* @param array<string, mixed> $event
128+
* @return Generator<StreamEvent>
116129
*/
117-
protected function handleMessageStart(array $event): ?StreamStartEvent
130+
protected function handleMessageStart(array $event): Generator
118131
{
119132
$message = $event['message'] ?? [];
120133
$this->state->withMessageId($message['id'] ?? EventID::generate());
@@ -130,18 +143,25 @@ protected function handleMessageStart(array $event): ?StreamStartEvent
130143
}
131144

132145
// Only emit StreamStartEvent once per streaming session
133-
if (! $this->state->shouldEmitStreamStart()) {
134-
return null;
146+
if ($this->state->shouldEmitStreamStart()) {
147+
$this->state->markStreamStarted();
148+
149+
yield new StreamStartEvent(
150+
id: EventID::generate(),
151+
timestamp: time(),
152+
model: $message['model'] ?? 'unknown',
153+
provider: 'anthropic'
154+
);
135155
}
136156

137-
$this->state->markStreamStarted();
157+
if ($this->state->shouldEmitStepStart()) {
158+
$this->state->markStepStarted();
138159

139-
return new StreamStartEvent(
140-
id: EventID::generate(),
141-
timestamp: time(),
142-
model: $message['model'] ?? 'unknown',
143-
provider: 'anthropic'
144-
);
160+
yield new StepStartEvent(
161+
id: EventID::generate(),
162+
timestamp: time()
163+
);
164+
}
145165
}
146166

147167
/**
@@ -526,6 +546,13 @@ protected function handleToolCalls(Request $request, int $depth): Generator
526546

527547
$request->addMessage(new ToolResultMessage($toolResults));
528548

549+
// Emit step finish after tool calls
550+
$this->state->markStepFinished();
551+
yield new StepFinishEvent(
552+
id: EventID::generate(),
553+
timestamp: time()
554+
);
555+
529556
// Continue streaming if within step limit
530557
$depth++;
531558
if ($depth < $request->maxSteps()) {

src/Providers/DeepSeek/Handlers/Stream.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
use Prism\Prism\Providers\DeepSeek\Maps\ToolChoiceMap;
2121
use Prism\Prism\Providers\DeepSeek\Maps\ToolMap;
2222
use Prism\Prism\Streaming\EventID;
23+
use Prism\Prism\Streaming\Events\StepFinishEvent;
24+
use Prism\Prism\Streaming\Events\StepStartEvent;
2325
use Prism\Prism\Streaming\Events\StreamEndEvent;
2426
use Prism\Prism\Streaming\Events\StreamEvent;
2527
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -95,6 +97,15 @@ protected function processStream(Response $response, Request $request, int $dept
9597
);
9698
}
9799

100+
if ($this->state->shouldEmitStepStart()) {
101+
$this->state->markStepStarted();
102+
103+
yield new StepStartEvent(
104+
id: EventID::generate(),
105+
timestamp: time()
106+
);
107+
}
108+
98109
if ($this->hasToolCalls($data)) {
99110
$toolCalls = $this->extractToolCalls($data, $toolCalls);
100111

@@ -213,6 +224,12 @@ protected function processStream(Response $response, Request $request, int $dept
213224
return;
214225
}
215226

227+
$this->state->markStepFinished();
228+
yield new StepFinishEvent(
229+
id: EventID::generate(),
230+
timestamp: time()
231+
);
232+
216233
yield new StreamEndEvent(
217234
id: EventID::generate(),
218235
timestamp: time(),
@@ -369,6 +386,12 @@ protected function handleToolCalls(Request $request, string $text, array $toolCa
369386
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
370387
$request->addMessage(new ToolResultMessage($toolResults));
371388

389+
$this->state->markStepFinished();
390+
yield new StepFinishEvent(
391+
id: EventID::generate(),
392+
timestamp: time()
393+
);
394+
372395
$this->state->resetTextState();
373396
$this->state->withMessageId(EventID::generate());
374397

src/Providers/Gemini/Handlers/Stream.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
use Prism\Prism\Providers\Gemini\Maps\ToolChoiceMap;
1818
use Prism\Prism\Providers\Gemini\Maps\ToolMap;
1919
use Prism\Prism\Streaming\EventID;
20+
use Prism\Prism\Streaming\Events\StepFinishEvent;
21+
use Prism\Prism\Streaming\Events\StepStartEvent;
2022
use Prism\Prism\Streaming\Events\StreamEndEvent;
2123
use Prism\Prism\Streaming\Events\StreamEvent;
2224
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -98,6 +100,16 @@ protected function processStream(Response $response, Request $request, int $dept
98100
$this->state->markStreamStarted();
99101
}
100102

103+
// Emit step start event once per step
104+
if ($this->state->shouldEmitStepStart()) {
105+
$this->state->markStepStarted();
106+
107+
yield new StepStartEvent(
108+
id: EventID::generate(),
109+
timestamp: time()
110+
);
111+
}
112+
101113
// Update usage data from each chunk
102114
$this->state->withUsage($this->extractUsage($data, $request));
103115

@@ -217,6 +229,13 @@ protected function processStream(Response $response, Request $request, int $dept
217229
return;
218230
}
219231

232+
// Emit step finish before stream end
233+
$this->state->markStepFinished();
234+
yield new StepFinishEvent(
235+
id: EventID::generate(),
236+
timestamp: time()
237+
);
238+
220239
yield new StreamEndEvent(
221240
id: EventID::generate(),
222241
timestamp: time(),
@@ -338,6 +357,13 @@ protected function handleToolCalls(
338357
$request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls));
339358
$request->addMessage(new ToolResultMessage($toolResults));
340359

360+
// Emit step finish after tool calls
361+
$this->state->markStepFinished();
362+
yield new StepFinishEvent(
363+
id: EventID::generate(),
364+
timestamp: time()
365+
);
366+
341367
$depth++;
342368
if ($depth < $request->maxSteps()) {
343369
$previousUsage = $this->state->usage();

src/Providers/Groq/Handlers/Stream.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Prism\Prism\Providers\Groq\Maps\ToolMap;
2222
use Prism\Prism\Streaming\EventID;
2323
use Prism\Prism\Streaming\Events\ErrorEvent;
24+
use Prism\Prism\Streaming\Events\StepFinishEvent;
25+
use Prism\Prism\Streaming\Events\StepStartEvent;
2426
use Prism\Prism\Streaming\Events\StreamEndEvent;
2527
use Prism\Prism\Streaming\Events\StreamEvent;
2628
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -95,6 +97,16 @@ protected function processStream(Response $response, Request $request, int $dept
9597
);
9698
}
9799

100+
// Emit step start event once per step
101+
if ($this->state->shouldEmitStepStart()) {
102+
$this->state->markStepStarted();
103+
104+
yield new StepStartEvent(
105+
id: EventID::generate(),
106+
timestamp: time()
107+
);
108+
}
109+
98110
if ($this->hasError($data)) {
99111
yield from $this->handleErrors($data, $request);
100112

@@ -167,6 +179,13 @@ protected function processStream(Response $response, Request $request, int $dept
167179
}
168180
}
169181

182+
// Emit step finish before stream end
183+
$this->state->markStepFinished();
184+
yield new StepFinishEvent(
185+
id: EventID::generate(),
186+
timestamp: time()
187+
);
188+
170189
yield new StreamEndEvent(
171190
id: EventID::generate(),
172191
timestamp: time(),
@@ -263,6 +282,13 @@ protected function handleToolCalls(
263282
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
264283
$request->addMessage(new ToolResultMessage($toolResults));
265284

285+
// Emit step finish after tool calls
286+
$this->state->markStepFinished();
287+
yield new StepFinishEvent(
288+
id: EventID::generate(),
289+
timestamp: time()
290+
);
291+
266292
// Reset text state for next response
267293
$this->state->resetTextState();
268294
$this->state->withMessageId(EventID::generate());

src/Providers/Mistral/Handlers/Stream.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
use Prism\Prism\Providers\Mistral\Maps\ToolChoiceMap;
2121
use Prism\Prism\Providers\Mistral\Maps\ToolMap;
2222
use Prism\Prism\Streaming\EventID;
23+
use Prism\Prism\Streaming\Events\StepFinishEvent;
24+
use Prism\Prism\Streaming\Events\StepStartEvent;
2325
use Prism\Prism\Streaming\Events\StreamEndEvent;
2426
use Prism\Prism\Streaming\Events\StreamEvent;
2527
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -94,6 +96,15 @@ protected function processStream(Response $response, Request $request, int $dept
9496
);
9597
}
9698

99+
if ($this->state->shouldEmitStepStart()) {
100+
$this->state->markStepStarted();
101+
102+
yield new StepStartEvent(
103+
id: EventID::generate(),
104+
timestamp: time()
105+
);
106+
}
107+
97108
if ($this->hasToolCalls($data)) {
98109
$toolCalls = $this->extractToolCalls($data, $toolCalls);
99110

@@ -172,6 +183,12 @@ protected function processStream(Response $response, Request $request, int $dept
172183
}
173184
}
174185

186+
$this->state->markStepFinished();
187+
yield new StepFinishEvent(
188+
id: EventID::generate(),
189+
timestamp: time()
190+
);
191+
175192
yield new StreamEndEvent(
176193
id: EventID::generate(),
177194
timestamp: time(),
@@ -259,6 +276,12 @@ protected function handleToolCalls(
259276
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
260277
$request->addMessage(new ToolResultMessage($toolResults));
261278

279+
$this->state->markStepFinished();
280+
yield new StepFinishEvent(
281+
id: EventID::generate(),
282+
timestamp: time()
283+
);
284+
262285
$this->state->resetTextState();
263286
$this->state->withMessageId(EventID::generate());
264287

src/Providers/Ollama/Handlers/Stream.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
use Prism\Prism\Providers\Ollama\Maps\ToolMap;
1818
use Prism\Prism\Providers\Ollama\ValueObjects\OllamaStreamState;
1919
use Prism\Prism\Streaming\EventID;
20+
use Prism\Prism\Streaming\Events\StepFinishEvent;
21+
use Prism\Prism\Streaming\Events\StepStartEvent;
2022
use Prism\Prism\Streaming\Events\StreamEndEvent;
2123
use Prism\Prism\Streaming\Events\StreamEvent;
2224
use Prism\Prism\Streaming\Events\StreamStartEvent;
@@ -90,6 +92,16 @@ protected function processStream(Response $response, Request $request, int $dept
9092
$this->state->markStreamStarted()->withMessageId(EventID::generate());
9193
}
9294

95+
// Emit step start event once per step
96+
if ($this->state->shouldEmitStepStart()) {
97+
$this->state->markStepStarted();
98+
99+
yield new StepStartEvent(
100+
id: EventID::generate(),
101+
timestamp: time()
102+
);
103+
}
104+
93105
// Accumulate token counts
94106
$this->state->addPromptTokens((int) data_get($data, 'prompt_eval_count', 0));
95107
$this->state->addCompletionTokens((int) data_get($data, 'eval_count', 0));
@@ -185,6 +197,13 @@ protected function processStream(Response $response, Request $request, int $dept
185197
);
186198
}
187199

200+
// Emit step finish before stream end
201+
$this->state->markStepFinished();
202+
yield new StepFinishEvent(
203+
id: EventID::generate(),
204+
timestamp: time()
205+
);
206+
188207
// Emit stream end event with usage
189208
yield new StreamEndEvent(
190209
id: EventID::generate(),
@@ -280,6 +299,13 @@ protected function handleToolCalls(
280299
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
281300
$request->addMessage(new ToolResultMessage($toolResults));
282301

302+
// Emit step finish after tool calls
303+
$this->state->markStepFinished();
304+
yield new StepFinishEvent(
305+
id: EventID::generate(),
306+
timestamp: time()
307+
);
308+
283309
// Continue streaming if within step limit
284310
$depth++;
285311
if ($depth < $request->maxSteps()) {

0 commit comments

Comments
 (0)