3232use Psr \Http \Message \StreamInterface ;
3333use Psr \Http \Message \UriFactoryInterface ;
3434use Psr \Http \Message \UriInterface ;
35+ use Symfony \Component \HttpClient \Internal \HttplugWaitLoop ;
3536use Symfony \Component \HttpClient \Response \HttplugPromise ;
36- use Symfony \Component \HttpClient \Response \ResponseTrait ;
37- use Symfony \Component \HttpClient \Response \StreamWrapper ;
3837use Symfony \Contracts \HttpClient \Exception \TransportExceptionInterface ;
3938use Symfony \Contracts \HttpClient \HttpClientInterface ;
4039use Symfony \Contracts \HttpClient \ResponseInterface ;
@@ -60,27 +59,27 @@ final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestF
6059 private $ client ;
6160 private $ responseFactory ;
6261 private $ streamFactory ;
63- private $ promisePool = [] ;
64- private $ pendingResponse ;
62+ private $ promisePool ;
63+ private $ waitLoop ;
6564
6665 public function __construct (HttpClientInterface $ client = null , ResponseFactoryInterface $ responseFactory = null , StreamFactoryInterface $ streamFactory = null )
6766 {
6867 $ this ->client = $ client ?? HttpClient::create ();
6968 $ this ->responseFactory = $ responseFactory ;
7069 $ this ->streamFactory = $ streamFactory ?? ($ responseFactory instanceof StreamFactoryInterface ? $ responseFactory : null );
71- $ this ->promisePool = new \SplObjectStorage ();
70+ $ this ->promisePool = \function_exists ( ' GuzzleHttp\Promise\queue ' ) ? new \SplObjectStorage () : null ;
7271
73- if (null !== $ this ->responseFactory && null !== $ this ->streamFactory ) {
74- return ;
75- }
72+ if (null === $ this ->responseFactory || null === $ this ->streamFactory ) {
73+ if (!class_exists (Psr17Factory::class)) {
74+ throw new \LogicException ('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7". ' );
75+ }
7676
77- if (!class_exists (Psr17Factory::class)) {
78- throw new \LogicException ('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7". ' );
77+ $ psr17Factory = new Psr17Factory ();
78+ $ this ->responseFactory = $ this ->responseFactory ?? $ psr17Factory ;
79+ $ this ->streamFactory = $ this ->streamFactory ?? $ psr17Factory ;
7980 }
8081
81- $ psr17Factory = new Psr17Factory ();
82- $ this ->responseFactory = $ this ->responseFactory ?? $ psr17Factory ;
83- $ this ->streamFactory = $ this ->streamFactory ?? $ psr17Factory ;
82+ $ this ->waitLoop = new HttplugWaitLoop ($ this ->client , $ this ->promisePool , $ this ->responseFactory , $ this ->streamFactory );
8483 }
8584
8685 /**
@@ -89,7 +88,7 @@ public function __construct(HttpClientInterface $client = null, ResponseFactoryI
8988 public function sendRequest (RequestInterface $ request ): Psr7ResponseInterface
9089 {
9190 try {
92- return $ this ->createPsr7Response ($ this ->sendPsr7Request ($ request ));
91+ return $ this ->waitLoop -> createPsr7Response ($ this ->sendPsr7Request ($ request ));
9392 } catch (TransportExceptionInterface $ e ) {
9493 throw new NetworkException ($ e ->getMessage (), $ request , $ e );
9594 }
@@ -102,7 +101,7 @@ public function sendRequest(RequestInterface $request): Psr7ResponseInterface
102101 */
103102 public function sendAsyncRequest (RequestInterface $ request ): Promise
104103 {
105- if (!class_exists (GuzzlePromise::class) ) {
104+ if (!$ promisePool = $ this -> promisePool ) {
106105 throw new \LogicException (sprintf ('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises". ' , __METHOD__ ));
107106 }
108107
@@ -112,88 +111,30 @@ public function sendAsyncRequest(RequestInterface $request): Promise
112111 return new RejectedPromise ($ e );
113112 }
114113
115- $ cancel = function () use ($ response ) {
116- $ response ->cancel ();
117- unset($ this ->promisePool [$ response ]);
118- };
114+ $ waitLoop = $ this ->waitLoop ;
119115
120- $ promise = new GuzzlePromise (function () use ($ response ) {
121- $ this ->pendingResponse = $ response ;
122- $ this ->wait ();
123- }, $ cancel );
116+ $ promise = new GuzzlePromise (static function () use ($ response , $ waitLoop ) {
117+ $ waitLoop ->wait ($ response );
118+ }, static function () use ($ response , $ promisePool ) {
119+ $ response ->cancel ();
120+ unset($ promisePool [$ response ]);
121+ });
124122
125- $ this -> promisePool [$ response ] = [$ request , $ promise ];
123+ $ promisePool [$ response ] = [$ request , $ promise ];
126124
127- return new HttplugPromise ($ promise, $ cancel );
125+ return new HttplugPromise ($ promise );
128126 }
129127
130128 /**
131- * Resolve pending promises that complete before the timeouts are reached.
129+ * Resolves pending promises that complete before the timeouts are reached.
132130 *
133131 * When $maxDuration is null and $idleTimeout is reached, promises are rejected.
134132 *
135133 * @return int The number of remaining pending promises
136134 */
137135 public function wait (float $ maxDuration = null , float $ idleTimeout = null ): int
138136 {
139- $ pendingResponse = $ this ->pendingResponse ;
140- $ this ->pendingResponse = null ;
141-
142- if (null !== $ maxDuration ) {
143- $ startTime = microtime (true );
144- $ idleTimeout = max (0.0 , min ($ maxDuration / 5 , $ idleTimeout ?? $ maxDuration ));
145- $ remainingDuration = $ maxDuration ;
146- }
147-
148- do {
149- foreach ($ this ->client ->stream ($ this ->promisePool , $ idleTimeout ) as $ response => $ chunk ) {
150- try {
151- if (null !== $ maxDuration && $ chunk ->isTimeout ()) {
152- goto check_duration;
153- }
154-
155- if ($ chunk ->isFirst ()) {
156- // Deactivate throwing on 3/4/5xx
157- $ response ->getStatusCode ();
158- }
159-
160- if (!$ chunk ->isLast ()) {
161- goto check_duration;
162- }
163-
164- if ([$ request , $ promise ] = $ this ->promisePool [$ response ] ?? null ) {
165- unset($ this ->promisePool [$ response ]);
166- $ promise ->resolve ($ this ->createPsr7Response ($ response , true ));
167- }
168- } catch (\Exception $ e ) {
169- if ([$ request , $ promise ] = $ this ->promisePool [$ response ] ?? null ) {
170- unset($ this ->promisePool [$ response ]);
171-
172- if ($ e instanceof TransportExceptionInterface) {
173- $ e = new NetworkException ($ e ->getMessage (), $ request , $ e );
174- }
175-
176- $ promise ->reject ($ e );
177- }
178- }
179-
180- if ($ pendingResponse === $ response ) {
181- return \count ($ this ->promisePool );
182- }
183-
184- check_duration:
185- if (null !== $ maxDuration && $ idleTimeout && $ idleTimeout > $ remainingDuration = max (0.0 , $ maxDuration - microtime (true ) + $ startTime )) {
186- $ idleTimeout = $ remainingDuration / 5 ;
187- break ;
188- }
189- }
190-
191- if (!$ count = \count ($ this ->promisePool )) {
192- return 0 ;
193- }
194- } while (null !== $ maxDuration && 0 < $ remainingDuration );
195-
196- return $ count ;
137+ return $ this ->waitLoop ->wait (null , $ maxDuration , $ idleTimeout );
197138 }
198139
199140 /**
@@ -265,6 +206,11 @@ public function createUri($uri): UriInterface
265206 return new Uri ($ uri );
266207 }
267208
209+ public function __destruct ()
210+ {
211+ $ this ->wait ();
212+ }
213+
268214 private function sendPsr7Request (RequestInterface $ request , bool $ buffer = null ): ResponseInterface
269215 {
270216 try {
@@ -286,29 +232,4 @@ private function sendPsr7Request(RequestInterface $request, bool $buffer = null)
286232 throw new NetworkException ($ e ->getMessage (), $ request , $ e );
287233 }
288234 }
289-
290- private function createPsr7Response (ResponseInterface $ response , bool $ buffer = false ): Psr7ResponseInterface
291- {
292- $ psrResponse = $ this ->responseFactory ->createResponse ($ response ->getStatusCode ());
293-
294- foreach ($ response ->getHeaders (false ) as $ name => $ values ) {
295- foreach ($ values as $ value ) {
296- $ psrResponse = $ psrResponse ->withAddedHeader ($ name , $ value );
297- }
298- }
299-
300- if (isset (class_uses ($ response )[ResponseTrait::class])) {
301- $ body = $ this ->streamFactory ->createStreamFromResource ($ response ->toStream (false ));
302- } elseif (!$ buffer ) {
303- $ body = $ this ->streamFactory ->createStreamFromResource (StreamWrapper::createResource ($ response , $ this ->client ));
304- } else {
305- $ body = $ this ->streamFactory ->createStream ($ response ->getContent (false ));
306- }
307-
308- if ($ body ->isSeekable ()) {
309- $ body ->seek (0 );
310- }
311-
312- return $ psrResponse ->withBody ($ body );
313- }
314235}
0 commit comments