Skip to content

Commit f3d59ec

Browse files
committed
Silently close input stream if promise does not reject after cancellation
1 parent 977d94f commit f3d59ec

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

src/UnwrapReadableStream.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class UnwrapReadableStream extends EventEmitter implements ReadableStreamInterfa
2727
public function __construct(PromiseInterface $promise)
2828
{
2929
$out = $this;
30+
$closed =& $this->closed;
3031

3132
$this->promise = $promise->then(
3233
function ($stream) {
@@ -36,12 +37,19 @@ function ($stream) {
3637
return $stream;
3738
}
3839
)->then(
39-
function (ReadableStreamInterface $stream) use ($out) {
40+
function (ReadableStreamInterface $stream) use ($out, &$closed) {
41+
// stream is already closed, make sure to close output stream
4042
if (!$stream->isReadable()) {
4143
$out->close();
4244
return $stream;
4345
}
4446

47+
// resolves but output is already closed, make sure to close stream silently
48+
if ($closed) {
49+
$stream->close();
50+
return $stream;
51+
}
52+
4553
// stream any writes into output stream
4654
$stream->on('data', function ($data) use ($out) {
4755
$out->emit('data', array($data, $out));

tests/UnwrapReadableTest.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,43 @@ public function testPipingStreamWillForwardDataEvents()
210210

211211
$output->promise()->then($this->expectCallableOnceWith('helloworld'));
212212
}
213+
214+
public function testClosingStreamWillCloseStreamIfItIgnoredCancellationAndResolvesLater()
215+
{
216+
$input = new ReadableStream();
217+
218+
$loop = $this->loop;
219+
$promise = new Promise\Promise(function ($resolve) use ($loop, $input) {
220+
$loop->addTimer(0.001, function () use ($resolve, $input) {
221+
$resolve($input);
222+
});
223+
});
224+
225+
$stream = Stream\unwrapReadable($promise);
226+
227+
$stream->on('close', $this->expectCallableOnce());
228+
229+
$stream->close();
230+
231+
Block\await($promise, $this->loop);
232+
233+
$this->assertFalse($input->isReadable());
234+
}
235+
236+
public function testClosingStreamWillCloseStreamFromCancellationHandler()
237+
{
238+
$input = new ReadableStream();
239+
240+
$promise = new \React\Promise\Promise(function () { }, function ($resolve) use ($input) {
241+
$resolve($input);
242+
});
243+
244+
$stream = Stream\unwrapReadable($promise);
245+
246+
$stream->on('close', $this->expectCallableOnce());
247+
248+
$stream->close();
249+
250+
$this->assertFalse($input->isReadable());
251+
}
213252
}

0 commit comments

Comments
 (0)