Skip to content

Commit ce1926b

Browse files
authored
Merge pull request #11 from clue-labs/deps
Support latest ReactPHP stream version and strictly follow stream semantics
2 parents 1eccb58 + cac093a commit ce1926b

File tree

7 files changed

+276
-69
lines changed

7 files changed

+276
-69
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ tar stream into the `Decoder` which emits "entry" events for each individual fil
2525

2626
```php
2727
$loop = React\EventLoop\Factory::create();
28-
$stream = new Stream(fopen('archive.tar', 'r'), $loop);
28+
$stream = new ReadableResourceStream(fopen('archive.tar', 'r'), $loop);
2929

3030
$decoder = new Decoder();
3131

32-
$decoder->on('entry', function ($header, ReadableStreamInterface $file) {
32+
$decoder->on('entry', function (array $header, React\Stream\ReadableStreamInterface $file) {
3333
echo 'File ' . $header['filename'];
3434
echo ' (' . $header['size'] . ' bytes):' . PHP_EOL;
35-
35+
3636
$file->on('data', function ($chunk) {
3737
echo $chunk;
3838
});

composer.json

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
],
1313
"require": {
1414
"php": ">=5.3",
15-
"react/stream": "~0.4.0|~0.3.0"
15+
"react/stream": "^1.0 || ^0.7 || ^0.6"
1616
},
1717
"require-dev": {
1818
"clue/hexdump": "~0.2.0",
19-
"react/event-loop": "~0.4.0|~0.3.0",
20-
"react/promise": "~2.0|~1.0",
19+
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3",
2120
"phpunit/phpunit": "^7.0 || ^6.0 || ^5.0 || ^4.8.35"
2221
},
2322
"autoload": {

examples/dump.php

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,32 @@
11
<?php
22

3-
use React\Stream\Stream;
4-
use React\EventLoop\Factory;
5-
use Clue\React\Tar\Decoder;
6-
use React\Stream\BufferedSink;
73
use Clue\Hexdump\Hexdump;
8-
use React\EventLoop\StreamSelectLoop;
4+
use Clue\React\Tar\Decoder;
5+
use React\EventLoop\Factory;
6+
use React\Stream\ReadableResourceStream;
7+
use React\Stream\ReadableStreamInterface;
98

109
require __DIR__ . '/../vendor/autoload.php';
1110

1211
$in = isset($argv[1]) ? $argv[1] : (__DIR__ . '/../tests/fixtures/alice-bob.tar');
1312
echo 'Reading file "' . $in . '" (pass as argument to example)' . PHP_EOL;
1413

15-
// using the default loop does *not* work for file I/O
16-
//$loop = Factory::create();
17-
$loop = new StreamSelectLoop();
18-
19-
$stream = new Stream(fopen($in, 'r'), $loop);
14+
$loop = Factory::create();
15+
$stream = new ReadableResourceStream(fopen($in, 'r'), $loop);
2016

2117
$decoder = new Decoder();
22-
$decoder->on('entry', function ($header, $file) {
18+
$decoder->on('entry', function (array $header, ReadableStreamInterface $file) {
2319
static $i = 0;
2420
echo 'FILE #' . ++$i . PHP_EOL;
2521

26-
2722
echo 'Received entry headers:' . PHP_EOL;
2823
var_dump($header);
2924

30-
BufferedSink::createPromise($file)->then(function ($contents) {
25+
$contents = '';
26+
$file->on('data', function ($chunk) use (&$contents) {
27+
$contents .= $chunk;
28+
});
29+
$file->on('close', function () use (&$contents) {
3130
echo 'Received entry contents (' . strlen($contents) . ' bytes)' . PHP_EOL;
3231

3332
$d = new Hexdump();

src/Decoder.php

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22

33
namespace Clue\React\Tar;
44

5-
use React\Stream\WritableStream;
6-
use React\Stream\ReadableStream;
5+
use Evenement\EventEmitter;
6+
use React\Stream\ThroughStream;
7+
use React\Stream\WritableStreamInterface;
78
use RuntimeException;
8-
use Exception;
99

1010
/**
1111
* Decodes a TAR stream and emits "entry" events for each individual file in the archive.
@@ -14,11 +14,11 @@
1414
* introduced by POSIX IEEE P1003.1. In the future, it should support more of
1515
* the less common alternative formats.
1616
*
17-
* @event entry(array $header, ReadableStream $stream, Decoder $thisDecoder)
18-
* @event error(Exception $e, Decoder $thisDecoder)
17+
* @event entry(array $header, \React\Stream\ReadableStreamInterface $stream)
18+
* @event error(Exception $e)
1919
* @event close()
2020
*/
21-
class Decoder extends WritableStream
21+
class Decoder extends EventEmitter implements WritableStreamInterface
2222
{
2323
private $buffer = '';
2424
private $writable = true;
@@ -36,14 +36,14 @@ public function __construct()
3636

3737
if (PHP_VERSION < 5.5) {
3838
// PHP 5.5 replaced 'a' with 'Z' (read X bytes and removing trailing NULL bytes)
39-
$this->format = str_replace('Z', 'a', $this->format);
39+
$this->format = str_replace('Z', 'a', $this->format); // @codeCoverageIgnore
4040
}
4141
}
4242

4343
public function write($data)
4444
{
4545
if (!$this->writable) {
46-
return;
46+
return false;
4747
}
4848

4949
// incomplete entry => read until end of entry before expecting next header
@@ -52,7 +52,7 @@ public function write($data)
5252

5353
// entry still incomplete => wait for next chunk
5454
if ($this->streaming !== null) {
55-
return;
55+
return true;
5656
}
5757
}
5858

@@ -62,7 +62,7 @@ public function write($data)
6262

6363
// padding still remaining => wait for next chunk
6464
if ($this->padding !== 0) {
65-
return;
65+
return true;
6666
}
6767
}
6868

@@ -79,32 +79,32 @@ public function write($data)
7979
}
8080
try {
8181
$header = $this->readHeader($header);
82-
} catch (Exception $e) {
82+
} catch (RuntimeException $e) {
8383
// clean up before throwing
8484
$this->buffer = '';
8585
$this->writable = false;
8686

87-
$this->emit('error', array($e, $this));
87+
$this->emit('error', array($e));
8888
$this->close();
89-
return;
89+
return false;
9090
}
9191

92-
$this->streaming = new ReadableStream();
92+
$this->streaming = new ThroughStream();
9393
$this->remaining = $header['size'];
9494
$this->padding = $header['padding'];
9595

96-
$this->emit('entry', array($header, $this->streaming, $this));
96+
$this->emit('entry', array($header, $this->streaming));
9797

9898
if ($this->remaining === 0) {
99-
$this->streaming->close();
99+
$this->streaming->end();
100100
$this->streaming = null;
101101
} else {
102102
$this->buffer = $this->consumeEntry($this->buffer);
103103
}
104104

105105
// incomplete entry => do not read next header
106106
if ($this->streaming !== null) {
107-
return;
107+
return true;
108108
}
109109

110110
if ($this->padding !== 0) {
@@ -113,9 +113,11 @@ public function write($data)
113113

114114
// incomplete padding => do not read next header
115115
if ($this->padding !== 0) {
116-
return;
116+
return true;
117117
}
118118
}
119+
120+
return true;
119121
}
120122

121123
public function end($data = null)
@@ -124,6 +126,22 @@ public function end($data = null)
124126
$this->write($data);
125127
}
126128

129+
if ($this->streaming !== null) {
130+
// input stream ended but we were still streaming an entry => emit error about incomplete entry
131+
$this->streaming->emit('error', array(new \RuntimeException('TAR input stream ended unexpectedly')));
132+
$this->streaming->close();
133+
$this->streaming = null;
134+
135+
// add some dummy data to also trigger error on decoder stream
136+
$this->buffer = '.';
137+
}
138+
139+
if ($this->buffer !== '') {
140+
// incomplete entry in buffer
141+
$this->emit('error', array(new \RuntimeException('Stream ended with incomplete entry')));
142+
$this->buffer = '';
143+
}
144+
127145
$this->writable = false;
128146
$this->close();
129147
}
@@ -136,25 +154,18 @@ public function close()
136154

137155
$this->closing = true;
138156
$this->writable = false;
157+
$this->buffer = '';
139158

140159
if ($this->streaming !== null) {
141-
// input stream ended but we were still streaming an entry => emit error about incomplete entry
142-
$this->streaming->emit('error', array());
160+
// input stream ended but we were still streaming an entry => forcefully close without error
143161
$this->streaming->close();
144162
$this->streaming = null;
145-
146-
$this->emit('error', array());
147-
}
148-
149-
if ($this->buffer !== '') {
150-
// incomplete entry in buffer
151-
$this->emit('error', array());
152-
$this->buffer = '';
153163
}
154164

155165
// ignore whether we're still expecting NUL-padding
156166

157-
$this->emit('close', array($this));
167+
$this->emit('close');
168+
$this->removeAllListeners();
158169
}
159170

160171
public function isWritable()
@@ -173,11 +184,11 @@ private function consumeEntry($buffer)
173184
$this->remaining -= $len;
174185

175186
// emit chunk of data
176-
$this->streaming->emit('data', array($data, $this->streaming));
187+
$this->streaming->write($data);
177188

178189
// nothing remaining => entry stream finished
179190
if ($this->remaining === 0) {
180-
$this->streaming->close();
191+
$this->streaming->end();
181192
$this->streaming = null;
182193
}
183194

0 commit comments

Comments
 (0)