33namespace TraderInteractive \Api ;
44
55use ArrayObject ;
6+ use GuzzleHttp \Pool ;
67use TraderInteractive \Util ;
78use GuzzleHttp \Client as GuzzleClient ;
89use GuzzleHttp \ClientInterface as GuzzleClientInterface ;
@@ -22,11 +23,11 @@ final class GuzzleAdapter implements AdapterInterface
2223 const DEFAULT_CONCURRENCY_LIMIT = PHP_INT_MAX ;
2324
2425 /**
25- * Collection of Promise\PromiseInterface instances with keys matching what was given from start().
26+ * Collection of RequestInterface instances with keys matching what was given from start().
2627 *
2728 * @var array
2829 */
29- private $ promises = [];
30+ private $ requests = [];
3031
3132 /**
3233 * Collection of Api\Response with keys matching what was given from start().
@@ -72,7 +73,7 @@ public function __construct(
7273 public function start (RequestInterface $ request ) : string
7374 {
7475 $ handle = uniqid ();
75- $ this ->promises [$ handle ] = $ this -> client -> sendAsync ( $ request) ;
76+ $ this ->requests [$ handle ] = $ request ;
7677 return $ handle ;
7778 }
7879
@@ -83,7 +84,7 @@ public function start(RequestInterface $request) : string
8384 */
8485 public function end (string $ endHandle ) : ResponseInterface
8586 {
86- $ results = $ this ->fulfillPromises ($ this ->promises , $ this ->exceptions );
87+ $ results = $ this ->fulfillPromises ($ this ->requests , $ this ->exceptions );
8788 foreach ($ results as $ handle => $ response ) {
8889 try {
8990 $ contents = (string )$ response ->getBody ();
@@ -103,7 +104,7 @@ public function end(string $endHandle) : ResponseInterface
103104 }
104105 }
105106
106- $ this ->promises = [];
107+ $ this ->requests = [];
107108
108109 if ($ this ->exceptions ->offsetExists ($ endHandle )) {
109110 $ exception = $ this ->exceptions [$ endHandle ];
@@ -121,31 +122,31 @@ public function end(string $endHandle) : ResponseInterface
121122 }
122123
123124 /**
124- * Helper method to execute all guzzle promises.
125- *
126- * @param array $promises
127- * @param array $exceptions
128- *
129- * @return array Array of fulfilled PSR7 responses.
125+ * @return ResponseInterface[]
130126 */
131- private function fulfillPromises (array $ promises , ArrayObject $ exceptions ) : array
127+ private function fulfillPromises (array $ requests , ArrayObject $ exceptions ) : array
132128 {
133- if (empty ($ promises )) {
129+ if (empty ($ requests )) {
134130 return [];
135131 }
136132
137- $ results = new ArrayObject ();
138- Promise \Each::ofLimit (
139- $ this ->promises ,
140- $ this ->concurrencyLimit ,
141- function (ResponseInterface $ response , $ index ) use ($ results ) {
142- $ results [$ index ] = $ response ;
143- },
144- function (RequestException $ e , $ index ) use ($ exceptions ) {
145- $ exceptions [$ index ] = $ e ;
146- }
147- )->wait ();
133+ $ responses = new ArrayObject ();
134+ $ pool = new Pool (
135+ $ this ->client ,
136+ $ requests ,
137+ [
138+ 'concurrency ' => $ this ->concurrencyLimit ,
139+ Promise \Promise::FULFILLED => function (ResponseInterface $ response , $ index ) use ($ responses ) {
140+ $ responses [$ index ] = $ response ;
141+ },
142+ Promise \Promise::REJECTED => function (RequestException $ e , $ index ) use ($ exceptions ) {
143+ $ exceptions [$ index ] = $ e ;
144+ }
145+ ]
146+ );
147+ $ promise = $ pool ->promise ();
148+ $ promise ->wait ();
148149
149- return $ results ->getArrayCopy ();
150+ return $ responses ->getArrayCopy ();
150151 }
151152}
0 commit comments