Skip to content

Commit 6720ea3

Browse files
authored
Merge pull request #12 from clue-labs/throttle
Add backpressure support and support throttling
2 parents ce1926b + 9aced59 commit 6720ea3

File tree

3 files changed

+128
-4
lines changed

3 files changed

+128
-4
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
],
1313
"require": {
1414
"php": ">=5.3",
15-
"react/stream": "^1.0 || ^0.7 || ^0.6"
15+
"react/stream": "^1.0 || ^0.7"
1616
},
1717
"require-dev": {
1818
"clue/hexdump": "~0.2.0",

src/Decoder.php

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class Decoder extends EventEmitter implements WritableStreamInterface
2323
private $buffer = '';
2424
private $writable = true;
2525
private $closing = false;
26+
private $paused = false;
2627
private $streaming = null;
2728
private $remaining = 0;
2829
private $padding = 0;
@@ -52,7 +53,7 @@ public function write($data)
5253

5354
// entry still incomplete => wait for next chunk
5455
if ($this->streaming !== null) {
55-
return true;
56+
return !$this->paused;
5657
}
5758
}
5859

@@ -93,6 +94,22 @@ public function write($data)
9394
$this->remaining = $header['size'];
9495
$this->padding = $header['padding'];
9596

97+
// entry stream is not paused by default - unless explicitly paused
98+
// emit "drain" even when entry stream is ready again to support backpressure
99+
$that = $this;
100+
$paused =& $this->paused;
101+
$paused = false;
102+
$this->streaming->on('drain', function () use (&$paused, $that) {
103+
$paused = false;
104+
$that->emit('drain');
105+
});
106+
$this->streaming->on('close', function () use (&$paused, $that) {
107+
if ($paused) {
108+
$paused = false;
109+
$that->emit('drain');
110+
}
111+
});
112+
96113
$this->emit('entry', array($header, $this->streaming));
97114

98115
if ($this->remaining === 0) {
@@ -104,7 +121,7 @@ public function write($data)
104121

105122
// incomplete entry => do not read next header
106123
if ($this->streaming !== null) {
107-
return true;
124+
return !$this->paused;
108125
}
109126

110127
if ($this->padding !== 0) {
@@ -184,14 +201,19 @@ private function consumeEntry($buffer)
184201
$this->remaining -= $len;
185202

186203
// emit chunk of data
187-
$this->streaming->write($data);
204+
$ret = $this->streaming->write($data);
188205

189206
// nothing remaining => entry stream finished
190207
if ($this->remaining === 0) {
191208
$this->streaming->end();
192209
$this->streaming = null;
193210
}
194211

212+
// throttle input when streaming entry is still writable but returns false (backpressure)
213+
if ($ret === false && $this->streaming !== null && $this->streaming->isWritable()) {
214+
$this->paused = true;
215+
}
216+
195217
return $buffer;
196218
}
197219

tests/DecoderTest.php

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,108 @@ public function testWriteDataWhenStreamingEntryWillEmitDataOnEntryWithoutEndWhen
131131
$this->assertTrue($ret);
132132
}
133133

134+
public function testWriteDataWhenStreamingEntryIsClosedAlreadyWillNotEmitDataAndWillNotThrottleWhenMoreDataIsRemaining()
135+
{
136+
$entry = new ThroughStream();
137+
$entry->close();
138+
$entry->on('data', $this->expectCallableNever());
139+
140+
$ref = new \ReflectionProperty($this->decoder, 'streaming');
141+
$ref->setAccessible(true);
142+
$ref->setValue($this->decoder, $entry);
143+
144+
$ref = new \ReflectionProperty($this->decoder, 'remaining');
145+
$ref->setAccessible(true);
146+
$ref->setValue($this->decoder, 100);
147+
148+
$ret = $this->decoder->write('hi');
149+
150+
$this->assertTrue($ret);
151+
}
152+
153+
public function testWriteDataWhenStreamingEntryIsPausedAlreadyWillEmitDataOnEntryAndThrottleWhenMoreDataIsRemaining()
154+
{
155+
$entry = new ThroughStream();
156+
$entry->pause();
157+
$entry->on('data', $this->expectCallableOnceWith('hi'));
158+
$entry->on('end', $this->expectCallableNever());
159+
$entry->on('close', $this->expectCallableNever());
160+
161+
$ref = new \ReflectionProperty($this->decoder, 'streaming');
162+
$ref->setAccessible(true);
163+
$ref->setValue($this->decoder, $entry);
164+
165+
$ref = new \ReflectionProperty($this->decoder, 'remaining');
166+
$ref->setAccessible(true);
167+
$ref->setValue($this->decoder, 100);
168+
169+
$ret = $this->decoder->write('hi');
170+
171+
$this->assertFalse($ret);
172+
}
173+
174+
public function testWriteDataWhenStreamingEntryIsPausedDuringDataWillEmitDataOnEntryAndThrottleWhenMoreDataIsRemaining()
175+
{
176+
$entry = new ThroughStream();
177+
$entry->on('data', function () use ($entry) {
178+
$entry->pause();
179+
});
180+
$entry->on('end', $this->expectCallableNever());
181+
$entry->on('close', $this->expectCallableNever());
182+
183+
$ref = new \ReflectionProperty($this->decoder, 'streaming');
184+
$ref->setAccessible(true);
185+
$ref->setValue($this->decoder, $entry);
186+
187+
$ref = new \ReflectionProperty($this->decoder, 'remaining');
188+
$ref->setAccessible(true);
189+
$ref->setValue($this->decoder, 100);
190+
191+
$ret = $this->decoder->write('hi');
192+
193+
$this->assertFalse($ret);
194+
}
195+
196+
public function testWriteDataWhenStreamingEntryIsPausedDuringDataAndResumeEntryAfterwardsWillEmitDrainEventOnDecoder()
197+
{
198+
$ref = null;
199+
$this->decoder->on('entry', function (array $header, ReadableStreamInterface $stream) use (&$ref) {
200+
$ref = $stream;
201+
$stream->pause();
202+
});
203+
$this->decoder->on('entry', $this->expectCallableOnce());
204+
205+
$data = file_get_contents(__DIR__ . '/fixtures/alice-bob.tar', false, null, 0, 512 + 1);
206+
$ret = $this->decoder->write($data);
207+
208+
$this->assertFalse($ret);
209+
210+
$this->decoder->on('drain', $this->expectCallableOnce());
211+
212+
$this->assertNotNull($ref);
213+
$ref->resume();
214+
}
215+
216+
public function testWriteDataWhenStreamingEntryIsPausedDuringDataAndCloseEntryAfterwardsWillEmitDrainEventOnDecoder()
217+
{
218+
$ref = null;
219+
$this->decoder->on('entry', function (array $header, ReadableStreamInterface $stream) use (&$ref) {
220+
$ref = $stream;
221+
$stream->pause();
222+
});
223+
$this->decoder->on('entry', $this->expectCallableOnce());
224+
225+
$data = file_get_contents(__DIR__ . '/fixtures/alice-bob.tar', false, null, 0, 512 + 1);
226+
$ret = $this->decoder->write($data);
227+
228+
$this->assertFalse($ret);
229+
230+
$this->decoder->on('drain', $this->expectCallableOnce());
231+
232+
$this->assertNotNull($ref);
233+
$ref->close();
234+
}
235+
134236
public function testWriteDataWhenStreamingEntryWillEmitDataOnEntryAndEndAndCloseWhenRemainingDataIsMatched()
135237
{
136238
$entry = new ThroughStream();

0 commit comments

Comments
 (0)