Skip to content

Commit 0197b84

Browse files
committed
Stream response bodies instead of always buffering them completely
1 parent 69ef4c7 commit 0197b84

File tree

4 files changed

+275
-11
lines changed

4 files changed

+275
-11
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/.php_cs
22
/behat.yml
33
/build/
4+
/coverage
45
/composer.lock
56
/phpspec.yml
67
/phpunit.xml

src/Client.php

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
namespace Http\Adapter\Artax;
44

55
use Amp\Artax;
6+
use Amp\CancellationTokenSource;
67
use Amp\Promise;
8+
use Http\Adapter\Artax\Internal\ResponseStream;
79
use Http\Client\Exception\RequestException;
810
use Http\Client\HttpClient;
911
use Http\Discovery\MessageFactoryDiscovery;
@@ -30,6 +32,8 @@ public function __construct(Artax\Client $client = null, ResponseFactory $respon
3032
public function sendRequest(RequestInterface $request)
3133
{
3234
return Promise\wait(call(function () use ($request) {
35+
$cancellationTokenSource = new CancellationTokenSource();
36+
3337
/** @var Artax\Request $req */
3438
$req = new Artax\Request($request->getUri(), $request->getMethod());
3539
$req = $req->withProtocolVersions([$request->getProtocolVersion()]);
@@ -40,25 +44,16 @@ public function sendRequest(RequestInterface $request)
4044
/** @var Artax\Response $resp */
4145
$resp = yield $this->client->request($req, [
4246
Artax\Client::OP_MAX_REDIRECTS => 0,
43-
]);
47+
], $cancellationTokenSource->getToken());
4448
} catch (Artax\HttpException $e) {
4549
throw new RequestException($e->getMessage(), $request, $e);
4650
}
4751

48-
$respBody = $resp->getBody();
49-
$bodyStream = $this->streamFactory->createStream();
50-
51-
while (null !== $chunk = yield $respBody->read()) {
52-
$bodyStream->write($chunk);
53-
}
54-
55-
$bodyStream->rewind();
56-
5752
return $this->responseFactory->createResponse(
5853
$resp->getStatus(),
5954
$resp->getReason(),
6055
$resp->getHeaders(),
61-
$bodyStream,
56+
new ResponseStream($resp->getBody()->getInputStream(), $cancellationTokenSource),
6257
$resp->getProtocolVersion()
6358
);
6459
}));

src/Internal/ResponseStream.php

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
<?php
2+
3+
namespace Http\Adapter\Artax\Internal;
4+
5+
use Amp\Artax;
6+
use Amp\ByteStream\InputStream;
7+
use Amp\ByteStream\IteratorStream;
8+
use Amp\CancellationTokenSource;
9+
use Amp\CancelledException;
10+
use Amp\Emitter;
11+
use Amp\Promise;
12+
use Psr\Http\Message\StreamInterface;
13+
14+
/**
15+
* PSR-7 stream implementation that converts an `Amp\ByteStream\InputStream` into a PSR-7 compatible stream.
16+
*
17+
* @internal
18+
*/
19+
class ResponseStream implements StreamInterface
20+
{
21+
private $buffer = '';
22+
private $position = 0;
23+
private $eof = false;
24+
25+
private $body;
26+
private $cancellationTokenSource;
27+
28+
/**
29+
* @param InputStream $body HTTP response stream to wrap.
30+
* @param CancellationTokenSource $cancellationTokenSource Cancellation source bound to the request to abort it.
31+
*/
32+
public function __construct(InputStream $body, CancellationTokenSource $cancellationTokenSource)
33+
{
34+
$this->body = $body;
35+
$this->cancellationTokenSource = $cancellationTokenSource;
36+
}
37+
38+
public function __toString()
39+
{
40+
try {
41+
return $this->getContents();
42+
} catch (\Throwable $e) {
43+
return '';
44+
}
45+
}
46+
47+
public function __destruct()
48+
{
49+
$this->close();
50+
}
51+
52+
public function close()
53+
{
54+
$this->cancellationTokenSource->cancel();
55+
56+
$emitter = new Emitter();
57+
$emitter->fail(new Artax\HttpException('The stream has been closed'));
58+
$this->body = new IteratorStream($emitter->iterate());
59+
}
60+
61+
public function detach()
62+
{
63+
$this->close();
64+
}
65+
66+
public function getSize()
67+
{
68+
return null;
69+
}
70+
71+
public function tell()
72+
{
73+
return $this->position;
74+
}
75+
76+
public function eof()
77+
{
78+
return $this->eof;
79+
}
80+
81+
public function isSeekable()
82+
{
83+
return false;
84+
}
85+
86+
public function seek($offset, $whence = SEEK_SET)
87+
{
88+
throw new \RuntimeException('Stream is not seekable');
89+
}
90+
91+
public function rewind()
92+
{
93+
$this->seek(0);
94+
}
95+
96+
public function isWritable()
97+
{
98+
return false;
99+
}
100+
101+
public function write($string)
102+
{
103+
throw new \RuntimeException('Stream is not writable');
104+
}
105+
106+
public function isReadable()
107+
{
108+
return true;
109+
}
110+
111+
public function read($length)
112+
{
113+
if ($this->eof) {
114+
return '';
115+
}
116+
117+
if ($this->buffer === '') {
118+
try {
119+
$this->buffer = Promise\wait($this->body->read());
120+
} catch (Artax\HttpException $e) {
121+
throw new \RuntimeException('Reading from the stream failed', 0, $e);
122+
} catch (CancelledException $e) {
123+
throw new \RuntimeException('Reading from the stream failed', 0, $e);
124+
}
125+
126+
if ($this->buffer === null) {
127+
$this->eof = true;
128+
129+
return '';
130+
}
131+
}
132+
133+
$read = \substr($this->buffer, 0, $length);
134+
$this->buffer = (string) \substr($this->buffer, $length);
135+
$this->position += \strlen($read);
136+
137+
return $read;
138+
}
139+
140+
public function getContents()
141+
{
142+
$buffer = '';
143+
144+
while (!$this->eof()) {
145+
$buffer .= $this->read(8192 * 8);
146+
}
147+
148+
return $buffer;
149+
}
150+
151+
public function getMetadata($key = null)
152+
{
153+
return $key === null ? [] : null;
154+
}
155+
}

