Skip to content

Commit 437da5a

Browse files
committed
Refactor to use internal class
1 parent a53e9cf commit 437da5a

File tree

3 files changed

+114
-51
lines changed

3 files changed

+114
-51
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
}
1212
],
1313
"autoload": {
14+
"psr-4": { "Clue\\React\\Promise\\Stream\\" : "src/" },
1415
"files": [ "src/functions.php" ]
1516
},
1617
"require": {

src/UnwrapReadableStream.php

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
<?php
2+
3+
namespace Clue\React\Promise\Stream;
4+
5+
use Evenement\EventEmitter;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Promise\PromiseInterface;
8+
use React\Stream\WritableStreamInterface;
9+
use React\Stream\Util;
10+
use React\Promise\CancellablePromiseInterface;
11+
12+
/**
13+
* @internal
14+
* @see unwrapReadable() instead
15+
*/
16+
class UnwrapReadableStream extends EventEmitter implements ReadableStreamInterface
17+
{
18+
private $promise;
19+
private $closed = false;
20+
21+
/**
22+
* unwrap a `Promise` which resolves with a `ReadableStreamInterface`.
23+
*
24+
* @param PromiseInterface $promise Promise<ReadableStreamInterface, Exception>
25+
* @return ReadableStreamInterface
26+
*/
27+
public function __construct(PromiseInterface $promise)
28+
{
29+
$out = $this;
30+
31+
// TODO: support backpressure
32+
33+
// try to cancel promise once the stream closes
34+
if ($promise instanceof CancellablePromiseInterface) {
35+
$out->on('close', function() use ($promise) {
36+
$promise->cancel();
37+
});
38+
}
39+
40+
$this->promise = $promise->then(
41+
function ($stream) {
42+
if (!($stream instanceof ReadableStreamInterface)) {
43+
throw new \InvalidArgumentException('Not a readable stream');
44+
}
45+
return $stream;
46+
}
47+
)->then(
48+
function (ReadableStreamInterface $stream) use ($out) {
49+
if (!$stream->isReadable()) {
50+
$out->close();
51+
return $stream;
52+
}
53+
54+
// stream any writes into output stream
55+
$stream->on('data', function ($data) use ($out) {
56+
$out->emit('data', array($data, $out));
57+
});
58+
59+
// error events cancel output stream
60+
$stream->on('error', function ($error) use ($out) {
61+
$out->emit('error', array($error, $out));
62+
$out->close();
63+
});
64+
65+
// close output stream once body closes
66+
$stream->on('close', function () use ($out) {
67+
$out->close();
68+
});
69+
$stream->on('end', function () use ($out) {
70+
$out->close();
71+
});
72+
return $stream;
73+
},
74+
function ($e) use ($out) {
75+
$out->emit('error', array($e, $out));
76+
$out->close();
77+
}
78+
);
79+
}
80+
81+
public function isReadable()
82+
{
83+
return !$this->closed;
84+
}
85+
86+
public function pause()
87+
{
88+
}
89+
90+
public function resume()
91+
{
92+
}
93+
94+
public function pipe(WritableStreamInterface $dest, array $options = array())
95+
{
96+
Util::pipe($this, $dest, $options);
97+
98+
return $dest;
99+
}
100+
101+
public function close()
102+
{
103+
if ($this->closed) {
104+
return;
105+
}
106+
107+
$this->closed = true;
108+
109+
$this->emit('end', array($this));
110+
$this->emit('close', array($this));
111+
}
112+
}

src/functions.php

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -56,55 +56,5 @@ function buffer(ReadableStreamInterface $stream)
5656
*/
5757
function unwrapReadable(PromiseInterface $promise)
5858
{
59-
$out = new ReadableStream();
60-
61-
// TODO: support backpressure
62-
63-
// try to cancel promise once the stream closes
64-
if ($promise instanceof CancellablePromiseInterface) {
65-
$out->on('close', function() use ($promise) {
66-
$promise->cancel();
67-
});
68-
}
69-
70-
$promise->then(
71-
function ($stream) {
72-
if (!($stream instanceof ReadableStreamInterface)) {
73-
throw new \InvalidArgumentException('Not a readable stream');
74-
}
75-
return $stream;
76-
}
77-
)->then(
78-
function (ReadableStreamInterface $stream) use ($out) {
79-
if (!$stream->isReadable()) {
80-
$out->close();
81-
return;
82-
}
83-
84-
// stream any writes into output stream
85-
$stream->on('data', function ($data) use ($out) {
86-
$out->emit('data', array($data, $out));
87-
});
88-
89-
// error events cancel output stream
90-
$stream->on('error', function ($error) use ($out) {
91-
$out->emit('error', array($error, $out));
92-
$out->close();
93-
});
94-
95-
// close output stream once body closes
96-
$stream->on('close', function () use ($out) {
97-
$out->close();
98-
});
99-
$stream->on('end', function () use ($out) {
100-
$out->close();
101-
});
102-
},
103-
function ($e) use ($out) {
104-
$out->emit('error', array($e, $out));
105-
$out->close();
106-
}
107-
);
108-
109-
return $out;
59+
return new UnwrapReadableStream($promise);
11060
}

0 commit comments

Comments
 (0)