Skip to content

Commit 77373c4

Browse files
authored
Add multiplexing method (#417)
* Add multiplexing method * Don't throw exceptions * Rename resolveAll into multiplex * Fix lowest test * Fix early return * Add a parameter to download body * Catch exception for each response individually * Throw on __destruct * Apply suggestions * Better handle of recurcion in exception * Fix CS * Rename into wait
1 parent c9b514e commit 77373c4

File tree

5 files changed

+319
-31
lines changed

5 files changed

+319
-31
lines changed

src/Exception/Http/HttpExceptionTrait.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,15 @@ public function __construct(ResponseInterface $response)
5353
$this->parseJson($body);
5454
} else {
5555
try {
56-
$xml = new \SimpleXMLElement($content);
56+
set_error_handler(static function ($errno, $errstr, $errfile, $errline) {
57+
throw new \RuntimeException($errstr, $errno);
58+
});
59+
60+
try {
61+
$xml = new \SimpleXMLElement($content);
62+
} finally {
63+
restore_error_handler();
64+
}
5765
$this->parseXml($xml);
5866
} catch (\Throwable $e) {
5967
// Not XML ¯\_(ツ)_/¯

src/Response.php

Lines changed: 201 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44

55
namespace AsyncAws\Core;
66

7+
use AsyncAws\Core\Exception\Exception;
78
use AsyncAws\Core\Exception\Http\ClientException;
89
use AsyncAws\Core\Exception\Http\HttpException;
910
use AsyncAws\Core\Exception\Http\NetworkException;
1011
use AsyncAws\Core\Exception\Http\RedirectionException;
1112
use AsyncAws\Core\Exception\Http\ServerException;
13+
use AsyncAws\Core\Exception\InvalidArgument;
14+
use AsyncAws\Core\Exception\LogicException;
1215
use AsyncAws\Core\Exception\RuntimeException;
1316
use AsyncAws\Core\Stream\ResponseBodyResourceStream;
1417
use AsyncAws\Core\Stream\ResponseBodyStream;
1518
use AsyncAws\Core\Stream\ResultStream;
19+
use Symfony\Component\HttpClient\Exception\TransportException;
1620
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
1721
use Symfony\Contracts\HttpClient\HttpClientInterface;
1822
use Symfony\Contracts\HttpClient\ResponseInterface;
@@ -32,12 +36,31 @@ class Response
3236

3337
/**
3438
* A Result can be resolved many times. This variable contains the last resolve result.
35-
* Null means that the result has never been resolved.
39+
* Null means that the result has never been resolved. Array contains material to create an exception.
3640
*
37-
* @var bool|NetworkException|HttpException|null
41+
* @var bool|HttpException|NetworkException|array|null
3842
*/
3943
private $resolveResult;
4044

45+
/**
46+
* A flag that indicated that the body have been downloaded.
47+
*
48+
* @var bool
49+
*/
50+
private $bodyDownloaded = false;
51+
52+
/**
53+
* A flag that indicated that the body started being downloaded.
54+
*
55+
* @var bool
56+
*/
57+
private $streamStarted = false;
58+
59+
/**
60+
* A flag that indicated that an exception has been thrown to the user.
61+
*/
62+
private $didThrow = false;
63+
4164
public function __construct(ResponseInterface $response, HttpClientInterface $httpClient)
4265
{
4366
$this->httpResponse = $response;
@@ -46,7 +69,7 @@ public function __construct(ResponseInterface $response, HttpClientInterface $ht
4669

4770
public function __destruct()
4871
{
49-
if (null === $this->resolveResult) {
72+
if (null === $this->resolveResult || !$this->didThrow) {
5073
$this->resolve();
5174
}
5275
}
@@ -65,15 +88,7 @@ public function __destruct()
6588
public function resolve(?float $timeout = null): bool
6689
{
6790
if (null !== $this->resolveResult) {
68-
if ($this->resolveResult instanceof \Exception) {
69-
throw $this->resolveResult;
70-
}
71-
72-
if (\is_bool($this->resolveResult)) {
73-
return $this->resolveResult;
74-
}
75-
76-
throw new RuntimeException('Unexpected resolve state');
91+
return $this->getResolveStatus();
7792
}
7893

7994
try {
@@ -86,39 +101,132 @@ public function resolve(?float $timeout = null): bool
86101
}
87102
}
88103

89-
$statusCode = $this->httpResponse->getStatusCode();
104+
$this->defineResolveStatus();
90105
} catch (TransportExceptionInterface $e) {
91-
throw $this->resolveResult = new NetworkException('Could not contact remote server.', 0, $e);
106+
$this->resolveResult = new NetworkException('Could not contact remote server.', 0, $e);
92107
}
93108

94-
if (500 <= $statusCode) {
95-
throw $this->resolveResult = new ServerException($this->httpResponse);
109+
return $this->getResolveStatus();
110+
}
111+
112+
/**
113+
* Make sure all provided requests are executed.
114+
*
115+
* @param self[] $responses
116+
* @param float|null $timeout Duration in seconds before aborting. When null wait
117+
* until the end of execution. Using 0 means non-blocking
118+
* @param bool $downloadBody Wait until receiving the entire response body or only the first bytes
119+
*
120+
* @return iterable<self>
121+
*
122+
* @throws NetworkException
123+
* @throws HttpException
124+
*/
125+
final public static function wait(iterable $responses, float $timeout = null, bool $downloadBody = false): iterable
126+
{
127+
/** @var self[] $responseMap */
128+
$responseMap = [];
129+
$indexMap = [];
130+
$httpResponses = [];
131+
$httpClient = null;
132+
foreach ($responses as $index => $response) {
133+
if (null !== $response->resolveResult && (true !== $response->resolveResult || !$downloadBody || $response->bodyDownloaded)) {
134+
yield $index => $response;
135+
136+
continue;
137+
}
138+
139+
if (null === $httpClient) {
140+
$httpClient = $response->httpClient;
141+
} elseif ($httpClient !== $response->httpClient) {
142+
throw new LogicException('Unable to wait for the given results, they all have to be created with the same HttpClient');
143+
}
144+
$httpResponses[] = $response->httpResponse;
145+
$indexMap[$hash = \spl_object_id($response->httpResponse)] = $index;
146+
$responseMap[$hash] = $response;
96147
}
97148

98-
if (400 <= $statusCode) {
99-
throw $this->resolveResult = new ClientException($this->httpResponse);
149+
// no response provided (or all responses already resolved)
150+
if (empty($httpResponses)) {
151+
return;
100152
}
101153

102-
if (300 <= $statusCode) {
103-
throw $this->resolveResult = new RedirectionException($this->httpResponse);
154+
if (null === $httpClient) {
155+
throw new InvalidArgument('At least one response should have contain an Http Client');
104156
}
105157

106-
return $this->resolveResult = true;
158+
foreach ($httpClient->stream($httpResponses, $timeout) as $httpResponse => $chunk) {
159+
$hash = \spl_object_id($httpResponse);
160+
$response = $responseMap[$hash] ?? null;
161+
// Check if null, just in case symfony yield an unexpected response.
162+
if (null === $response) {
163+
continue;
164+
}
165+
166+
// index could be null if already yield
167+
$index = $indexMap[$hash] ?? null;
168+
169+
try {
170+
if ($chunk->isTimeout()) {
171+
// Receiving a timeout mean all responses are inactive.
172+
break;
173+
}
174+
} catch (TransportException $e) {
175+
// Exception is stored as an array, because storing an instance of \Exception will create a circular
176+
// reference and prevent `__destruct` beeing called.
177+
$response->resolveResult = [NetworkException::class, ['Could not contact remote server.', 0, $e]];
178+
179+
if (null !== $index) {
180+
unset($indexMap[$hash]);
181+
yield $index => $response;
182+
if (empty($indexMap)) {
183+
// early exit if all statusCode are known. We don't have to wait for all responses
184+
return;
185+
}
186+
}
187+
}
188+
189+
if (!$response->streamStarted && '' !== $chunk->getContent()) {
190+
$response->streamStarted = true;
191+
}
192+
193+
if ($chunk->isLast()) {
194+
$response->bodyDownloaded = true;
195+
if (null !== $index && $downloadBody) {
196+
unset($indexMap[$hash]);
197+
yield $index => $response;
198+
}
199+
}
200+
if ($chunk->isFirst()) {
201+
$response->defineResolveStatus();
202+
if (null !== $index && !$downloadBody) {
203+
unset($indexMap[$hash]);
204+
yield $index => $response;
205+
}
206+
}
207+
208+
if (empty($indexMap)) {
209+
// early exit if all statusCode are known. We don't have to wait for all responses
210+
return;
211+
}
212+
}
107213
}
108214

109215
/**
110216
* Returns info on the current request.
111217
*
112218
* @return array{
113219
* resolved: bool,
114-
* response?: ?ResponseInterface,
115-
* status?: int
220+
* body_downloaded: bool,
221+
* response: \Symfony\Contracts\HttpClient\ResponseInterface,
222+
* status: int,
116223
* }
117224
*/
118225
public function info(): array
119226
{
120227
return [
121228
'resolved' => null !== $this->resolveResult,
229+
'body_downloaded' => $this->bodyDownloaded,
122230
'response' => $this->httpResponse,
123231
'status' => (int) $this->httpResponse->getInfo('http_code'),
124232
];
@@ -149,7 +257,11 @@ public function getContent(): string
149257
{
150258
$this->resolve();
151259

152-
return $this->httpResponse->getContent(false);
260+
try {
261+
return $this->httpResponse->getContent(false);
262+
} finally {
263+
$this->bodyDownloaded = true;
264+
}
153265
}
154266

155267
/**
@@ -160,7 +272,11 @@ public function toArray(): array
160272
{
161273
$this->resolve();
162274

163-
return $this->httpResponse->toArray(false);
275+
try {
276+
return $this->httpResponse->toArray(false);
277+
} finally {
278+
$this->bodyDownloaded = true;
279+
}
164280
}
165281

166282
public function getStatusCode(): int
@@ -180,6 +296,65 @@ public function toStream(): ResultStream
180296
return new ResponseBodyResourceStream($this->httpResponse->toStream());
181297
}
182298

183-
return new ResponseBodyStream($this->httpClient->stream($this->httpResponse));
299+
if ($this->streamStarted) {
300+
throw new RuntimeException('Can not create a ResultStream because the body started being downloaded. The body was started to be downloaded in Response::wait()');
301+
}
302+
303+
try {
304+
return new ResponseBodyStream($this->httpClient->stream($this->httpResponse));
305+
} finally {
306+
$this->bodyDownloaded = true;
307+
}
308+
}
309+
310+
private function defineResolveStatus(): void
311+
{
312+
try {
313+
$statusCode = $this->httpResponse->getStatusCode();
314+
} catch (TransportExceptionInterface $e) {
315+
$this->resolveResult = [NetworkException::class, ['Could not contact remote server.', 0, $e]];
316+
317+
return;
318+
}
319+
320+
if (500 <= $statusCode) {
321+
$this->resolveResult = [ServerException::class, [$this->httpResponse]];
322+
323+
return;
324+
}
325+
326+
if (400 <= $statusCode) {
327+
$this->resolveResult = [ClientException::class, [$this->httpResponse]];
328+
329+
return;
330+
}
331+
332+
if (300 <= $statusCode) {
333+
$this->resolveResult = [RedirectionException::class, [$this->httpResponse]];
334+
335+
return;
336+
}
337+
338+
$this->resolveResult = true;
339+
}
340+
341+
private function getResolveStatus(): bool
342+
{
343+
if (\is_bool($this->resolveResult)) {
344+
return $this->resolveResult;
345+
}
346+
347+
if (\is_array($this->resolveResult)) {
348+
[$class, $args] = $this->resolveResult;
349+
/** @psalm-suppress PropertyTypeCoercion */
350+
$this->resolveResult = new $class(...$args);
351+
}
352+
if ($this->resolveResult instanceof Exception) {
353+
$this->didThrow = true;
354+
355+
throw $this->resolveResult;
356+
}
357+
358+
throw new RuntimeException('Unexpected resolve state');
184359
}
185360
}

src/Result.php

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,43 @@ final public function resolve(?float $timeout = null): bool
6262
return $this->response->resolve($timeout);
6363
}
6464

65+
/**
66+
* Make sure all provided requests are executed.
67+
* This only work if the http responses are produced by the same HTTP client.
68+
* See https://symfony.com/doc/current/components/http_client.html#multiplexing-responses.
69+
*
70+
* @param self[] $results
71+
* @param float|null $timeout Duration in seconds before aborting. When null wait
72+
* until the end of execution. Using 0 means non-blocking
73+
* @param bool $downloadBody Wait until receiving the entire response body or only the first bytes
74+
*
75+
* @return iterable<self>
76+
*
77+
* @throws NetworkException
78+
* @throws HttpException
79+
*/
80+
final public static function wait(iterable $results, float $timeout = null, bool $downloadBody = false): iterable
81+
{
82+
$resultMap = [];
83+
$responses = [];
84+
foreach ($results as $index => $result) {
85+
$responses[$index] = $result->response;
86+
$resultMap[$index] = $result;
87+
}
88+
89+
foreach (Response::wait($responses, $timeout, $downloadBody) as $index => $response) {
90+
yield $index => $resultMap[$index];
91+
}
92+
}
93+
6594
/**
6695
* Returns info on the current request.
6796
*
6897
* @return array{
6998
* resolved: bool,
70-
* response?: ?\Symfony\Contracts\HttpClient\ResponseInterface,
71-
* status?: int
99+
* body_downloaded: bool,
100+
* response: \Symfony\Contracts\HttpClient\ResponseInterface,
101+
* status: int,
72102
* }
73103
*/
74104
final public function info(): array

src/Waiter.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,9 @@ final public function resolve(?float $timeout = null): bool
140140
*
141141
* @return array{
142142
* resolved: bool,
143-
* response?: ?\Symfony\Contracts\HttpClient\ResponseInterface,
144-
* status?: int
143+
* body_downloaded: bool,
144+
* response: \Symfony\Contracts\HttpClient\ResponseInterface,
145+
* status: int,
145146
* }
146147
*/
147148
final public function info(): array

0 commit comments

Comments
 (0)