@@ -233,15 +233,15 @@ abstract protected static function perform(ClientState $multi, array &$responses
233
233
*/
234
234
abstract protected static function select (ClientState $ multi , float $ timeout ): int ;
235
235
236
- private static function initialize (self $ response ): void
236
+ private static function initialize (self $ response, float $ timeout = null ): void
237
237
{
238
238
if (null !== $ response ->info ['error ' ]) {
239
239
throw new TransportException ($ response ->info ['error ' ]);
240
240
}
241
241
242
242
try {
243
- if (($ response ->initializer )($ response )) {
244
- foreach (self ::stream ([$ response ]) as $ chunk ) {
243
+ if (($ response ->initializer )($ response, $ timeout )) {
244
+ foreach (self ::stream ([$ response ], $ timeout ) as $ chunk ) {
245
245
if ($ chunk ->isFirst ()) {
246
246
break ;
247
247
}
@@ -304,7 +304,7 @@ private function doDestruct()
304
304
$ this ->shouldBuffer = true ;
305
305
306
306
if ($ this ->initializer && null === $ this ->info ['error ' ]) {
307
- self ::initialize ($ this );
307
+ self ::initialize ($ this , - 0.0 );
308
308
$ this ->checkStatusCode ();
309
309
}
310
310
}
@@ -325,6 +325,12 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
325
325
$ lastActivity = microtime (true );
326
326
$ elapsedTimeout = 0 ;
327
327
328
+ if ($ fromLastTimeout = 0.0 === $ timeout && '-0 ' === (string ) $ timeout ) {
329
+ $ timeout = null ;
330
+ } elseif ($ fromLastTimeout = 0 > $ timeout ) {
331
+ $ timeout = -$ timeout ;
332
+ }
333
+
328
334
while (true ) {
329
335
$ hasActivity = false ;
330
336
$ timeoutMax = 0 ;
@@ -340,13 +346,18 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
340
346
$ timeoutMin = min ($ timeoutMin , $ response ->timeout , 1 );
341
347
$ chunk = false ;
342
348
349
+ if ($ fromLastTimeout && null !== $ multi ->lastTimeout ) {
350
+ $ elapsedTimeout = microtime (true ) - $ multi ->lastTimeout ;
351
+ }
352
+
343
353
if (isset ($ multi ->handlesActivity [$ j ])) {
344
- // no-op
354
+ $ multi -> lastTimeout = null ;
345
355
} elseif (!isset ($ multi ->openHandles [$ j ])) {
346
356
unset($ responses [$ j ]);
347
357
continue ;
348
358
} elseif ($ elapsedTimeout >= $ timeoutMax ) {
349
359
$ multi ->handlesActivity [$ j ] = [new ErrorChunk ($ response ->offset , sprintf ('Idle timeout reached for "%s". ' , $ response ->getInfo ('url ' )))];
360
+ $ multi ->lastTimeout ?? $ multi ->lastTimeout = $ lastActivity ;
350
361
} else {
351
362
continue ;
352
363
}
0 commit comments