Skip to content

Commit d60fc9f

Browse files
committed
Merge pull request #2 from clue-labs/first
Add first() function
2 parents 8311a28 + a9c1dcf commit d60fc9f

File tree

3 files changed

+186
-0
lines changed

3 files changed

+186
-0
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ built on top of [React PHP](http://reactphp.org/).
77

88
* [Usage](#usage)
99
* [buffer()](#buffer)
10+
* [first()](#first)
1011
* [unwrapReadable()](#unwrapreadable)
1112
* [Install](#install)
1213
* [License](#license)
@@ -45,6 +46,39 @@ Stream\buffer($stream)->then(function ($contents) {
4546
});
4647
```
4748

49+
The promise will resolve with all data chunks concatenated once the stream closes.
50+
51+
The promise will resolve with an empty string if the stream is already closed.
52+
53+
The promise will reject if the stream emits an error.
54+
55+
The promise will reject if it is canceled.
56+
57+
### first()
58+
59+
The `first(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')`
60+
function can be used to create a `Promise` which resolves once the given event triggers for the first time.
61+
62+
```php
63+
$stream = accessSomeJsonStream();
64+
65+
Stream\first($stream)->then(function ($chunk) {
66+
echo 'The first chunk arrived: ' . $chunk;
67+
});
68+
```
69+
70+
The promise will resolve with whatever the first event emitted or `null` if the
71+
event does not pass any data.
72+
If you do not pass a custom event name, then it will wait for the first "data"
73+
event and resolve with a string containing the first data chunk.
74+
75+
The promise will reject once the stream closes – unless you're waiting for the
76+
"close" event, in which case it will resolve.
77+
78+
The promise will reject if the stream is already closed.
79+
80+
The promise will reject if it is canceled.
81+
4882
### unwrapReadable()
4983

5084
The `unwrapReadable(PromiseInterface $promise)` function can be used to unwrap

src/functions.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use React\Stream\ReadableStreamInterface;
66
use React\Promise;
77
use React\Promise\PromiseInterface;
8+
use React\Stream\WritableStreamInterface;
9+
use Evenement\EventEmitterInterface;
810

911
/**
1012
* Creates a `Promise` which resolves with the stream data buffer
@@ -46,6 +48,45 @@ function buffer(ReadableStreamInterface $stream)
4648
});
4749
}
4850

51+
/**
52+
* Creates a `Promise` which resolves with the first event data
53+
*
54+
* @param ReadableStreamInterface|WritableStreamInterface $stream
55+
* @param string $event
56+
* @return CancellablePromiseInterface Promise<mixed, Exception>
57+
*/
58+
function first(EventEmitterInterface $stream, $event = 'data')
59+
{
60+
if ($stream instanceof ReadableStreamInterface) {
61+
// readable or duplex stream not readable => already closed
62+
// a half-open duplex stream is considered closed if its readable side is closed
63+
if (!$stream->isReadable()) {
64+
return Promise\reject(new \RuntimeException('Stream already closed'));
65+
}
66+
} elseif ($stream instanceof WritableStreamInterface) {
67+
// writable-only stream (not duplex) not writable => already closed
68+
if (!$stream->isWritable()) {
69+
return Promise\reject(new \RuntimeException('Stream already closed'));
70+
}
71+
}
72+
73+
return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) {
74+
$listener = function ($data) use ($stream, $event, &$listener, $resolve) {
75+
$stream->removeListener($event, $listener);
76+
$resolve($data);
77+
};
78+
$stream->on($event, $listener);
79+
80+
$stream->on('close', function () use ($stream, $event, $listener, $reject) {
81+
$stream->removeListener($event, $listener);
82+
$reject(new \RuntimeException('Stream closed'));
83+
});
84+
}, function ($_, $reject) use ($stream, $event, &$listener) {
85+
$stream->removeListener($event, $listener);
86+
$reject(new \RuntimeException('Operation cancelled'));
87+
});
88+
}
89+
4990
/**
5091
* unwrap a `Promise` which resolves with a `ReadableStreamInterface`.
5192
*

tests/FirstTest.php

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
<?php
2+
3+
use React\Stream\ReadableStream;
4+
use Clue\React\Promise\Stream;
5+
use React\Promise\CancellablePromiseInterface;
6+
use React\Stream\WritableStream;
7+
8+
class FirstTest extends TestCase
9+
{
10+
public function testClosedReadableStreamRejects()
11+
{
12+
$stream = new ReadableStream();
13+
$stream->close();
14+
15+
$promise = Stream\first($stream);
16+
17+
$this->expectPromiseReject($promise);
18+
}
19+
20+
public function testClosedWritableStreamRejects()
21+
{
22+
$stream = new WritableStream();
23+
$stream->close();
24+
25+
$promise = Stream\first($stream);
26+
27+
$this->expectPromiseReject($promise);
28+
}
29+
30+
public function testPendingStreamWillNotResolve()
31+
{
32+
$stream = new ReadableStream();
33+
34+
$promise = Stream\first($stream);
35+
36+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
37+
}
38+
39+
public function testClosingStreamRejects()
40+
{
41+
$stream = new ReadableStream();
42+
$promise = Stream\first($stream);
43+
44+
$stream->close();
45+
46+
$this->expectPromiseReject($promise);
47+
}
48+
49+
public function testClosingWritableStreamRejects()
50+
{
51+
$stream = new WritableStream();
52+
$promise = Stream\first($stream);
53+
54+
$stream->close();
55+
56+
$this->expectPromiseReject($promise);
57+
}
58+
59+
public function testClosingStreamResolvesWhenWaitingForCloseEvent()
60+
{
61+
$stream = new ReadableStream();
62+
$promise = Stream\first($stream, 'close');
63+
64+
$stream->close();
65+
66+
$this->expectPromiseResolve($promise);
67+
}
68+
69+
public function testEmittingDataOnStreamResolvesWithFirstEvent()
70+
{
71+
$stream = new ReadableStream();
72+
$promise = Stream\first($stream);
73+
74+
$stream->emit('data', array('hello', $stream));
75+
$stream->emit('data', array('world', $stream));
76+
$stream->close();
77+
78+
$this->expectPromiseResolveWith('hello', $promise);
79+
}
80+
81+
public function testEmittingErrorOnStreamDoesNothing()
82+
{
83+
$stream = new ReadableStream();
84+
$promise = Stream\first($stream);
85+
86+
$stream->emit('error', array(new \RuntimeException('test')));
87+
88+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
89+
}
90+
91+
public function testEmittingErrorResolvesWhenWaitingForErrorEvent()
92+
{
93+
$stream = new ReadableStream();
94+
$promise = Stream\first($stream, 'error');
95+
96+
$stream->emit('error', array(new \RuntimeException('test')));
97+
98+
$this->expectPromiseResolve($promise);
99+
}
100+
101+
public function testCancelPendingStreamWillReject()
102+
{
103+
$stream = new ReadableStream();
104+
105+
$promise = Stream\first($stream);
106+
107+
$promise->cancel();
108+
109+
$this->expectPromiseReject($promise);
110+
}
111+
}

0 commit comments

Comments
 (0)