Skip to content

Commit 5b694df

Browse files
committed
minor #744 [Platform] Introduce stream method to RawResultInterface (chr-hertel)
This PR was merged into the main branch. Discussion ---------- [Platform] Introduce stream method to `RawResultInterface` | Q | A | ------------- | --- | Bug fix? | no | New feature? | no | Docs? | no | Issues | | License | MIT On the journey to reworked middle layer of Platform. Let's see what breaks here now. * centralizes http stream handling * increases testability (should) * removes a bit more http concerns from converters FYI `@DZunke` Commits ------- ea6d1f0 Add stream method to RawResultInterface
2 parents 453adfd + ea6d1f0 commit 5b694df

File tree

20 files changed

+121
-215
lines changed

20 files changed

+121
-215
lines changed

src/platform/src/Bridge/Anthropic/ResultConverter.php

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222
use Symfony\AI\Platform\Result\ToolCall;
2323
use Symfony\AI\Platform\Result\ToolCallResult;
2424
use Symfony\AI\Platform\ResultConverterInterface;
25-
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
26-
use Symfony\Component\HttpClient\EventSourceHttpClient;
27-
use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse;
2825

2926
/**
3027
* @author Christopher Hertel <[email protected]>
@@ -47,7 +44,7 @@ public function convert(RawHttpResult|RawResultInterface $result, array $options
4744
}
4845

4946
if ($options['stream'] ?? false) {
50-
return new StreamResult($this->convertStream($response));
47+
return new StreamResult($this->convertStream($result));
5148
}
5249

5350
$data = $result->getData();
@@ -74,15 +71,9 @@ public function convert(RawHttpResult|RawResultInterface $result, array $options
7471
return new TextResult($data['content'][0]['text']);
7572
}
7673

77-
private function convertStream(HttpResponse $result): \Generator
74+
private function convertStream(RawResultInterface $result): \Generator
7875
{
79-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
80-
if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) {
81-
continue;
82-
}
83-
84-
$data = $chunk->getArrayData();
85-
76+
foreach ($result->getDataStream() as $data) {
8677
if ('content_block_delta' != $data['type'] || !isset($data['delta']['text'])) {
8778
continue;
8879
}

src/platform/src/Bridge/Bedrock/RawBedrockResult.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\AI\Platform\Bridge\Bedrock;
1313

1414
use AsyncAws\BedrockRuntime\Result\InvokeModelResponse;
15+
use Symfony\AI\Platform\Exception\RuntimeException;
1516
use Symfony\AI\Platform\Result\RawResultInterface;
1617

1718
/**
@@ -29,6 +30,11 @@ public function getData(): array
2930
return json_decode($this->invokeModelResponse->getBody(), true, 512, \JSON_THROW_ON_ERROR);
3031
}
3132

33+
public function getDataStream(): iterable
34+
{
35+
throw new RuntimeException('Streaming is not implemented yet.');
36+
}
37+
3238
public function getObject(): InvokeModelResponse
3339
{
3440
return $this->invokeModelResponse;

src/platform/src/Bridge/Cerebras/ResultConverter.php

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,11 @@
1313

1414
use Symfony\AI\Platform\Exception\RuntimeException;
1515
use Symfony\AI\Platform\Model as BaseModel;
16-
use Symfony\AI\Platform\Result\RawHttpResult;
1716
use Symfony\AI\Platform\Result\RawResultInterface;
1817
use Symfony\AI\Platform\Result\ResultInterface;
1918
use Symfony\AI\Platform\Result\StreamResult;
2019
use Symfony\AI\Platform\Result\TextResult;
2120
use Symfony\AI\Platform\ResultConverterInterface;
22-
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
23-
use Symfony\Component\HttpClient\EventSourceHttpClient;
24-
use Symfony\Component\HttpClient\Exception\JsonException;
25-
use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse;
2621

2722
/**
2823
* @author Junaid Farooq <[email protected]>
@@ -34,10 +29,10 @@ public function supports(BaseModel $model): bool
3429
return $model instanceof Model;
3530
}
3631

37-
public function convert(RawHttpResult|RawResultInterface $result, array $options = []): ResultInterface
32+
public function convert(RawResultInterface $result, array $options = []): ResultInterface
3833
{
3934
if ($options['stream'] ?? false) {
40-
return new StreamResult($this->convertStream($result->getObject()));
35+
return new StreamResult($this->convertStream($result));
4136
}
4237

4338
$data = $result->getData();
@@ -53,19 +48,9 @@ public function convert(RawHttpResult|RawResultInterface $result, array $options
5348
return new TextResult($data['choices'][0]['message']['content']);
5449
}
5550

56-
private function convertStream(HttpResponse $result): \Generator
51+
private function convertStream(RawResultInterface $result): \Generator
5752
{
58-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
59-
if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) {
60-
continue;
61-
}
62-
63-
try {
64-
$data = $chunk->getArrayData();
65-
} catch (JsonException) {
66-
continue;
67-
}
68-
53+
foreach ($result->getDataStream() as $data) {
6954
if (!isset($data['choices'][0]['delta']['content'])) {
7055
continue;
7156
}

src/platform/src/Bridge/DeepSeek/ResultConverter.php

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,13 @@
1616
use Symfony\AI\Platform\Exception\RuntimeException;
1717
use Symfony\AI\Platform\Model;
1818
use Symfony\AI\Platform\Result\ChoiceResult;
19-
use Symfony\AI\Platform\Result\RawHttpResult;
2019
use Symfony\AI\Platform\Result\RawResultInterface;
2120
use Symfony\AI\Platform\Result\ResultInterface;
2221
use Symfony\AI\Platform\Result\StreamResult;
2322
use Symfony\AI\Platform\Result\TextResult;
2423
use Symfony\AI\Platform\Result\ToolCall;
2524
use Symfony\AI\Platform\Result\ToolCallResult;
2625
use Symfony\AI\Platform\ResultConverterInterface;
27-
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
28-
use Symfony\Component\HttpClient\EventSourceHttpClient;
29-
use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse;
3026

3127
/**
3228
* @author Oskar Stark <[email protected]>
@@ -38,10 +34,10 @@ public function supports(Model $model): bool
3834
return $model instanceof DeepSeek;
3935
}
4036

41-
public function convert(RawResultInterface|RawHttpResult $result, array $options = []): ResultInterface
37+
public function convert(RawResultInterface $result, array $options = []): ResultInterface
4238
{
4339
if ($options['stream'] ?? false) {
44-
return new StreamResult($this->convertStream($result->getObject()));
40+
return new StreamResult($this->convertStream($result));
4541
}
4642

4743
$data = $result->getData();
@@ -63,16 +59,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options
6359
return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices);
6460
}
6561

66-
private function convertStream(HttpResponse $result): \Generator
62+
private function convertStream(RawResultInterface $result): \Generator
6763
{
6864
$toolCalls = [];
69-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
70-
if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) {
71-
continue;
72-
}
73-
74-
$data = $chunk->getArrayData();
75-
65+
foreach ($result->getDataStream() as $data) {
7666
if ($this->streamIsToolCall($data)) {
7767
$toolCalls = $this->convertStreamToToolCalls($toolCalls, $data);
7868
}

src/platform/src/Bridge/DockerModelRunner/Completions/ResultConverter.php

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@
2626
use Symfony\AI\Platform\Result\ToolCall;
2727
use Symfony\AI\Platform\Result\ToolCallResult;
2828
use Symfony\AI\Platform\ResultConverterInterface;
29-
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
30-
use Symfony\Component\HttpClient\EventSourceHttpClient;
31-
use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse;
3229

3330
/**
3431
* @author Mathieu Santostefano <[email protected]>
@@ -43,7 +40,7 @@ public function supports(Model $model): bool
4340
public function convert(RawResultInterface|RawHttpResult $result, array $options = []): ResultInterface
4441
{
4542
if ($options['stream'] ?? false) {
46-
return new StreamResult($this->convertStream($result->getObject()));
43+
return new StreamResult($this->convertStream($result));
4744
}
4845

4946
if (404 === $result->getObject()->getStatusCode()
@@ -70,16 +67,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options
7067
return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices);
7168
}
7269

73-
private function convertStream(HttpResponse $result): \Generator
70+
private function convertStream(RawResultInterface $result): \Generator
7471
{
7572
$toolCalls = [];
76-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
77-
if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) {
78-
continue;
79-
}
80-
81-
$data = $chunk->getArrayData();
82-
73+
foreach ($result->getDataStream() as $data) {
8374
if ($this->streamIsToolCall($data)) {
8475
$toolCalls = $this->convertStreamToToolCalls($toolCalls, $data);
8576
}

src/platform/src/Bridge/ElevenLabs/ElevenLabsResultConverter.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\AI\Platform\Exception\RuntimeException;
1515
use Symfony\AI\Platform\Model;
1616
use Symfony\AI\Platform\Result\BinaryResult;
17+
use Symfony\AI\Platform\Result\RawHttpResult;
1718
use Symfony\AI\Platform\Result\RawResultInterface;
1819
use Symfony\AI\Platform\Result\ResultInterface;
1920
use Symfony\AI\Platform\Result\StreamResult;
@@ -37,9 +38,8 @@ public function supports(Model $model): bool
3738
return $model instanceof ElevenLabs;
3839
}
3940

40-
public function convert(RawResultInterface $result, array $options = []): ResultInterface
41+
public function convert(RawHttpResult|RawResultInterface $result, array $options = []): ResultInterface
4142
{
42-
/** @var ResponseInterface $response */
4343
$response = $result->getObject();
4444

