Skip to content

Commit cac093a

Browse files
committed
Strictly follow stream semantics
1 parent 489e5d9 commit cac093a

File tree

7 files changed

+262
-56
lines changed

7 files changed

+262
-56
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ $stream = new ReadableResourceStream(fopen('archive.tar', 'r'), $loop);
2929

3030
$decoder = new Decoder();
3131

32-
$decoder->on('entry', function ($header, ReadableStreamInterface $file) {
32+
$decoder->on('entry', function (array $header, React\Stream\ReadableStreamInterface $file) {
3333
echo 'File ' . $header['filename'];
3434
echo ' (' . $header['size'] . ' bytes):' . PHP_EOL;
3535

composer.json

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212
],
1313
"require": {
1414
"php": ">=5.3",
15-
"react/stream": "^1 || ^0.7 || ^0.6"
15+
"react/stream": "^1.0 || ^0.7 || ^0.6"
1616
},
1717
"require-dev": {
1818
"clue/hexdump": "~0.2.0",
19-
"react/event-loop": "^1",
20-
"react/promise": "~2.0|~1.0",
21-
"react/promise-stream": "^1",
19+
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3",
2220
"phpunit/phpunit": "^7.0 || ^6.0 || ^5.0 || ^4.8.35"
2321
},
2422
"autoload": {

examples/dump.php

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,35 @@
22

33
use Clue\Hexdump\Hexdump;
44
use Clue\React\Tar\Decoder;
5-
use React\EventLoop\StreamSelectLoop;
5+
use React\EventLoop\Factory;
66
use React\Stream\ReadableResourceStream;
7-
use React\Promise\Stream;
7+
use React\Stream\ReadableStreamInterface;
88

99
require __DIR__ . '/../vendor/autoload.php';
1010

1111
$in = isset($argv[1]) ? $argv[1] : (__DIR__ . '/../tests/fixtures/alice-bob.tar');
1212
echo 'Reading file "' . $in . '" (pass as argument to example)' . PHP_EOL;
1313

14-
// using the default loop does *not* work for file I/O
15-
//$loop = Factory::create();
16-
$loop = new StreamSelectLoop();
17-
14+
$loop = Factory::create();
1815
$stream = new ReadableResourceStream(fopen($in, 'r'), $loop);
1916

2017
$decoder = new Decoder();
21-
$decoder->on('entry', function ($header, $file) {
18+
$decoder->on('entry', function (array $header, ReadableStreamInterface $file) {
2219
static $i = 0;
2320
echo 'FILE #' . ++$i . PHP_EOL;
2421

25-
2622
echo 'Received entry headers:' . PHP_EOL;
2723
var_dump($header);
2824

29-
Stream\buffer($file)->then(function ($contents) {
25+
$contents = '';
26+
$file->on('data', function ($chunk) use (&$contents) {
27+
$contents .= $chunk;
28+
});
29+
$file->on('close', function () use (&$contents) {
3030
echo 'Received entry contents (' . strlen($contents) . ' bytes)' . PHP_EOL;
3131

3232
$d = new Hexdump();
3333
echo $d->dump($contents) . PHP_EOL . PHP_EOL;
34-
}, function ($error) {
35-
echo 'ERROR: ' . $error . PHP_EOL;
3634
});
3735
});
3836
$decoder->on('error', function ($error) {

src/Decoder.php

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace Clue\React\Tar;
44

55
use Evenement\EventEmitter;
6-
use Exception;
76
use React\Stream\ThroughStream;
87
use React\Stream\WritableStreamInterface;
98
use RuntimeException;
@@ -15,8 +14,8 @@
1514
* introduced by POSIX IEEE P1003.1. In the future, it should support more of
1615
* the less common alternative formats.
1716
*
18-
* @event entry(array $header, \React\Stream\ReadableStreamInterface $stream, Decoder $thisDecoder)
19-
* @event error(Exception $e, Decoder $thisDecoder)
17+
* @event entry(array $header, \React\Stream\ReadableStreamInterface $stream)
18+
* @event error(Exception $e)
2019
* @event close()
2120
*/
2221
class Decoder extends EventEmitter implements WritableStreamInterface
@@ -37,14 +36,14 @@ public function __construct()
3736

3837
if (PHP_VERSION < 5.5) {
3938
// PHP 5.5 replaced 'a' with 'Z' (read X bytes and removing trailing NULL bytes)
40-
$this->format = str_replace('Z', 'a', $this->format);
39+
$this->format = str_replace('Z', 'a', $this->format); // @codeCoverageIgnore
4140
}
4241
}
4342

4443
public function write($data)
4544
{
4645
if (!$this->writable) {
47-
return;
46+
return false;
4847
}
4948

5049
// incomplete entry => read until end of entry before expecting next header
@@ -53,7 +52,7 @@ public function write($data)
5352

5453
// entry still incomplete => wait for next chunk
5554
if ($this->streaming !== null) {
56-
return;
55+
return true;
5756
}
5857
}
5958

@@ -63,7 +62,7 @@ public function write($data)
6362

6463
// padding still remaining => wait for next chunk
6564
if ($this->padding !== 0) {
66-
return;
65+
return true;
6766
}
6867
}
6968

@@ -80,32 +79,32 @@ public function write($data)
8079
}
8180
try {
8281
$header = $this->readHeader($header);
83-
} catch (Exception $e) {
82+
} catch (RuntimeException $e) {
8483
// clean up before throwing
8584
$this->buffer = '';
8685
$this->writable = false;
8786

88-
$this->emit('error', array($e, $this));
87+
$this->emit('error', array($e));
8988
$this->close();
90-
return;
89+
return false;
9190
}
9291

9392
$this->streaming = new ThroughStream();
9493
$this->remaining = $header['size'];
9594
$this->padding = $header['padding'];
9695

97-
$this->emit('entry', array($header, $this->streaming, $this));
96+
$this->emit('entry', array($header, $this->streaming));
9897

9998
if ($this->remaining === 0) {
100-
$this->streaming->close();
99+
$this->streaming->end();
101100
$this->streaming = null;
102101
} else {
103102
$this->buffer = $this->consumeEntry($this->buffer);
104103
}
105104

106105
// incomplete entry => do not read next header
107106
if ($this->streaming !== null) {
108-
return;
107+
return true;
109108
}
110109

111110
if ($this->padding !== 0) {
@@ -114,9 +113,11 @@ public function write($data)
114113

115114
// incomplete padding => do not read next header
116115
if ($this->padding !== 0) {
117-
return;
116+
return true;
118117
}
119118
}
119+
120+
return true;
120121
}
121122

122123
public function end($data = null)
@@ -125,6 +126,22 @@ public function end($data = null)
125126
$this->write($data);
126127
}
127128

129+
if ($this->streaming !== null) {
130+
// input stream ended but we were still streaming an entry => emit error about incomplete entry
131+
$this->streaming->emit('error', array(new \RuntimeException('TAR input stream ended unexpectedly')));
132+
$this->streaming->close();
133+
$this->streaming = null;
134+
135+
// add some dummy data to also trigger error on decoder stream
136+
$this->buffer = '.';
137+
}
138+
139+
if ($this->buffer !== '') {
140+
// incomplete entry in buffer
141+
$this->emit('error', array(new \RuntimeException('Stream ended with incomplete entry')));
142+
$this->buffer = '';
143+
}
144+
128145
$this->writable = false;
129146
$this->close();
130147
}
@@ -137,25 +154,18 @@ public function close()
137154

138155
$this->closing = true;
139156
$this->writable = false;
157+
$this->buffer = '';
140158

141159
if ($this->streaming !== null) {
142-
// input stream ended but we were still streaming an entry => emit error about incomplete entry
143-
$this->streaming->emit('error', array());
160+
// input stream ended but we were still streaming an entry => forcefully close without error
144161
$this->streaming->close();
145162
$this->streaming = null;
146-
147-
$this->emit('error', array());
148-
}
149-
150-
if ($this->buffer !== '') {
151-
// incomplete entry in buffer
152-
$this->emit('error', array());
153-
$this->buffer = '';
154163
}
155164

156165
// ignore whether we're still expecting NUL-padding
157166

158-
$this->emit('close', array($this));
167+
$this->emit('close');
168+
$this->removeAllListeners();
159169
}
160170

161171
public function isWritable()
@@ -174,11 +184,11 @@ private function consumeEntry($buffer)
174184
$this->remaining -= $len;
175185

176186
// emit chunk of data
177-
$this->streaming->emit('data', array($data, $this->streaming));
187+
$this->streaming->write($data);
178188

179189
// nothing remaining => entry stream finished
180190
if ($this->remaining === 0) {
181-
$this->streaming->close();
191+
$this->streaming->end();
182192
$this->streaming = null;
183193
}
184194

0 commit comments

Comments
 (0)