Skip to content

Commit 2963562

Browse files
authored
Merge pull request #23 from roadrunner-php/feature-proto-payloads
Proto payloads between RR and PHP Worker
2 parents 0bd1ef8 + 8b35fd9 commit 2963562

File tree

6 files changed

+425
-29
lines changed

6 files changed

+425
-29
lines changed

composer.json

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@
4141
"ext-json": "*",
4242
"psr/http-factory": "^1.0.1",
4343
"psr/http-message": "^1.0.1 || ^2.0",
44-
"spiral/roadrunner": "^2023.3",
45-
"spiral/roadrunner-worker": "^3.1.0"
44+
"spiral/roadrunner": "^2023.3 || ^2024.1",
45+
"spiral/roadrunner-worker": "^3.5",
46+
"roadrunner-php/roadrunner-api-dto": "^1.6",
47+
"symfony/polyfill-php83": "^1.29"
4648
},
4749
"require-dev": {
48-
"buggregator/trap": "^1.0",
4950
"jetbrains/phpstorm-attributes": "^1.0",
5051
"nyholm/psr7": "^1.3",
5152
"phpunit/phpunit": "^10.0",
@@ -72,7 +73,8 @@
7273
"analyze": "psalm"
7374
},
7475
"suggest": {
75-
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools"
76+
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools",
77+
"ext-protobuf": "Provides Protocol Buffers support. Without it, performance will be lower."
7678
},
7779
"config": {
7880
"sort-packages": true

src/HttpWorker.php

Lines changed: 115 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
namespace Spiral\RoadRunner\Http;
66

77
use Generator;
8+
use RoadRunner\HTTP\DTO\V1\HeaderValue;
9+
use RoadRunner\HTTP\DTO\V1\Request as RequestProto;
10+
use RoadRunner\HTTP\DTO\V1\Response;
11+
use Spiral\Goridge\Frame;
812
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
913
use Spiral\RoadRunner\Message\Command\StreamStop;
1014
use Spiral\RoadRunner\Payload;
@@ -34,6 +38,8 @@
3438
*/
3539
class HttpWorker implements HttpWorkerInterface
3640
{
41+
private static ?int $codec = null;
42+
3743
public function __construct(
3844
private readonly WorkerInterface $worker,
3945
) {
@@ -56,13 +62,25 @@ public function waitRequest(): ?Request
5662
return null;
5763
}
5864

65+
if (static::$codec === null) {
66+
static::$codec = \json_validate($payload->header) ? Frame::CODEC_JSON : Frame::CODEC_PROTO;
67+
}
68+
69+
if (static::$codec === Frame::CODEC_PROTO) {
70+
$message = new RequestProto();
71+
$message->mergeFromString($payload->header);
72+
73+
return $this->requestFromProto($payload->body, $message);
74+
}
75+
5976
/** @var RequestContext $context */
6077
$context = \json_decode($payload->header, true, 512, \JSON_THROW_ON_ERROR);
6178

62-
return $this->createRequest($payload->body, $context);
79+
return $this->arrayToRequest($payload->body, $context);
6380
}
6481

6582
/**
83+
* @param array<array-key, array<array-key, string>> $headers
6684
* @throws \JsonException
6785
*/
6886
public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void
@@ -76,21 +94,15 @@ public function respond(int $status, string|Generator $body = '', array $headers
7694
return;
7795
}
7896

79-
$head = \json_encode([
80-
'status' => $status,
81-
'headers' => $headers ?: (object)[],
82-
], \JSON_THROW_ON_ERROR);
83-
84-
$this->worker->respond(new Payload($body, $head, $endOfStream));
97+
/** @psalm-suppress TooManyArguments */
98+
$this->worker->respond($this->createRespondPayload($status, $body, $headers, $endOfStream), static::$codec);
8599
}
86100

101+
/**
102+
* @param array<array-key, array<array-key, string>> $headers
103+
*/
87104
private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void
88105
{
89-
$head = \json_encode([
90-
'status' => $status,
91-
'headers' => $headers ?: (object)[],
92-
], \JSON_THROW_ON_ERROR);
93-
94106
$worker = $this->worker instanceof StreamWorkerInterface
95107
? $this->worker->withStreamMode()
96108
: $this->worker;
@@ -103,7 +115,11 @@ private function respondStream(int $status, Generator $body, array $headers = []
103115
// We don't need to send an empty frame if the stream is not ended
104116
return;
105117
}
106-
$worker->respond(new Payload($content, $head, $endOfStream));
118+
/** @psalm-suppress TooManyArguments */
119+
$worker->respond(
120+
$this->createRespondPayload($status, $content, $headers, $endOfStream),
121+
static::$codec
122+
);
107123
break;
108124
}
109125

@@ -117,9 +133,11 @@ private function respondStream(int $status, Generator $body, array $headers = []
117133
return;
118134
}
119135

120-
// Send a chunk of data
121-
$worker->respond(new Payload($content, $head, false));
122-
$head = null;
136+
/**
137+
* Send a chunk of data
138+
* @psalm-suppress TooManyArguments
139+
*/
140+
$worker->respond($this->createRespondPayload($status, $content, $headers, false), static::$codec);
123141

