Skip to content

Commit 4bce902

Browse files
authored
Merge pull request #18 from clue-labs/backpressure
Add backpressure support and support pause()/resume()
2 parents 76ff9c5 + 4f4b34d commit 4bce902

File tree

2 files changed

+118
-1
lines changed

2 files changed

+118
-1
lines changed

src/TransformStream.php

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,27 @@ class TransformStream extends EventEmitter implements DuplexStreamInterface
1616
private $readable = true;
1717
private $writable = true;
1818
private $closed = false;
19+
private $paused = false;
20+
private $drain = false;
1921

2022
public function write($data)
2123
{
2224
if (!$this->writable || $data === '') {
23-
return false;
25+
return $this->writable;
2426
}
2527

2628
try {
2729
$this->transformData($data);
30+
31+
if ($this->paused) {
32+
$this->drain = true;
33+
return false;
34+
}
35+
36+
return true;
2837
} catch (Exception $e) {
2938
$this->forwardError($e);
39+
return false;
3040
}
3141
}
3242

@@ -73,12 +83,21 @@ public function isWritable()
7383

7484
public function pause()
7585
{
86+
if (!$this->readable) {
87+
return;
88+
}
7689

90+
$this->paused = true;
7791
}
7892

7993
public function resume()
8094
{
95+
$this->paused = false;
8196

97+
if ($this->drain && $this->writable) {
98+
$this->drain = false;
99+
$this->emit('drain');
100+
}
82101
}
83102

84103
public function pipe(WritableStreamInterface $dest, array $options = array())

tests/TransformStreamTest.php

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<?php
22

33
use Clue\React\Zlib\TransformStream;
4+
use React\Stream\ThroughStream;
45

56
class TransformStreamTest extends TestCase
67
{
@@ -95,4 +96,101 @@ public function testDoesNotEmitIfAlreadyClosed(TransformStream $stream)
9596
$stream->end();
9697
$stream->close();
9798
}
99+
100+
public function testWriteReturnsTrueNormally()
101+
{
102+
$stream = new TransformStream();
103+
104+
$ret = $stream->write('hello');
105+
$this->assertTrue($ret);
106+
}
107+
108+
public function testWriteEmptyStringReturnsTrueNormally()
109+
{
110+
$stream = new TransformStream();
111+
112+
$ret = $stream->write('');
113+
$this->assertTrue($ret);
114+
}
115+
116+
public function testWriteReturnsFalseWhenClosed()
117+
{
118+
$stream = new TransformStream();
119+
$stream->close();
120+
121+
$ret = $stream->write('hello');
122+
$this->assertFalse($ret);
123+
}
124+
125+
public function testWriteEmptyStringReturnsFalseWhenClosed()
126+
{
127+
$stream = new TransformStream();
128+
$stream->close();
129+
130+
$ret = $stream->write('');
131+
$this->assertFalse($ret);
132+
}
133+
134+
public function testWriteReturnsFalseWhenPaused()
135+
{
136+
$stream = new TransformStream();
137+
$stream->pause();
138+
139+
$ret = $stream->write('hello');
140+
$this->assertFalse($ret);
141+
}
142+
143+
public function testWriteReturnsTrueWhenResumedAgain()
144+
{
145+
$stream = new TransformStream();
146+
$stream->pause();
147+
$stream->resume();
148+
149+
$ret = $stream->write('hello');
150+
$this->assertTrue($ret);
151+
}
152+
153+
public function testResumeEmitsDrainEventWhenPreviousWriteReturnedFalse()
154+
{
155+
$stream = new TransformStream();
156+
$stream->pause();
157+
$stream->write('hello');
158+
159+
$stream->on('drain', $this->expectCallableOnce());
160+
$stream->resume();
161+
}
162+
163+
public function testResumeDoesNotEmitDrainEventWhenNoPreviousWriteReturnedFalse()
164+
{
165+
$stream = new TransformStream();
166+
$stream->pause();
167+
168+
$stream->on('drain', $this->expectCallableNever());
169+
$stream->resume();
170+
}
171+
172+
public function testPauseAndResumeIsNoOpWhenClosed()
173+
{
174+
$stream = new TransformStream();
175+
$stream->close();
176+
177+
$stream->on('drain', $this->expectCallableNever());
178+
$stream->pause();
179+
$stream->resume();
180+
}
181+
182+
public function testSupportsBackPressureInPipeChain()
183+
{
184+
$source = new ThroughStream();
185+
186+
$dest = new ThroughStream();
187+
$dest->pause();
188+
189+
$stream = new TransformStream();
190+
191+
$source->pipe($stream)->pipe($dest);
192+
193+
$ret = $source->write('hello');
194+
$this->assertFalse($ret);
195+
}
98196
}

0 commit comments

Comments
 (0)