1616 * @event response
1717 * @event drain
1818 * @event error
19- * @event end
19+ * @event close
2020 * @internal
2121 */
2222class ClientRequestStream extends EventEmitter implements WritableStreamInterface
@@ -33,9 +33,11 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
3333 private $ request ;
3434
3535 /** @var ?ConnectionInterface */
36- private $ stream ;
36+ private $ connection ;
37+
38+ /** @var string */
39+ private $ buffer = '' ;
3740
38- private $ buffer ;
3941 private $ responseFactory ;
4042 private $ state = self ::STATE_INIT ;
4143 private $ ended = false ;
@@ -58,22 +60,22 @@ private function writeHead()
5860 $ this ->state = self ::STATE_WRITING_HEAD ;
5961
6062 $ request = $ this ->request ;
61- $ streamRef = &$ this ->stream ;
63+ $ connectionRef = &$ this ->connection ;
6264 $ stateRef = &$ this ->state ;
6365 $ pendingWrites = &$ this ->pendingWrites ;
6466 $ that = $ this ;
6567
6668 $ promise = $ this ->connect ();
6769 $ promise ->then (
68- function (ConnectionInterface $ stream ) use ($ request , &$ streamRef , &$ stateRef , &$ pendingWrites , $ that ) {
69- $ streamRef = $ stream ;
70- assert ($ streamRef instanceof ConnectionInterface);
70+ function (ConnectionInterface $ connection ) use ($ request , &$ connectionRef , &$ stateRef , &$ pendingWrites , $ that ) {
71+ $ connectionRef = $ connection ;
72+ assert ($ connectionRef instanceof ConnectionInterface);
7173
72- $ stream ->on ('drain ' , array ($ that , 'handleDrain ' ));
73- $ stream ->on ('data ' , array ($ that , 'handleData ' ));
74- $ stream ->on ('end ' , array ($ that , 'handleEnd ' ));
75- $ stream ->on ('error ' , array ($ that , 'handleError ' ));
76- $ stream ->on ('close ' , array ($ that , 'handleClose ' ));
74+ $ connection ->on ('drain ' , array ($ that , 'handleDrain ' ));
75+ $ connection ->on ('data ' , array ($ that , 'handleData ' ));
76+ $ connection ->on ('end ' , array ($ that , 'handleEnd ' ));
77+ $ connection ->on ('error ' , array ($ that , 'handleError ' ));
78+ $ connection ->on ('close ' , array ($ that , 'close ' ));
7779
7880 assert ($ request instanceof RequestInterface);
7981 $ headers = "{$ request ->getMethod ()} {$ request ->getRequestTarget ()} HTTP/ {$ request ->getProtocolVersion ()}\r\n" ;
@@ -83,7 +85,7 @@ function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &
8385 }
8486 }
8587
86- $ more = $ stream ->write ($ headers . "\r\n" . $ pendingWrites );
88+ $ more = $ connection ->write ($ headers . "\r\n" . $ pendingWrites );
8789
8890 assert ($ stateRef === ClientRequestStream::STATE_WRITING_HEAD );
8991 $ stateRef = ClientRequestStream::STATE_HEAD_WRITTEN ;
@@ -113,7 +115,7 @@ public function write($data)
113115
114116 // write directly to connection stream if already available
115117 if (self ::STATE_HEAD_WRITTEN <= $ this ->state ) {
116- return $ this ->stream ->write ($ data );
118+ return $ this ->connection ->write ($ data );
117119 }
118120
119121 // otherwise buffer and try to establish connection
@@ -157,26 +159,28 @@ public function handleData($data)
157159 $ response = gPsr \parse_response ($ this ->buffer );
158160 $ bodyChunk = (string ) $ response ->getBody ();
159161 } catch (\InvalidArgumentException $ exception ) {
160- $ this ->emit ('error ' , array ($ exception ));
161- }
162-
163- $ this ->buffer = null ;
164-
165- $ this ->stream ->removeListener ('drain ' , array ($ this , 'handleDrain ' ));
166- $ this ->stream ->removeListener ('data ' , array ($ this , 'handleData ' ));
167- $ this ->stream ->removeListener ('end ' , array ($ this , 'handleEnd ' ));
168- $ this ->stream ->removeListener ('error ' , array ($ this , 'handleError ' ));
169- $ this ->stream ->removeListener ('close ' , array ($ this , 'handleClose ' ));
170-
171- if (!isset ($ response )) {
162+ $ this ->closeError ($ exception );
172163 return ;
173164 }
174165
175- $ this ->stream ->on ('close ' , array ($ this , 'handleClose ' ));
176-
177- assert ($ response instanceof ResponseInterface);
178- assert ($ this ->stream instanceof ConnectionInterface);
179- $ body = $ this ->stream ;
166+ // response headers successfully received => remove listeners for connection events
167+ $ connection = $ this ->connection ;
168+ assert ($ connection instanceof ConnectionInterface);
169+ $ connection ->removeListener ('drain ' , array ($ this , 'handleDrain ' ));
170+ $ connection ->removeListener ('data ' , array ($ this , 'handleData ' ));
171+ $ connection ->removeListener ('end ' , array ($ this , 'handleEnd ' ));
172+ $ connection ->removeListener ('error ' , array ($ this , 'handleError ' ));
173+ $ connection ->removeListener ('close ' , array ($ this , 'close ' ));
174+ $ this ->connection = null ;
175+ $ this ->buffer = '' ;
176+
177+ // take control over connection handling and close connection once response body closes
178+ $ that = $ this ;
179+ $ input = $ body = new CloseProtectionStream ($ connection );
180+ $ input ->on ('close ' , function () use ($ connection , $ that ) {
181+ $ connection ->close ();
182+ $ that ->close ();
183+ });
180184
181185 // determine length of response body
182186 $ length = null ;
@@ -194,7 +198,11 @@ public function handleData($data)
194198 $ this ->emit ('response ' , array ($ response , $ body ));
195199
196200 // re-emit HTTP response body to trigger body parsing if parts of it are buffered
197- $ this ->stream ->emit ('data ' , array ($ bodyChunk ));
201+ if ($ bodyChunk !== '' ) {
202+ $ input ->handleData ($ bodyChunk );
203+ } elseif ($ length === 0 ) {
204+ $ input ->handleEnd ();
205+ }
198206 }
199207 }
200208
@@ -216,12 +224,6 @@ public function handleError(\Exception $error)
216224 ));
217225 }
218226
219- /** @internal */
220- public function handleClose ()
221- {
222- $ this ->close ();
223- }
224-
225227 /** @internal */
226228 public function closeError (\Exception $ error )
227229 {
@@ -240,9 +242,11 @@ public function close()
240242
241243 $ this ->state = self ::STATE_END ;
242244 $ this ->pendingWrites = '' ;
245+ $ this ->buffer = '' ;
243246
244- if ($ this ->stream ) {
245- $ this ->stream ->close ();
247+ if ($ this ->connection instanceof ConnectionInterface) {
248+ $ this ->connection ->close ();
249+ $ this ->connection = null ;
246250 }
247251
248252 $ this ->emit ('close ' );
0 commit comments