Skip to content

Commit 1a513f4

Browse files
committed
Add first() function
1 parent 8311a28 commit 1a513f4

File tree

3 files changed

+155
-0
lines changed

3 files changed

+155
-0
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ built on top of [React PHP](http://reactphp.org/).
77

88
* [Usage](#usage)
99
* [buffer()](#buffer)
10+
* [first()](#first)
1011
* [unwrapReadable()](#unwrapreadable)
1112
* [Install](#install)
1213
* [License](#license)
@@ -45,6 +46,39 @@ Stream\buffer($stream)->then(function ($contents) {
4546
});
4647
```
4748

49+
The promise will resolve with all data chunks concatenated once the stream closes.
50+
51+
The promise will resolve with an empty string if the stream is already closed.
52+
53+
The promise will reject if the stream emits an error.
54+
55+
The promise will reject if it is canceled.
56+
57+
### first()
58+
59+
The `first(ReadableStreamInterface $stream, $event = 'data')` function can be used to create
60+
a `Promise` which resolves once the given event triggers for the first time.
61+
62+
```php
63+
$stream = accessSomeJsonStream();
64+
65+
Stream\first($stream)->then(function ($chunk) {
66+
echo 'The first chunk arrived: ' . $chunk;
67+
});
68+
```
69+
70+
The promise will resolve with whatever the first event emitted or `null` if the
71+
event does not pass any data.
72+
If you do not pass a custom event name, then it will wait for the first "data"
73+
event and resolve with a string containing the first data chunk.
74+
75+
The promise will reject once the stream closes – unless you're waiting for the
76+
"close" event, in which case it will resolve.
77+
78+
The promise will reject if the stream is already closed.
79+
80+
The promise will reject if it is canceled.
81+
4882
### unwrapReadable()
4983

5084
The `unwrapReadable(PromiseInterface $promise)` function can be used to unwrap

src/functions.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,37 @@ function buffer(ReadableStreamInterface $stream)
4646
});
4747
}
4848

49+
/**
50+
* Creates a `Promise` which resolves with the first event data
51+
*
52+
* @param ReadableStreamInterface $stream
53+
* @param string $event
54+
* @return CancellablePromiseInterface Promise<mixed, Exception>
55+
*/
56+
function first(ReadableStreamInterface $stream, $event = 'data')
57+
{
58+
// stream already ended => reject with error
59+
if (!$stream->isReadable()) {
60+
return Promise\reject(new \RuntimeException('Stream already closed'));
61+
}
62+
63+
return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) {
64+
$listener = function ($data) use ($stream, $event, &$listener, $resolve) {
65+
$stream->removeListener($event, $listener);
66+
$resolve($data);
67+
};
68+
$stream->on($event, $listener);
69+
70+
$stream->on('close', function () use ($stream, $event, $listener, $reject) {
71+
$stream->removeListener($event, $listener);
72+
$reject(new \RuntimeException('Stream closed'));
73+
});
74+
}, function ($_, $reject) use ($stream, $event, &$listener) {
75+
$stream->removeListener($event, $listener);
76+
$reject(new \RuntimeException('Operation cancelled'));
77+
});
78+
}
79+
4980
/**
5081
* unwrap a `Promise` which resolves with a `ReadableStreamInterface`.
5182
*

tests/FirstTest.php

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
use React\Stream\ReadableStream;
4+
use Clue\React\Promise\Stream;
5+
use React\Promise\CancellablePromiseInterface;
6+
7+
class FirstTest extends TestCase
8+
{
9+
public function testClosedStreamRejects()
10+
{
11+
$stream = new ReadableStream();
12+
$stream->close();
13+
14+
$promise = Stream\first($stream);
15+
16+
$this->expectPromiseReject($promise);
17+
}
18+
19+
public function testPendingStreamWillNotResolve()
20+
{
21+
$stream = new ReadableStream();
22+
23+
$promise = Stream\first($stream);
24+
25+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
26+
}
27+
28+
public function testClosingStreamRejects()
29+
{
30+
$stream = new ReadableStream();
31+
$promise = Stream\first($stream);
32+
33+
$stream->close();
34+
35+
$this->expectPromiseReject($promise);
36+
}
37+
38+
public function testClosingStreamResolvesWhenWaitingForCloseEvent()
39+
{
40+
$stream = new ReadableStream();
41+
$promise = Stream\first($stream, 'close');
42+
43+
$stream->close();
44+
45+
$this->expectPromiseResolve($promise);
46+
}
47+
48+
public function testEmittingDataOnStreamResolvesWithFirstEvent()
49+
{
50+
$stream = new ReadableStream();
51+
$promise = Stream\first($stream);
52+
53+
$stream->emit('data', array('hello', $stream));
54+
$stream->emit('data', array('world', $stream));
55+
$stream->close();
56+
57+
$this->expectPromiseResolveWith('hello', $promise);
58+
}
59+
60+
public function testEmittingErrorOnStreamDoesNothing()
61+
{
62+
$stream = new ReadableStream();
63+
$promise = Stream\first($stream);
64+
65+
$stream->emit('error', array(new \RuntimeException('test')));
66+
67+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
68+
}
69+
70+
public function testEmittingErrorResolvesWhenWaitingForErrorEvent()
71+
{
72+
$stream = new ReadableStream();
73+
$promise = Stream\first($stream, 'error');
74+
75+
$stream->emit('error', array(new \RuntimeException('test')));
76+
77+
$this->expectPromiseResolve($promise);
78+
}
79+
80+
public function testCancelPendingStreamWillReject()
81+
{
82+
$stream = new ReadableStream();
83+
84+
$promise = Stream\first($stream);
85+
86+
$promise->cancel();
87+
88+
$this->expectPromiseReject($promise);
89+
}
90+
}

0 commit comments

Comments
 (0)