tests/ResponseStreamTest.php

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
<?php
2+
3+
namespace Http\Adapter\Artax\Test;
4+
5+
use Amp\ByteStream\InMemoryStream;
6+
use Amp\ByteStream\IteratorStream;
7+
use Amp\CancellationTokenSource;
8+
use Amp\CancelledException;
9+
use Amp\Emitter;
10+
use Http\Adapter\Artax\Internal\ResponseStream;
11+
use PHPUnit\Framework\TestCase;
12+
use function Amp\Iterator\fromIterable;
13+
14+
class ResponseStreamTest extends TestCase
15+
{
16+
public function testNotSeekable()
17+
{
18+
$stream = new ResponseStream(new InMemoryStream(), new CancellationTokenSource());
19+
$this->assertFalse($stream->isSeekable());
20+
21+
$this->expectException(\RuntimeException::class);
22+
$stream->seek(0);
23+
}
24+
25+
public function testNotRewindable()
26+
{
27+
$stream = new ResponseStream(new InMemoryStream(), new CancellationTokenSource());
28+
29+
$this->expectException(\RuntimeException::class);
30+
$stream->rewind();
31+
}
32+
33+
public function testNotWritable()
34+
{
35+
$stream = new ResponseStream(new InMemoryStream(), new CancellationTokenSource());
36+
$this->assertFalse($stream->isWritable());
37+
38+
$this->expectException(\RuntimeException::class);
39+
$stream->write('');
40+
}
41+
42+
public function testReadSlowStream()
43+
{
44+
$inputStream = new IteratorStream(fromIterable(['a', 'b', 'c'], 100));
45+
$stream = new ResponseStream($inputStream, new CancellationTokenSource());
46+
$this->assertTrue($stream->isReadable());
47+
48+
$this->assertSame('abc', (string) $stream);
49+
50+
// As the stream isn't rewindable, we get an empty result here.
51+
$this->assertSame('', (string) $stream);
52+
53+
$this->assertSame(3, $stream->tell());
54+
55+
$this->assertSame('', $stream->read(8192));
56+
}
57+
58+
public function testReadAfterClose()
59+
{
60+
$inputStream = new IteratorStream(fromIterable(['a', 'b', 'c'], 100));
61+
$stream = new ResponseStream($inputStream, new CancellationTokenSource());
62+
63+
$stream->close();
64+
65+
$this->expectException(\RuntimeException::class);
66+
$stream->read(8192);
67+
}
68+
69+
public function testStringCastAfterClose()
70+
{
71+
$inputStream = new IteratorStream(fromIterable(['a', 'b', 'c'], 100));
72+
$stream = new ResponseStream($inputStream, new CancellationTokenSource());
73+
74+
$stream->close();
75+
76+
$this->assertSame('', (string) $stream);
77+
}
78+
79+
public function testReadAfterCancel()
80+
{
81+
$emitter = new Emitter();
82+
$emitter->fail(new CancelledException());
83+
$inputStream = new IteratorStream($emitter->iterate());
84+
$stream = new ResponseStream($inputStream, new CancellationTokenSource());
85+
86+
$this->expectException(\RuntimeException::class);
87+
$stream->read(8192);
88+
}
89+
90+
public function testReadAfterDetach()
91+
{
92+
$inputStream = new IteratorStream(fromIterable(['a', 'b', 'c'], 100));
93+
$stream = new ResponseStream($inputStream, new CancellationTokenSource());
94+
95+
$stream->detach();
96+
97+
$this->expectException(\RuntimeException::class);
98+
$stream->read(8192);
99+
}
100+
101+
public function testMetadata()
102+
{
103+
$stream = new ResponseStream(new InMemoryStream(), new CancellationTokenSource());
104+
$this->assertNull($stream->getMetadata('foobar'));
105+
$this->assertInternalType('array', $stream->getMetadata());
106+
}
107+
108+
public function testSize()
109+
{
110+
$stream = new ResponseStream(new InMemoryStream(), new CancellationTokenSource());
111+
$this->assertNull($stream->getSize());
112+
}
113+
}

0 commit comments

Comments
 (0)