Skip to content

Commit d2b185a

Browse files
committed
Support back-pressure (forward pause() and resume() calls)
1 parent 8bd3b3f commit d2b185a

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

src/UnwrapReadableStream.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ public function __construct(PromiseInterface $promise)
2828
{
2929
$out = $this;
3030

31-
// TODO: support backpressure
32-
3331
$this->promise = $promise->then(
3432
function ($stream) {
3533
if (!($stream instanceof ReadableStreamInterface)) {
@@ -78,10 +76,16 @@ public function isReadable()
7876

7977
public function pause()
8078
{
79+
$this->promise->then(function (ReadableStreamInterface $stream) {
80+
$stream->pause();
81+
});
8182
}
8283

8384
public function resume()
8485
{
86+
$this->promise->then(function (ReadableStreamInterface $stream) {
87+
$stream->resume();
88+
});
8589
}
8690

8791
public function pipe(WritableStreamInterface $dest, array $options = array())

tests/UnwrapReadableTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,26 @@ public function testEmitsCloseOnlyOnceWhenClosingStreamMultipleTimes()
164164
$stream->close();
165165
$stream->close();
166166
}
167+
168+
public function testForwardsPauseToInputStream()
169+
{
170+
$input = $this->getMock('React\Stream\ReadableStreamInterface');
171+
$input->expects($this->once())->method('pause');
172+
173+
$promise = Promise\resolve($input);
174+
$stream = Stream\unwrapReadable($promise);
175+
176+
$stream->pause();
177+
}
178+
179+
public function testForwardsResumeToInputStream()
180+
{
181+
$input = $this->getMock('React\Stream\ReadableStreamInterface');
182+
$input->expects($this->once())->method('resume');
183+
184+
$promise = Promise\resolve($input);
185+
$stream = Stream\unwrapReadable($promise);
186+
187+
$stream->resume();
188+
}
167189
}

0 commit comments

Comments
 (0)