@@ -92,7 +92,7 @@ abstract protected static function schedule(self $response, array &$runningRespo
9292 /**
9393 * Performs all pending non-blocking operations.
9494 */
95- abstract protected static function perform (ClientState $ multi , array & $ responses ): void ;
95+ abstract protected static function perform (ClientState $ multi , array $ responses ): void ;
9696
9797 /**
9898 * Waits for network activity.
@@ -150,10 +150,15 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
150150 $ lastActivity = hrtime (true ) / 1E9 ;
151151 $ elapsedTimeout = 0 ;
152152
153- if ($ fromLastTimeout = 0.0 === $ timeout && '-0 ' === (string ) $ timeout ) {
154- $ timeout = null ;
155- } elseif ($ fromLastTimeout = 0 > $ timeout ) {
156- $ timeout = -$ timeout ;
153+ if ((0.0 === $ timeout && '-0 ' === (string ) $ timeout ) || 0 > $ timeout ) {
154+ $ timeout = $ timeout ? -$ timeout : null ;
155+
156+ /** @var ClientState $multi */
157+ foreach ($ runningResponses as [$ multi ]) {
158+ if (null !== $ multi ->lastTimeout ) {
159+ $ elapsedTimeout = max ($ elapsedTimeout , $ lastActivity - $ multi ->lastTimeout );
160+ }
161+ }
157162 }
158163
159164 while (true ) {
@@ -162,35 +167,33 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
162167 $ timeoutMin = $ timeout ?? \INF ;
163168
164169 /** @var ClientState $multi */
165- foreach ($ runningResponses as $ i => [$ multi ]) {
166- $ responses = &$ runningResponses [$ i ][1 ];
170+ foreach ($ runningResponses as $ i => [$ multi , &$ responses ]) {
167171 self ::perform ($ multi , $ responses );
168172
169173 foreach ($ responses as $ j => $ response ) {
170174 $ timeoutMax = $ timeout ?? max ($ timeoutMax , $ response ->timeout );
171175 $ timeoutMin = min ($ timeoutMin , $ response ->timeout , 1 );
172176 $ chunk = false ;
173177
174- if ($ fromLastTimeout && null !== $ multi ->lastTimeout ) {
175- $ elapsedTimeout = hrtime (true ) / 1E9 - $ multi ->lastTimeout ;
176- }
177-
178178 if (isset ($ multi ->handlesActivity [$ j ])) {
179179 $ multi ->lastTimeout = null ;
180+ $ elapsedTimeout = 0 ;
180181 } elseif (!isset ($ multi ->openHandles [$ j ])) {
182+ $ hasActivity = true ;
181183 unset($ responses [$ j ]);
182184 continue ;
183185 } elseif ($ elapsedTimeout >= $ timeoutMax ) {
184186 $ multi ->handlesActivity [$ j ] = [new ErrorChunk ($ response ->offset , sprintf ('Idle timeout reached for "%s". ' , $ response ->getInfo ('url ' )))];
185187 $ multi ->lastTimeout ??= $ lastActivity ;
188+ $ elapsedTimeout = $ timeoutMax ;
186189 } else {
187190 continue ;
188191 }
189192
190- while ($ multi ->handlesActivity [$ j ] ?? false ) {
191- $ hasActivity = true ;
192- $ elapsedTimeout = 0 ;
193+ $ lastActivity = null ;
194+ $ hasActivity = true ;
193195
196+ while ($ multi ->handlesActivity [$ j ] ?? false ) {
194197 if (\is_string ($ chunk = array_shift ($ multi ->handlesActivity [$ j ]))) {
195198 if (null !== $ response ->inflate && false === $ chunk = @inflate_add ($ response ->inflate , $ chunk )) {
196199 $ multi ->handlesActivity [$ j ] = [null , new TransportException (sprintf ('Error while processing content unencoding for "%s". ' , $ response ->getInfo ('url ' )))];
@@ -227,7 +230,6 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
227230 }
228231 } elseif ($ chunk instanceof ErrorChunk) {
229232 unset($ responses [$ j ]);
230- $ elapsedTimeout = $ timeoutMax ;
231233 } elseif ($ chunk instanceof FirstChunk) {
232234 if ($ response ->logger ) {
233235 $ info = $ response ->getInfo ();
@@ -274,10 +276,12 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
274276 if ($ chunk instanceof ErrorChunk && !$ chunk ->didThrow ()) {
275277 // Ensure transport exceptions are always thrown
276278 $ chunk ->getContent ();
279+ throw new \LogicException ('A transport exception should have been thrown. ' );
277280 }
278281 }
279282
280283 if (!$ responses ) {
284+ $ hasActivity = true ;
281285 unset($ runningResponses [$ i ]);
282286 }
283287
@@ -291,7 +295,7 @@ public static function stream(iterable $responses, ?float $timeout = null): \Gen
291295 }
292296
293297 if ($ hasActivity ) {
294- $ lastActivity = hrtime (true ) / 1E9 ;
298+ $ lastActivity ?? = hrtime (true ) / 1E9 ;
295299 continue ;
296300 }
297301
0 commit comments