Skip to content

Commit cf4ace0

Browse files
committed
Only remove stream from loop if it was actually added
1 parent 56f3100 commit cf4ace0

File tree

4 files changed

+130
-4
lines changed

4 files changed

+130
-4
lines changed

src/DuplexResourceStream.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt
3333
private $readable = true;
3434
private $writable = true;
3535
private $closing = false;
36+
private $listening = false;
3637

3738
public function __construct($stream, LoopInterface $loop, $readChunkSize = null, WritableStreamInterface $buffer = null)
3839
{
@@ -100,13 +101,17 @@ public function isWritable()
100101

101102
public function pause()
102103
{
103-
$this->loop->removeReadStream($this->stream);
104+
if ($this->listening) {
105+
$this->loop->removeReadStream($this->stream);
106+
$this->listening = false;
107+
}
104108
}
105109

106110
public function resume()
107111
{
108-
if ($this->readable) {
112+
if (!$this->listening && $this->readable) {
109113
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
114+
$this->listening = true;
110115
}
111116
}
112117

src/ReadableResourceStream.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ final class ReadableResourceStream extends EventEmitter implements ReadableStrea
3636
private $bufferSize;
3737

3838
private $closed = false;
39+
private $listening = false;
3940

4041
public function __construct($stream, LoopInterface $loop, $readChunkSize = null)
4142
{
@@ -81,13 +82,17 @@ public function isReadable()
8182

8283
public function pause()
8384
{
84-
$this->loop->removeReadStream($this->stream);
85+
if ($this->listening) {
86+
$this->loop->removeReadStream($this->stream);
87+
$this->listening = false;
88+
}
8589
}
8690

8791
public function resume()
8892
{
89-
if (!$this->closed) {
93+
if (!$this->listening && !$this->closed) {
9094
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
95+
$this->listening = true;
9196
}
9297
}
9398

tests/DuplexResourceStreamTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,35 @@ public function testEndRemovesReadStreamFromLoop()
249249
$conn->end('bye');
250250
}
251251

252+
/**
253+
* @covers React\Stream\DuplexResourceStream::pause
254+
*/
255+
public function testPauseRemovesReadStreamFromLoop()
256+
{
257+
$stream = fopen('php://temp', 'r+');
258+
$loop = $this->createLoopMock();
259+
$loop->expects($this->once())->method('addReadStream')->with($stream);
260+
$loop->expects($this->once())->method('removeReadStream')->with($stream);
261+
262+
$conn = new DuplexResourceStream($stream, $loop);
263+
$conn->pause();
264+
$conn->pause();
265+
}
266+
267+
/**
268+
* @covers React\Stream\DuplexResourceStream::pause
269+
*/
270+
public function testResumeDoesAddStreamToLoopOnlyOnce()
271+
{
272+
$stream = fopen('php://temp', 'r+');
273+
$loop = $this->createLoopMock();
274+
$loop->expects($this->once())->method('addReadStream')->with($stream);
275+
276+
$conn = new DuplexResourceStream($stream, $loop);
277+
$conn->resume();
278+
$conn->resume();
279+
}
280+
252281
/**
253282
* @covers React\Stream\DuplexResourceStream::close
254283
*/
@@ -263,6 +292,35 @@ public function testCloseRemovesReadStreamFromLoop()
263292
$conn->close();
264293
}
265294

295+
/**
296+
* @covers React\Stream\DuplexResourceStream::close
297+
*/
298+
public function testCloseAfterPauseRemovesReadStreamFromLoopOnlyOnce()
299+
{
300+
$stream = fopen('php://temp', 'r+');
301+
$loop = $this->createLoopMock();
302+
$loop->expects($this->once())->method('addReadStream')->with($stream);
303+
$loop->expects($this->once())->method('removeReadStream')->with($stream);
304+
305+
$conn = new DuplexResourceStream($stream, $loop);
306+
$conn->pause();
307+
$conn->close();
308+
}
309+
310+
/**
311+
* @covers React\Stream\DuplexResourceStream::close
312+
*/
313+
public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce()
314+
{
315+
$stream = fopen('php://temp', 'r+');
316+
$loop = $this->createLoopMock();
317+
$loop->expects($this->once())->method('addReadStream')->with($stream);
318+
319+
$conn = new DuplexResourceStream($stream, $loop);
320+
$conn->close();
321+
$conn->resume();
322+
}
323+
266324
public function testEndedStreamsShouldNotWrite()
267325
{
268326
$file = tempnam(sys_get_temp_dir(), 'reactphptest_');

tests/ReadableResourceStreamTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,35 @@ public function testClosingStreamInDataEventShouldNotTriggerError()
204204
$conn->handleData($stream);
205205
}
206206

207+
/**
208+
* @covers React\Stream\ReadableResourceStream::pause
209+
*/
210+
public function testPauseRemovesReadStreamFromLoop()
211+
{
212+
$stream = fopen('php://temp', 'r+');
213+
$loop = $this->createLoopMock();
214+
$loop->expects($this->once())->method('addReadStream')->with($stream);
215+
$loop->expects($this->once())->method('removeReadStream')->with($stream);
216+
217+
$conn = new ReadableResourceStream($stream, $loop);
218+
$conn->pause();
219+
$conn->pause();
220+
}
221+
222+
/**
223+
* @covers React\Stream\ReadableResourceStream::pause
224+
*/
225+
public function testResumeDoesAddStreamToLoopOnlyOnce()
226+
{
227+
$stream = fopen('php://temp', 'r+');
228+
$loop = $this->createLoopMock();
229+
$loop->expects($this->once())->method('addReadStream')->with($stream);
230+
231+
$conn = new ReadableResourceStream($stream, $loop);
232+
$conn->resume();
233+
$conn->resume();
234+
}
235+
207236
/**
208237
* @covers React\Stream\ReadableResourceStream::close
209238
*/
@@ -218,6 +247,35 @@ public function testCloseRemovesReadStreamFromLoop()
218247
$conn->close();
219248
}
220249

250+
/**
251+
* @covers React\Stream\ReadableResourceStream::close
252+
*/
253+
public function testCloseAfterPauseRemovesReadStreamFromLoopOnce()
254+
{
255+
$stream = fopen('php://temp', 'r+');
256+
$loop = $this->createLoopMock();
257+
$loop->expects($this->once())->method('addReadStream')->with($stream);
258+
$loop->expects($this->once())->method('removeReadStream')->with($stream);
259+
260+
$conn = new ReadableResourceStream($stream, $loop);
261+
$conn->pause();
262+
$conn->close();
263+
}
264+
265+
/**
266+
* @covers React\Stream\ReadableResourceStream::close
267+
*/
268+
public function testResumeAfterCloseDoesAddReadStreamToLoopOnlyOnce()
269+
{
270+
$stream = fopen('php://temp', 'r+');
271+
$loop = $this->createLoopMock();
272+
$loop->expects($this->once())->method('addReadStream')->with($stream);
273+
274+
$conn = new ReadableResourceStream($stream, $loop);
275+
$conn->close();
276+
$conn->resume();
277+
}
278+
221279
/**
222280
* @covers React\Stream\ReadableResourceStream::handleData
223281
*/

0 commit comments

Comments
 (0)