4545
return match (true) {

src/platform/src/Bridge/Gemini/Gemini/ResultConverter.php

Lines changed: 10 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
use Symfony\AI\Platform\Result\ToolCall;
2626
use Symfony\AI\Platform\Result\ToolCallResult;
2727
use Symfony\AI\Platform\ResultConverterInterface;
28-
use Symfony\Component\HttpClient\EventSourceHttpClient;
29-
use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse;
3028

3129
/**
3230
* @author Roy Garrido
@@ -51,7 +49,7 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options
5149
}
5250

5351
if ($options['stream'] ?? false) {
54-
return new StreamResult($this->convertStream($response));
52+
return new StreamResult($this->convertStream($result));
5553
}
5654

5755
$data = $result->getData();
@@ -69,50 +67,21 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options
6967
return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices);
7068
}
7169

72-
private function convertStream(HttpResponse $result): \Generator
70+
private function convertStream(RawResultInterface $result): \Generator
7371
{
74-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
75-
if ($chunk->isFirst() || $chunk->isLast()) {
72+
foreach ($result->getDataStream() as $data) {
73+
$choices = array_map($this->convertChoice(...), $data['candidates'] ?? []);
74+
75+
if (!$choices) {
7676
continue;
7777
}
7878

79-
$jsonDelta = trim($chunk->getContent());
80-
81-
// Remove leading/trailing brackets
82-
if (str_starts_with($jsonDelta, '[') || str_starts_with($jsonDelta, ',')) {
83-
$jsonDelta = substr($jsonDelta, 1);
84-
}
85-
if (str_ends_with($jsonDelta, ']')) {
86-
$jsonDelta = substr($jsonDelta, 0, -1);
79+
if (1 !== \count($choices)) {
80+
yield new ChoiceResult(...$choices);
81+
continue;
8782
}
8883

89-
// Split in case of multiple JSON objects
90-
$deltas = explode(",\r\n", $jsonDelta);
91-
92-
foreach ($deltas as $delta) {
93-
if ('' === $delta) {
94-
continue;
95-
}
96-
97-
try {
98-
$data = json_decode($delta, true, 512, \JSON_THROW_ON_ERROR);
99-
} catch (\JsonException $e) {
100-
throw new RuntimeException('Failed to decode JSON response.', previous: $e);
101-
}
102-
103-
$choices = array_map($this->convertChoice(...), $data['candidates'] ?? []);
104-
105-
if (!$choices) {
106-
continue;
107-
}
108-
109-
if (1 !== \count($choices)) {
110-
yield new ChoiceResult(...$choices);
111-
continue;
112-
}
113-
114-
yield $choices[0]->getContent();
115-
}
84+
yield $choices[0]->getContent();
11685
}
11786
}
11887

src/platform/src/Bridge/Mistral/Llm/ResultConverter.php

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
use Symfony\AI\Platform\Result\ToolCall;
2424
use Symfony\AI\Platform\Result\ToolCallResult;
2525
use Symfony\AI\Platform\ResultConverterInterface;
26-
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
27-
use Symfony\Component\HttpClient\EventSourceHttpClient;
28-
use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse;
2926

3027
/**
3128
* @author Christopher Hertel <[email protected]>
@@ -45,7 +42,7 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options
4542
$httpResponse = $result->getObject();
4643

4744
if ($options['stream'] ?? false) {
48-
return new StreamResult($this->convertStream($httpResponse));
45+
return new StreamResult($this->convertStream($result));
4946
}
5047

5148
if (200 !== $code = $httpResponse->getStatusCode()) {
@@ -63,16 +60,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options
6360
return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices);
6461
}
6562

66-
private function convertStream(HttpResponse $result): \Generator
63+
private function convertStream(RawResultInterface $result): \Generator
6764
{
6865
$toolCalls = [];
69-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
70-
if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) {
71-
continue;
72-
}
73-
74-
$data = $chunk->getArrayData();
75-
66+
foreach ($result->getDataStream() as $data) {
7667
if ($this->streamIsToolCall($data)) {
7768
$toolCalls = $this->convertStreamToToolCalls($toolCalls, $data);
7869
}

src/platform/src/Bridge/Ollama/OllamaResultConverter.php

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@
2222
use Symfony\AI\Platform\Result\VectorResult;
2323
use Symfony\AI\Platform\ResultConverterInterface;
2424
use Symfony\AI\Platform\Vector\Vector;
25-
use Symfony\Component\HttpClient\Chunk\FirstChunk;
26-
use Symfony\Component\HttpClient\Chunk\LastChunk;
27-
use Symfony\Component\HttpClient\EventSourceHttpClient;
28-
use Symfony\Contracts\HttpClient\ResponseInterface;
2925

3026
/**
3127
* @author Christopher Hertel <[email protected]>
@@ -40,7 +36,7 @@ public function supports(Model $model): bool
4036
public function convert(RawResultInterface $result, array $options = []): ResultInterface
4137
{
4238
if ($options['stream'] ?? false) {
43-
return new StreamResult($this->convertStream($result->getObject()));
39+
return new StreamResult($this->convertStream($result));
4440
}
4541

4642
$data = $result->getData();
@@ -93,20 +89,10 @@ public function doConvertEmbeddings(array $data): ResultInterface
9389
);
9490
}
9591

96-
private function convertStream(ResponseInterface $result): \Generator
92+
private function convertStream(RawResultInterface $result): \Generator
9793
{
9894
$toolCalls = [];
99-
foreach ((new EventSourceHttpClient())->stream($result) as $chunk) {
100-
if ($chunk instanceof FirstChunk || $chunk instanceof LastChunk) {
101-
continue;
102-
}
103-
104-
try {
105-
$data = json_decode($chunk->getContent(), true, 512, \JSON_THROW_ON_ERROR);
106-
} catch (\JsonException $e) {
107-
throw new RuntimeException('Failed to decode JSON: '.$e->getMessage());
108-
}
109-
95+
foreach ($result->getDataStream() as $data) {
11096
if ($this->streamIsToolCall($data)) {
11197
$toolCalls = $this->convertStreamToToolCalls($toolCalls, $data);
11298
}

0 commit comments

Comments
 (0)