Skip to content

Commit 1a5597b

Browse files
committed
Merge pull request #4 from clue-labs/unwrap-writable
Add unwrapWritable() function
2 parents 4c8c8d7 + 68d5bf6 commit 1a5597b

File tree

4 files changed

+521
-0
lines changed

4 files changed

+521
-0
lines changed

README.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ built on top of [React PHP](http://reactphp.org/).
1010
* [first()](#first)
1111
* [all()](#all)
1212
* [unwrapReadable()](#unwrapreadable)
13+
* [unwrapWritable()](#unwrapwritable)
1314
* [Install](#install)
1415
* [License](#license)
1516

@@ -151,6 +152,50 @@ If the given promise is already settled and does not resolve with an
151152
instance of `ReadableStreamInterface`, then you will not be able to receive
152153
the `error` event.
153154

155+
### unwrapWritable()
156+
157+
The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap
158+
a `Promise` which resolves with a `WritableStreamInterface`.
159+
160+
This function returns a writable stream instance (implementing `WritableStreamInterface`)
161+
right away which acts as a proxy for the future promise resolution.
162+
Once the given Promise resolves with a `WritableStreamInterface`, any data you
163+
wrote to the proxy will be piped to the inner stream.
164+
165+
```php
166+
//$promise = someFunctionWhichResolvesWithAStream();
167+
$promise = startUploadStream($uri);
168+
169+
$stream = Stream\unwrapWritable($promise);
170+
171+
$stream->write('hello');
172+
$stream->end('world');
173+
174+
$stream->on('close', function () {
175+
echo 'DONE';
176+
});
177+
```
178+
179+
If the given promise is either rejected or fulfilled with anything but an
180+
instance of `WritableStreamInterface`, then the output stream will emit
181+
an `error` event and close:
182+
183+
```php
184+
$promise = startUploadStream($invalidUri);
185+
186+
$stream = Stream\unwrapWritable($promise);
187+
188+
$stream->on('error', function (Exception $error) {
189+
echo 'Error: ' . $error->getMessage();
190+
});
191+
```
192+
193+
The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
194+
at the time of invoking this function.
195+
If the given promise is already settled and does not resolve with an
196+
instance of `WritableStreamInterface`, then you will not be able to receive
197+
the `error` event.
198+
154199
## Install
155200

156201
The recommended way to install this library is [through Composer](https://getcomposer.org).

src/UnwrapWritableStream.php

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
<?php
2+
3+
namespace Clue\React\Promise\Stream;
4+
5+
use Evenement\EventEmitter;
6+
use React\Promise\PromiseInterface;
7+
use React\Stream\WritableStreamInterface;
8+
use React\Stream\Util;
9+
use React\Promise\CancellablePromiseInterface;
10+
use InvalidArgumentException;
11+
12+
/**
13+
* @internal
14+
* @see unwrapWritable() instead
15+
*/
16+
class UnwrapWritableStream extends EventEmitter implements WritableStreamInterface
17+
{
18+
private $promise;
19+
private $stream;
20+
private $buffer = '';
21+
private $closed = false;
22+
private $ending = false;
23+
24+
/**
25+
* Instantiate new unwrapped writable stream for given `Promise` which resolves with a `WritableStreamInterface`.
26+
*
27+
* @param PromiseInterface $promise Promise<WritableStreamInterface, Exception>
28+
*/
29+
public function __construct(PromiseInterface $promise)
30+
{
31+
$out = $this;
32+
$store =& $this->stream;
33+
$buffer =& $this->buffer;
34+
$ending =& $this->ending;
35+
$closed =& $this->closed;
36+
37+
$this->promise = $promise->then(
38+
function ($stream) {
39+
if (!($stream instanceof WritableStreamInterface)) {
40+
throw new InvalidArgumentException('Not a writable stream');
41+
}
42+
return $stream;
43+
}
44+
)->then(
45+
function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$ending, &$closed) {
46+
// stream is already closed, make sure to close output stream
47+
if (!$stream->isWritable()) {
48+
$out->close();
49+
return $stream;
50+
}
51+
52+
// resolves but output is already closed, make sure to close stream silently
53+
if ($closed) {
54+
$stream->close();
55+
return $stream;
56+
}
57+
58+
// forward drain events for back pressure
59+
$stream->on('drain', function () use ($out) {
60+
$out->emit('drain', array($out));
61+
});
62+
63+
// error events cancel output stream
64+
$stream->on('error', function ($error) use ($out) {
65+
$out->emit('error', array($error, $out));
66+
$out->close();
67+
});
68+
69+
// close both streams once either side closes
70+
$stream->on('close', array($out, 'close'));
71+
$out->on('close', array($stream, 'close'));
72+
73+
if ($buffer !== '') {
74+
// flush buffer to stream and check if its buffer is not exceeded
75+
$drained = $stream->write($buffer) !== false;
76+
$buffer = '';
77+
78+
if ($drained) {
79+
// signal drain event, because the output stream previous signalled a full buffer
80+
$out->emit('drain', array($out));
81+
}
82+
}
83+
84+
if ($ending) {
85+
$stream->end();
86+
} else {
87+
$store = $stream;
88+
}
89+
90+
return $stream;
91+
},
92+
function ($e) use ($out) {
93+
$out->emit('error', array($e, $out));
94+
$out->close();
95+
}
96+
);
97+
}
98+
99+
public function write($data)
100+
{
101+
if ($this->ending) {
102+
return;
103+
}
104+
105+
// forward to inner stream if possible
106+
if ($this->stream !== null) {
107+
return $this->stream->write($data);
108+
}
109+
110+
// append to buffer and signal the buffer is full
111+
$this->buffer .= $data;
112+
return false;
113+
}
114+
115+
public function end($data = null)
116+
{
117+
if ($this->ending) {
118+
return;
119+
}
120+
121+
$this->ending = true;
122+
123+
// forward to inner stream if possible
124+
if ($this->stream !== null) {
125+
return $this->stream->end($data);
126+
}
127+
128+
// append to buffer
129+
if ($data !== null) {
130+
$this->buffer .= $data;
131+
}
132+
}
133+
134+
public function isWritable()
135+
{
136+
return !$this->ending;
137+
}
138+
139+
public function close()
140+
{
141+
if ($this->closed) {
142+
return;
143+
}
144+
145+
$this->buffer = '';
146+
$this->ending = true;
147+
$this->closed = true;
148+
149+
// try to cancel promise once the stream closes
150+
if ($this->promise instanceof CancellablePromiseInterface) {
151+
$this->promise->cancel();
152+
}
153+
154+
$this->emit('close', array($this));
155+
}
156+
}

src/functions.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,14 @@ function unwrapReadable(PromiseInterface $promise)
147147
{
148148
return new UnwrapReadableStream($promise);
149149
}
150+
151+
/**
152+
* unwrap a `Promise` which resolves with a `WritableStreamInterface`.
153+
*
154+
* @param PromiseInterface $promise Promise<WritableStreamInterface, Exception>
155+
* @return WritableStreamInterface
156+
*/
157+
function unwrapWritable(PromiseInterface $promise)
158+
{
159+
return new UnwrapWritableStream($promise);
160+
}

0 commit comments

Comments
 (0)