Skip to content

Commit a9c1dcf

Browse files
committed
Also support WritableStreamInterface for first()
1 parent 1a513f4 commit a9c1dcf

File tree

3 files changed

+40
-9
lines changed

3 files changed

+40
-9
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ The promise will reject if it is canceled.
5656

5757
### first()
5858

59-
The `first(ReadableStreamInterface $stream, $event = 'data')` function can be used to create
60-
a `Promise` which resolves once the given event triggers for the first time.
59+
The `first(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')`
60+
function can be used to create a `Promise` which resolves once the given event triggers for the first time.
6161

6262
```php
6363
$stream = accessSomeJsonStream();

src/functions.php

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use React\Stream\ReadableStreamInterface;
66
use React\Promise;
77
use React\Promise\PromiseInterface;
8+
use React\Stream\WritableStreamInterface;
9+
use Evenement\EventEmitterInterface;
810

911
/**
1012
* Creates a `Promise` which resolves with the stream data buffer
@@ -49,15 +51,23 @@ function buffer(ReadableStreamInterface $stream)
4951
/**
5052
* Creates a `Promise` which resolves with the first event data
5153
*
52-
* @param ReadableStreamInterface $stream
53-
* @param string $event
54+
* @param ReadableStreamInterface|WritableStreamInterface $stream
55+
* @param string $event
5456
* @return CancellablePromiseInterface Promise<mixed, Exception>
5557
*/
56-
function first(ReadableStreamInterface $stream, $event = 'data')
58+
function first(EventEmitterInterface $stream, $event = 'data')
5759
{
58-
// stream already ended => reject with error
59-
if (!$stream->isReadable()) {
60-
return Promise\reject(new \RuntimeException('Stream already closed'));
60+
if ($stream instanceof ReadableStreamInterface) {
61+
// readable or duplex stream not readable => already closed
62+
// a half-open duplex stream is considered closed if its readable side is closed
63+
if (!$stream->isReadable()) {
64+
return Promise\reject(new \RuntimeException('Stream already closed'));
65+
}
66+
} elseif ($stream instanceof WritableStreamInterface) {
67+
// writable-only stream (not duplex) not writable => already closed
68+
if (!$stream->isWritable()) {
69+
return Promise\reject(new \RuntimeException('Stream already closed'));
70+
}
6171
}
6272

6373
return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) {

tests/FirstTest.php

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
use React\Stream\ReadableStream;
44
use Clue\React\Promise\Stream;
55
use React\Promise\CancellablePromiseInterface;
6+
use React\Stream\WritableStream;
67

78
class FirstTest extends TestCase
89
{
9-
public function testClosedStreamRejects()
10+
public function testClosedReadableStreamRejects()
1011
{
1112
$stream = new ReadableStream();
1213
$stream->close();
@@ -16,6 +17,16 @@ public function testClosedStreamRejects()
1617
$this->expectPromiseReject($promise);
1718
}
1819

20+
public function testClosedWritableStreamRejects()
21+
{
22+
$stream = new WritableStream();
23+
$stream->close();
24+
25+
$promise = Stream\first($stream);
26+
27+
$this->expectPromiseReject($promise);
28+
}
29+
1930
public function testPendingStreamWillNotResolve()
2031
{
2132
$stream = new ReadableStream();
@@ -35,6 +46,16 @@ public function testClosingStreamRejects()
3546
$this->expectPromiseReject($promise);
3647
}
3748

49+
public function testClosingWritableStreamRejects()
50+
{
51+
$stream = new WritableStream();
52+
$promise = Stream\first($stream);
53+
54+
$stream->close();
55+
56+
$this->expectPromiseReject($promise);
57+
}
58+
3859
public function testClosingStreamResolvesWhenWaitingForCloseEvent()
3960
{
4061
$stream = new ReadableStream();

0 commit comments

Comments
 (0)