12
12
namespace Symfony \Component \HttpClient \Response ;
13
13
14
14
use Symfony \Component \HttpClient \Chunk \ErrorChunk ;
15
+ use Symfony \Component \HttpClient \Chunk \FirstChunk ;
15
16
use Symfony \Component \HttpClient \Chunk \LastChunk ;
16
17
use Symfony \Component \HttpClient \Exception \TransportException ;
17
18
use Symfony \Contracts \HttpClient \ChunkInterface ;
@@ -34,6 +35,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
34
35
private $ response ;
35
36
private $ info = ['canceled ' => false ];
36
37
private $ passthru ;
38
+ private $ stream ;
37
39
private $ lastYielded = false ;
38
40
39
41
/**
@@ -226,6 +228,19 @@ public static function stream(iterable $responses, float $timeout = null, string
226
228
227
229
$ asyncMap [$ r ->response ] = $ r ;
228
230
$ wrappedResponses [] = $ r ->response ;
231
+
232
+ if ($ r ->stream ) {
233
+ yield from self ::passthruStream ($ response = $ r ->response , $ r , new FirstChunk (), $ asyncMap );
234
+
235
+ if (!isset ($ asyncMap [$ response ])) {
236
+ array_pop ($ wrappedResponses );
237
+ }
238
+
239
+ if ($ r ->response !== $ response && !isset ($ asyncMap [$ r ->response ])) {
240
+ $ asyncMap [$ r ->response ] = $ r ;
241
+ $ wrappedResponses [] = $ r ->response ;
242
+ }
243
+ }
229
244
}
230
245
231
246
if (!$ client ) {
@@ -286,6 +301,7 @@ public static function stream(iterable $responses, float $timeout = null, string
286
301
287
302
private static function passthru (HttpClientInterface $ client , self $ r , ChunkInterface $ chunk , \SplObjectStorage $ asyncMap = null ): \Generator
288
303
{
304
+ $ r ->stream = null ;
289
305
$ response = $ r ->response ;
290
306
$ context = new AsyncContext ($ r ->passthru , $ client , $ r ->response , $ r ->info , $ r ->content , $ r ->offset );
291
307
if (null === $ stream = ($ r ->passthru )($ chunk , $ context )) {
@@ -295,32 +311,39 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte
295
311
296
312
return ;
297
313
}
298
- $ chunk = null ;
299
314
300
315
if (!$ stream instanceof \Iterator) {
301
316
throw new \LogicException (sprintf ('A chunk passthru must return an "Iterator", "%s" returned. ' , get_debug_type ($ stream )));
302
317
}
318
+ $ r ->stream = $ stream ;
319
+
320
+ yield from self ::passthruStream ($ response , $ r , null , $ asyncMap );
321
+ }
303
322
323
+ private static function passthruStream (ResponseInterface $ response , self $ r , ?ChunkInterface $ chunk , ?\SplObjectStorage $ asyncMap ): \Generator
324
+ {
304
325
while (true ) {
305
326
try {
306
- if (null !== $ chunk ) {
307
- $ stream ->next ();
327
+ if (null !== $ chunk && $ r -> stream ) {
328
+ $ r -> stream ->next ();
308
329
}
309
330
310
- if (!$ stream ->valid ()) {
331
+ if (!$ r ->stream || !$ r ->stream ->valid () || !$ r ->stream ) {
332
+ $ r ->stream = null ;
311
333
break ;
312
334
}
313
335
} catch (\Throwable $ e ) {
336
+ unset($ asyncMap [$ response ]);
337
+ $ r ->stream = null ;
314
338
$ r ->info ['error ' ] = $ e ->getMessage ();
315
339
$ r ->response ->cancel ();
316
340
317
341
yield $ r => $ chunk = new ErrorChunk ($ r ->offset , $ e );
318
342
$ chunk ->didThrow () ?: $ chunk ->getContent ();
319
- unset($ asyncMap [$ response ]);
320
343
break ;
321
344
}
322
345
323
- $ chunk = $ stream ->current ();
346
+ $ chunk = $ r -> stream ->current ();
324
347
325
348
if (!$ chunk instanceof ChunkInterface) {
326
349
throw new \LogicException (sprintf ('A chunk passthru must yield instances of "%s", "%s" yielded. ' , ChunkInterface::class, get_debug_type ($ chunk )));
@@ -356,6 +379,12 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte
356
379
}
357
380
}
358
381
382
+ if (null !== $ chunk ->getError () || $ chunk ->isLast ()) {
383
+ $ stream = $ r ->stream ;
384
+ $ r ->stream = null ;
385
+ unset($ asyncMap [$ response ]);
386
+ }
387
+
359
388
if (null === $ chunk ->getError ()) {
360
389
$ r ->offset += \strlen ($ content );
361
390
@@ -387,7 +416,6 @@ private static function passthru(HttpClientInterface $client, self $r, ChunkInte
387
416
$ chunk ->didThrow () ?: $ chunk ->getContent ();
388
417
}
389
418
390
- unset($ asyncMap [$ response ]);
391
419
break ;
392
420
}
393
421
}
0 commit comments