77use Http \Client \Exception \HttpException ;
88use Http \Client \Exception \RequestException ;
99use Http \Discovery \MessageFactoryDiscovery ;
10+ use Http \Discovery \StreamFactoryDiscovery ;
1011use Http \Message \ResponseFactory ;
12+ use Http \Message \StreamFactory ;
1113use Psr \Http \Message \RequestInterface ;
1214use Psr \Http \Message \ResponseInterface ;
1315use Psr \Http \Message \StreamInterface ;
@@ -43,17 +45,24 @@ class Client implements HttpClient, HttpAsyncClient
4345 */
4446 private $ responseFactory ;
4547
48+ /**
49+ * @var StreamFactory
50+ */
51+ private $ streamFactory ;
52+
4653 /**
4754 * Initialize the React client.
4855 *
4956 * @param ResponseFactory|null $responseFactory
5057 * @param LoopInterface|null $loop
5158 * @param ReactClient|null $client
59+ * @param StreamFactory|null $streamFactory
5260 */
5361 public function __construct (
5462 ResponseFactory $ responseFactory = null ,
5563 LoopInterface $ loop = null ,
56- ReactClient $ client = null
64+ ReactClient $ client = null ,
65+ StreamFactory $ streamFactory = null
5766 ) {
5867 if (null !== $ client && null === $ loop ) {
5968 throw new \RuntimeException (
@@ -65,6 +74,7 @@ public function __construct(
6574 $ this ->client = $ client ?: ReactFactory::buildHttpClient ($ this ->loop );
6675
6776 $ this ->responseFactory = $ responseFactory ?: MessageFactoryDiscovery::find ();
77+ $ this ->streamFactory = $ streamFactory ?: StreamFactoryDiscovery::find ();
6878 }
6979
7080 /**
@@ -94,17 +104,12 @@ public function sendAsyncRequest(RequestInterface $request)
94104 });
95105
96106 $ reactRequest ->on ('response ' , function (ReactResponse $ reactResponse = null ) use ($ deferred , $ reactRequest , $ request ) {
97- $ bodyStream = null ;
107+ $ bodyStream = $ this -> streamFactory -> createStream () ;
98108 $ reactResponse ->on ('data ' , function ($ data ) use (&$ bodyStream ) {
99- if ($ data instanceof StreamInterface) {
100- $ bodyStream = $ data ;
101- } else {
102- $ bodyStream ->write ($ data );
103- }
109+ $ bodyStream ->write ((string ) $ data );
104110 });
105111
106112 $ reactResponse ->on ('end ' , function (\Exception $ error = null ) use ($ deferred , $ request , $ reactResponse , &$ bodyStream ) {
107- $ bodyStream ->rewind ();
108113 $ response = $ this ->buildResponse (
109114 $ reactResponse ,
110115 $ bodyStream
@@ -158,7 +163,8 @@ private function buildReactRequest(RequestInterface $request)
158163 /**
159164 * Transform a React Response to a valid PSR7 ResponseInterface instance.
160165 *
161- * @param ReactResponse $response
166+ * @param ReactResponse $response
167+ * @param StreamInterface $body
162168 *
163169 * @return ResponseInterface
164170 */
0 commit comments