Skip to content

Commit 4c8c8d7

Browse files
committed
Merge pull request #3 from clue-labs/all
Add all() function
2 parents d60fc9f + 07b66aa commit 4c8c8d7

File tree

3 files changed

+180
-1
lines changed

3 files changed

+180
-1
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ built on top of [React PHP](http://reactphp.org/).
88
* [Usage](#usage)
99
* [buffer()](#buffer)
1010
* [first()](#first)
11+
* [all()](#all)
1112
* [unwrapReadable()](#unwrapreadable)
1213
* [Install](#install)
1314
* [License](#license)
@@ -79,6 +80,32 @@ The promise will reject if the stream is already closed.
7980

8081
The promise will reject if it is canceled.
8182

83+
### all()
84+
85+
The `all(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')`
86+
function can be used to create a `Promise` which resolves with an array of all the event data.
87+
88+
```php
89+
$stream = accessSomeJsonStream();
90+
91+
Stream\all($stream)->then(function ($chunks) {
92+
echo 'The stream consists of ' . count($chunks) . ' chunk(s)';
93+
});
94+
```
95+
96+
The promise will resolve with an array of whatever all events emitted or `null` if the
97+
events do not pass any data.
98+
If you do not pass a custom event name, then it will wait for all the "data"
99+
events and resolve with an array containing all the data chunks.
100+
101+
The promise will resolve with an array once the stream closes.
102+
103+
The promise will resolve with an empty array if the stream is already closed.
104+
105+
The promise will reject if the stream emits an error.
106+
107+
The promise will reject if it is canceled.
108+
82109
### unwrapReadable()
83110

84111
The `unwrapReadable(PromiseInterface $promise)` function can be used to unwrap

src/functions.php

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ function buffer(ReadableStreamInterface $stream)
3535
$stream->on('close', function () use ($resolve, &$buffer) {
3636
$resolve($buffer);
3737
});
38-
}, function ($_, $reject) use ($buffer) {
38+
}, function ($_, $reject) {
3939
$reject(new \RuntimeException('Cancelled buffering'));
4040
});
4141

@@ -87,6 +87,56 @@ function first(EventEmitterInterface $stream, $event = 'data')
8787
});
8888
}
8989

90+
/**
91+
* Creates a `Promise` which resolves with an array of all the event data
92+
*
93+
* @param ReadableStreamInterface|WritableStreamInterface $stream
94+
* @param string $event
95+
* @return CancellablePromiseInterface Promise<string, Exception>
96+
*/
97+
function all(EventEmitterInterface $stream, $event = 'data')
98+
{
99+
// stream already ended => resolve with empty buffer
100+
if ($stream instanceof ReadableStreamInterface) {
101+
// readable or duplex stream not readable => already closed
102+
// a half-open duplex stream is considered closed if its readable side is closed
103+
if (!$stream->isReadable()) {
104+
return Promise\resolve(array());
105+
}
106+
} elseif ($stream instanceof WritableStreamInterface) {
107+
// writable-only stream (not duplex) not writable => already closed
108+
if (!$stream->isWritable()) {
109+
return Promise\resolve(array());
110+
}
111+
}
112+
113+
$buffer = array();
114+
$bufferer = function ($data) use (&$buffer) {
115+
$buffer []= $data;
116+
};
117+
$stream->on($event, $bufferer);
118+
119+
$promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) {
120+
$stream->on('error', function ($error) use ($reject) {
121+
$reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
122+
});
123+
124+
$stream->on('close', function () use ($resolve, &$buffer) {
125+
$resolve($buffer);
126+
});
127+
}, function ($_, $reject) {
128+
$reject(new \RuntimeException('Cancelled buffering'));
129+
});
130+
131+
return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream, $event) {
132+
// promise rejected => clear buffer and buffering
133+
$buffer = array();
134+
$stream->removeListener($event, $bufferer);
135+
136+
throw $error;
137+
});
138+
}
139+
90140
/**
91141
* unwrap a `Promise` which resolves with a `ReadableStreamInterface`.
92142
*

tests/AllTest.php

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
use React\Stream\ReadableStream;
4+
use Clue\React\Promise\Stream;
5+
use React\Promise\CancellablePromiseInterface;
6+
use React\Stream\WritableStream;
7+
8+
class AllTest extends TestCase
9+
{
10+
public function testClosedStreamResolvesWithEmptyBuffer()
11+
{
12+
$stream = new ReadableStream();
13+
$stream->close();
14+
15+
$promise = Stream\all($stream);
16+
17+
$this->expectPromiseResolveWith(array(), $promise);
18+
}
19+
20+
public function testClosedWritableStreamResolvesWithEmptyBuffer()
21+
{
22+
$stream = new WritableStream();
23+
$stream->close();
24+
25+
$promise = Stream\all($stream);
26+
27+
$this->expectPromiseResolveWith(array(), $promise);
28+
}
29+
30+
public function testPendingStreamWillNotResolve()
31+
{
32+
$stream = new ReadableStream();
33+
34+
$promise = Stream\all($stream);
35+
36+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
37+
}
38+
39+
public function testClosingStreamResolvesWithEmptyBuffer()
40+
{
41+
$stream = new ReadableStream();
42+
$promise = Stream\all($stream);
43+
44+
$stream->close();
45+
46+
$this->expectPromiseResolveWith(array(), $promise);
47+
}
48+
49+
public function testClosingWritableStreamResolvesWithEmptyBuffer()
50+
{
51+
$stream = new WritableStream();
52+
$promise = Stream\all($stream);
53+
54+
$stream->close();
55+
56+
$this->expectPromiseResolveWith(array(), $promise);
57+
}
58+
59+
public function testEmittingDataOnStreamResolvesWithArrayOfData()
60+
{
61+
$stream = new ReadableStream();
62+
$promise = Stream\all($stream);
63+
64+
$stream->emit('data', array('hello', $stream));
65+
$stream->emit('data', array('world', $stream));
66+
$stream->close();
67+
68+
$this->expectPromiseResolveWith(array('hello', 'world'), $promise);
69+
}
70+
71+
public function testEmittingErrorOnStreamRejects()
72+
{
73+
$stream = new ReadableStream();
74+
$promise = Stream\all($stream);
75+
76+
$stream->emit('error', array(new \RuntimeException('test')));
77+
78+
$this->expectPromiseReject($promise);
79+
}
80+
81+
public function testEmittingErrorAfterEmittingDataOnStreamRejects()
82+
{
83+
$stream = new ReadableStream();
84+
$promise = Stream\all($stream);
85+
86+
$stream->emit('data', array('hello', $stream));
87+
$stream->emit('error', array(new \RuntimeException('test')));
88+
89+
$this->expectPromiseReject($promise);
90+
}
91+
92+
public function testCancelPendingStreamWillReject()
93+
{
94+
$stream = new ReadableStream();
95+
96+
$promise = Stream\all($stream);
97+
98+
$promise->cancel();
99+
100+
$this->expectPromiseReject($promise);
101+
}
102+
}

0 commit comments

Comments
 (0)