Skip to content

Commit a53e9cf

Browse files
committed
Initial import
0 parents  commit a53e9cf

File tree

10 files changed

+629
-0
lines changed

10 files changed

+629
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/vendor
2+
/composer.lock

.travis.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
language: php
2+
3+
php:
4+
- 5.3
5+
- 5.4
6+
- 5.5
7+
- 5.6
8+
- 7
9+
- hhvm
10+
11+
sudo: false
12+
13+
install:
14+
- composer install --no-interaction
15+
16+
script:
17+
- phpunit --coverage-text

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2016 Christian Lück
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is furnished
10+
to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
THE SOFTWARE.

README.md

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# clue/promise-stream-react [![Build Status](https://travis-ci.org/clue/php-promise-stream-react.svg?branch=master)](https://travis-ci.org/clue/php-promise-stream-react)
2+
3+
The missing link between Promise-land and Stream-land,
4+
built on top of [React PHP](http://reactphp.org/).
5+
6+
**Table of Contents**
7+
8+
* [Usage](#usage)
9+
* [buffer()](#buffer)
10+
* [unwrapReadable()](#unwrapreadable)
11+
* [Install](#install)
12+
* [License](#license)
13+
14+
> Note: This project is in early alpha stage! Feel free to report any issues you encounter.
15+
16+
## Usage
17+
18+
This lightweight library consists only of a few simple functions.
19+
All functions reside under the `Clue\React\Promise\Stream` namespace.
20+
21+
The below examples assume you use an import statement similar to this:
22+
23+
```php
24+
use Clue\React\Promise\Stream;
25+
26+
Stream\buffer(…);
27+
```
28+
29+
Alternatively, you can also refer to them with their fully-qualified name:
30+
31+
```php
32+
\Clue\React\Promise\Stream\buffer(…);
33+
```
34+
35+
### buffer()
36+
37+
The `buffer(ReadableStreamInterface $stream)` function can be used to create
38+
a `Promise` which resolves with the stream data buffer.
39+
40+
```php
41+
$stream = accessSomeJsonStream();
42+
43+
Stream\buffer($stream)->then(function ($contents) {
44+
var_dump(json_decode($contents));
45+
});
46+
```
47+
48+
### unwrapReadable()
49+
50+
The `unwrapReadable(PromiseInterface $promise)` function can be used to unwrap
51+
a `Promise` which resolves with a `ReadableStreamInterface`.
52+
53+
This function returns a readable stream instance (implementing `ReadableStreamInterface`)
54+
right away which acts as a proxy for the future promise resolution.
55+
Once the given Promise resolves with a `ReadableStreamInterface`, its data will
56+
be piped to the output stream.
57+
58+
```php
59+
//$promise = someFunctionWhichResolvesWithAStream();
60+
$promise = startDownloadStream($uri);
61+
62+
$stream = Stream\unwrapReadable($promise);
63+
64+
$stream->on('data', function ($data) {
65+
echo $data;
66+
});
67+
68+
$stream->on('end', function () {
69+
echo 'DONE';
70+
});
71+
```
72+
73+
If the given promise is either rejected or fulfilled with anything but an
74+
instance of `ReadableStreamInterface`, then the output stream will emit
75+
an `error` event and close:
76+
77+
```php
78+
$promise = startDownloadStream($invalidUri);
79+
80+
$stream = Stream\unwrapReadable($promise);
81+
82+
$stream->on('error', function (Exception $error) {
83+
echo 'Error: ' . $error->getMessage();
84+
});
85+
```
86+
87+
The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
88+
at the time of invoking this function.
89+
If the given promise is already settled and does not resolve with an
90+
instance of `ReadableStreamInterface`, then you will not be able to receive
91+
the `error` event.
92+
93+
## Install
94+
95+
The recommended way to install this library is [through Composer](https://getcomposer.org).
96+
[New to Composer?](https://getcomposer.org/doc/00-intro.md)
97+
98+
This will install the latest supported version:
99+
100+
```bash
101+
$ composer require clue/promise-stream:dev-master
102+
```
103+
104+
## License
105+
106+
MIT

composer.json

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"name": "clue/promise-stream-react",
3+
"description": "The missing link between Promise-land and Stream-land",
4+
"keywords": ["unwrap", "stream", "buffer", "promise", "ReactPHP", "async"],
5+
"homepage": "https://github.com/clue/php-promise-stream-react",
6+
"license": "MIT",
7+
"authors": [
8+
{
9+
"name": "Christian Lück",
10+
"email": "[email protected]"
11+
}
12+
],
13+
"autoload": {
14+
"files": [ "src/functions.php" ]
15+
},
16+
"require": {
17+
"php": ">=5.3",
18+
"react/stream": "^0.4 || ^0.3",
19+
"react/promise": "^2.1 || ^1.2"
20+
},
21+
"require-dev": {
22+
"react/event-loop": "^0.4 || ^0.3",
23+
"react/promise-timer": "^1.0",
24+
"clue/block-react": "^1.0"
25+
}
26+
}

phpunit.xml.dist

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<phpunit bootstrap="tests/bootstrap.php"
4+
colors="true"
5+
convertErrorsToExceptions="true"
6+
convertNoticesToExceptions="true"
7+
convertWarningsToExceptions="true"
8+
>
9+
<testsuites>
10+
<testsuite>
11+
<directory>./tests/</directory>
12+
</testsuite>
13+
</testsuites>
14+
<filter>
15+
<whitelist>
16+
<directory>./src/</directory>
17+
</whitelist>
18+
</filter>
19+
</phpunit>

src/functions.php

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
<?php
2+
3+
namespace Clue\React\Promise\Stream;
4+
5+
use React\Stream\ReadableStream;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Promise;
8+
use React\Promise\PromiseInterface;
9+
use React\Promise\CancellablePromiseInterface;
10+
11+
/**
12+
* Creates a `Promise` which resolves with the stream data buffer
13+
*
14+
* @param ReadableStreamInterface $stream
15+
* @return CancellablePromiseInterface Promise<string, Exception>
16+
*/
17+
function buffer(ReadableStreamInterface $stream)
18+
{
19+
// stream already ended => resolve with empty buffer
20+
if (!$stream->isReadable()) {
21+
return Promise\resolve('');
22+
}
23+
24+
$buffer = '';
25+
$bufferer = function ($data) use (&$buffer) {
26+
$buffer .= $data;
27+
};
28+
$stream->on('data', $bufferer);
29+
30+
$promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) {
31+
$stream->on('error', function ($error) use ($reject) {
32+
$reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
33+
});
34+
35+
$stream->on('close', function () use ($resolve, &$buffer) {
36+
$resolve($buffer);
37+
});
38+
}, function ($_, $reject) use ($buffer) {
39+
$reject(new \RuntimeException('Cancelled buffering'));
40+
});
41+
42+
return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream) {
43+
// promise rejected => clear buffer and buffering
44+
$buffer = '';
45+
$stream->removeListener('data', $bufferer);
46+
47+
throw $error;
48+
});
49+
}
50+
51+
/**
52+
* unwrap a `Promise` which resolves with a `ReadableStreamInterface`.
53+
*
54+
* @param PromiseInterface $promise Promise<ReadableStreamInterface, Exception>
55+
* @return ReadableStreamInterface
56+
*/
57+
function unwrapReadable(PromiseInterface $promise)
58+
{
59+
$out = new ReadableStream();
60+
61+
// TODO: support backpressure
62+
63+
// try to cancel promise once the stream closes
64+
if ($promise instanceof CancellablePromiseInterface) {
65+
$out->on('close', function() use ($promise) {
66+
$promise->cancel();
67+
});
68+
}
69+
70+
$promise->then(
71+
function ($stream) {
72+
if (!($stream instanceof ReadableStreamInterface)) {
73+
throw new \InvalidArgumentException('Not a readable stream');
74+
}
75+
return $stream;
76+
}
77+
)->then(
78+
function (ReadableStreamInterface $stream) use ($out) {
79+
if (!$stream->isReadable()) {
80+
$out->close();
81+
return;
82+
}
83+
84+
// stream any writes into output stream
85+
$stream->on('data', function ($data) use ($out) {
86+
$out->emit('data', array($data, $out));
87+
});
88+
89+
// error events cancel output stream
90+
$stream->on('error', function ($error) use ($out) {
91+
$out->emit('error', array($error, $out));
92+
$out->close();
93+
});
94+
95+
// close output stream once body closes
96+
$stream->on('close', function () use ($out) {
97+
$out->close();
98+
});
99+
$stream->on('end', function () use ($out) {
100+
$out->close();
101+
});
102+
},
103+
function ($e) use ($out) {
104+
$out->emit('error', array($e, $out));
105+
$out->close();
106+
}
107+
);
108+
109+
return $out;
110+
}

tests/BufferTest.php

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php
2+
3+
use React\Stream\ReadableStream;
4+
use Clue\React\Promise\Stream;
5+
use React\Promise\CancellablePromiseInterface;
6+
7+
class BufferTest extends TestCase
8+
{
9+
public function testClosedStreamResolvesWithEmptyBuffer()
10+
{
11+
$stream = new ReadableStream();
12+
$stream->close();
13+
14+
$promise = Stream\buffer($stream);
15+
16+
$this->expectPromiseResolveWith('', $promise);
17+
}
18+
19+
public function testPendingStreamWillNotResolve()
20+
{
21+
$stream = new ReadableStream();
22+
23+
$promise = Stream\buffer($stream);
24+
25+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
26+
}
27+
28+
public function testClosingStreamResolvesWithEmptyBuffer()
29+
{
30+
$stream = new ReadableStream();
31+
$promise = Stream\buffer($stream);
32+
33+
$stream->close();
34+
35+
$this->expectPromiseResolveWith('', $promise);
36+
}
37+
38+
public function testEmittingDataOnStreamResolvesWithConcatenatedData()
39+
{
40+
$stream = new ReadableStream();
41+
$promise = Stream\buffer($stream);
42+
43+
$stream->emit('data', array('hello', $stream));
44+
$stream->emit('data', array('world', $stream));
45+
$stream->close();
46+
47+
$this->expectPromiseResolveWith('helloworld', $promise);
48+
}
49+
50+
public function testEmittingErrorOnStreamRejects()
51+
{
52+
$stream = new ReadableStream();
53+
$promise = Stream\buffer($stream);
54+
55+
$stream->emit('error', array(new \RuntimeException('test')));
56+
57+
$this->expectPromiseReject($promise);
58+
}
59+
60+
public function testEmittingErrorAfterEmittingDataOnStreamRejects()
61+
{
62+
$stream = new ReadableStream();
63+
$promise = Stream\buffer($stream);
64+
65+
$stream->emit('data', array('hello', $stream));
66+
$stream->emit('error', array(new \RuntimeException('test')));
67+
68+
$this->expectPromiseReject($promise);
69+
}
70+
71+
public function testCancelPendingStreamWillReject()
72+
{
73+
$stream = new ReadableStream();
74+
75+
$promise = Stream\buffer($stream);
76+
77+
$promise->cancel();
78+
79+
$this->expectPromiseReject($promise);
80+
}
81+
}

0 commit comments

Comments
 (0)