124142
try {
125143
$body->next();
@@ -134,7 +152,7 @@ private function respondStream(int $status, Generator $body, array $headers = []
134152
/**
135153
* @param RequestContext $context
136154
*/
137-
private function createRequest(string $body, array $context): Request
155+
private function arrayToRequest(string $body, array $context): Request
138156
{
139157
\parse_str($context['rawQuery'], $query);
140158
return new Request(
@@ -154,6 +172,37 @@ private function createRequest(string $body, array $context): Request
154172
);
155173
}
156174

175+
private function requestFromProto(string $body, RequestProto $message): Request
176+
{
177+
/** @var UploadedFilesList $uploads */
178+
$uploads = \json_decode($message->getUploads(), true) ?? [];
179+
$headers = $this->headerValueToArray($message->getHeader());
180+
181+
\parse_str($message->getRawQuery(), $query);
182+
/** @psalm-suppress ArgumentTypeCoercion, MixedArgumentTypeCoercion */
183+
return new Request(
184+
remoteAddr: $message->getRemoteAddr(),
185+
protocol: $message->getProtocol(),
186+
method: $message->getMethod(),
187+
uri: $message->getUri(),
188+
headers: $this->filterHeaders($headers),
189+
cookies: \array_map(
190+
static fn(array $values) => \implode(',', $values),
191+
$this->headerValueToArray($message->getCookies()),
192+
),
193+
uploads: $uploads,
194+
attributes: [
195+
Request::PARSED_BODY_ATTRIBUTE_NAME => $message->getParsed(),
196+
] + \array_map(
197+
static fn(array $values) => \array_shift($values),
198+
$this->headerValueToArray($message->getAttributes()),
199+
),
200+
query: $query,
201+
body: $message->getParsed() && empty($body) ? '{}' : $body,
202+
parsed: $message->getParsed(),
203+
);
204+
}
205+
157206
/**
158207
* Remove all non-string and empty-string keys
159208
*
@@ -164,7 +213,7 @@ private function filterHeaders(array $headers): array
164213
{
165214
foreach ($headers as $key => $_) {
166215
if (!\is_string($key) || $key === '') {
167-
// ignore invalid header names or values (otherwise, the worker will be crashed)
216+
// ignore invalid header names or values (otherwise, the worker might be crashed)
168217
// @see: <https://git.io/JzjgJ>
169218
unset($headers[$key]);
170219
}
@@ -173,4 +222,52 @@ private function filterHeaders(array $headers): array
173222
/** @var HeadersList $headers */
174223
return $headers;
175224
}
225+
226+
/**
227+
* @param \Traversable<non-empty-string, HeaderValue> $message
228+
*/
229+
private function headerValueToArray(\Traversable $message): array
230+
{
231+
$result = [];
232+
/**
233+
* @var non-empty-string $key
234+
* @var HeaderValue $value
235+
*/
236+
foreach ($message as $key => $value) {
237+
$result[$key] = \iterator_to_array($value->getValue());
238+
}
239+
240+
return $result;
241+
}
242+
243+
/**
244+
* @param array<array-key, array<array-key, string>> $headers
245+
* @return array<non-empty-string, HeaderValue>
246+
*/
247+
private function arrayToHeaderValue(array $headers = []): array
248+
{
249+
$result = [];
250+
/**
251+
* @var non-empty-string $key
252+
* @var array<array-key, string> $value
253+
*/
254+
foreach ($headers as $key => $value) {
255+
$result[$key] = new HeaderValue(['value' => $value]);
256+
}
257+
258+
return $result;
259+
}
260+
261+
/**
262+
* @param array<array-key, array<array-key, string>> $headers
263+
*/
264+
private function createRespondPayload(int $status, string $body, array $headers = [], bool $eos = true): Payload
265+
{
266+
$head = static::$codec === Frame::CODEC_PROTO
267+
? (new Response(['status' => $status, 'headers' => $this->arrayToHeaderValue($headers)]))
268+
->serializeToString()
269+
: \json_encode(['status' => $status, 'headers' => $headers ?: (object)[]], \JSON_THROW_ON_ERROR);
270+
271+
return new Payload(body: $body, header: $head, eos: $eos);
272+
}
176273
}

src/HttpWorkerInterface.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ public function waitRequest(): ?Request;
2626
* If the body is a generator, then each yielded value will be sent as a separated stream chunk.
2727
* Returned value will be sent as a last stream package.
2828
* Note: Stream response is supported by RoadRunner since version 2023.3
29-
* @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name,
30-
* and each value MUST be an array of strings for that header.
29+
* @param HeadersList|array<array-key, array<array-key, string>> $headers $headers An associative array of the
30+
* message's headers. Each key MUST be a header name, and each value MUST be an array of strings for
31+
* that header.
3132
*/
3233
public function respond(int $status, string|Generator $body, array $headers = []): void;
3334
}

0 commit comments

Comments
 (0)