Skip to content

Commit 370e7a1

Browse files
Nek-trowski
andauthored
Support stream response body simultaneously with request body (#380)
Co-authored-by: Aaron Piotrowski <aaron@trowski.com>
1 parent e2ee779 commit 370e7a1

File tree

7 files changed

+152
-54
lines changed

7 files changed

+152
-54
lines changed

src/Connection/Http1Connection.php

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -232,20 +232,51 @@ private function request(Request $request, Cancellation $cancellation, Stream $s
232232
$combinedCancellation = $cancellation;
233233
}
234234

235-
$id = $combinedCancellation->subscribe($this->close(...));
235+
$cancellationId = $combinedCancellation->subscribe($this->close(...));
236236

237-
try {
238-
$this->writeRequest($request, $stream, $protocolVersion, $combinedCancellation);
237+
$responseDeferred = new DeferredFuture();
239238

240-
return $this->readResponse($request, $cancellation, $combinedCancellation, $stream);
241-
} catch (\Throwable $exception) {
242-
$this->socket?->close();
239+
EventLoop::queue(function () use (
240+
$responseDeferred,
241+
$request,
242+
$stream,
243+
$protocolVersion,
244+
$combinedCancellation,
245+
): void {
246+
try {
247+
$this->writeRequest($request, $stream, $protocolVersion, $combinedCancellation);
248+
} catch (\Throwable $exception) {
249+
if (!$responseDeferred->isComplete()) {
250+
$responseDeferred->error($exception);
251+
}
252+
}
253+
});
243254

244-
throw $exception;
245-
} finally {
246-
$combinedCancellation->unsubscribe($id);
247-
$cancellation->throwIfRequested();
248-
}
255+
EventLoop::queue(function () use (
256+
$responseDeferred,
257+
$request,
258+
$stream,
259+
$cancellation,
260+
$combinedCancellation,
261+
$cancellationId,
262+
): void {
263+
try {
264+
$response = $this->readResponse($request, $cancellation, $combinedCancellation, $stream);
265+
if (!$responseDeferred->isComplete()) {
266+
$responseDeferred->complete($response);
267+
}
268+
} catch (\Throwable $exception) {
269+
$this->socket?->close();
270+
271+
if (!$responseDeferred->isComplete()) {
272+
$responseDeferred->error($exception);
273+
}
274+
} finally {
275+
$combinedCancellation->unsubscribe($cancellationId);
276+
}
277+
});
278+
279+
return $responseDeferred->getFuture()->await($cancellation);
249280
}
250281

251282
private function release(): void
@@ -568,7 +599,7 @@ private function writeRequest(
568599
Request $request,
569600
Stream $stream,
570601
string $protocolVersion,
571-
Cancellation $cancellation
602+
Cancellation $cancellation,
572603
): void {
573604
try {
574605
$socket = $this->socket;

src/Connection/Internal/Http2ConnectionProcessor.php

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
438438
$stream->responsePending = false;
439439
EventLoop::queue(static function () use ($response, $stream): void {
440440
try {
441-
$stream->requestBodyCompletion->getFuture()->await();
441+
$stream->requestHeaderCompletion->getFuture()->await();
442442
$stream->preResponseResolution?->await();
443443
$stream->pendingResponse?->complete($response);
444444
} catch (\Throwable $e) {
@@ -967,22 +967,22 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
967967
$streamId = $this->streamId += 2; // Client streams should be odd-numbered, starting at 1.
968968

969969
$this->streams[$streamId] = $http2stream = new Http2Stream(
970-
$streamId,
971-
$request,
972-
$stream,
973-
$cancellation,
974-
$this->createStreamTransferWatcher($streamId, $request->getTransferTimeout()),
975-
$this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()),
976-
self::DEFAULT_WINDOW_SIZE,
977-
$this->initialWindowSize,
970+
id: $streamId,
971+
request: $request,
972+
stream: $stream,
973+
cancellation: $cancellation,
974+
transferWatcher: $this->createStreamTransferWatcher($streamId, $request->getTransferTimeout()),
975+
inactivityWatcher: $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()),
976+
serverWindow: self::DEFAULT_WINDOW_SIZE,
977+
clientWindow: $this->initialWindowSize,
978978
);
979979

980+
$cancellation = $http2stream->cancellation; // Use CompositeCancellation from Http2Stream.
981+
980982
$cancellationId = $cancellation->subscribe(
981983
fn (CancelledException $exception) => $this->releaseStream($streamId, $exception, false),
982984
);
983985

984-
$cancellation = $http2stream->cancellation; // Use CompositeCancellation from Http2Stream.
985-
986986
\assert($http2stream->pendingResponse !== null);
987987
$responseFuture = $http2stream->pendingResponse->getFuture();
988988

@@ -991,7 +991,7 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
991991
->finally(static fn () => $cancellation->unsubscribe($cancellationId))
992992
->ignore();
993993

994-
try {
994+
async(function () use ($request, $stream, $http2stream, $cancellation): void {
995995
events()->requestHeaderStart($request, $stream);
996996

997997
$body = $request->getBody()->getContent();
@@ -1007,64 +1007,64 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
10071007
$firstChunk = \array_shift($split);
10081008
$lastChunk = \array_pop($split);
10091009

1010-
$this->writeFrame(Http2Parser::HEADERS, Http2Parser::NO_FLAG, $streamId, $firstChunk)->ignore();
1010+
$this->writeFrame(Http2Parser::HEADERS, stream: $http2stream->id, data: $firstChunk)->ignore();
10111011

10121012
foreach ($split as $headerChunk) {
1013-
$this->writeFrame(Http2Parser::CONTINUATION, Http2Parser::NO_FLAG, $streamId, $headerChunk)->ignore();
1013+
$this->writeFrame(Http2Parser::CONTINUATION, stream: $http2stream->id, data: $headerChunk)->ignore();
10141014
}
10151015

1016-
$this->writeFrame(Http2Parser::CONTINUATION, $flag, $streamId, $lastChunk)->await();
1016+
$this->writeFrame(Http2Parser::CONTINUATION, $flag, $http2stream->id, $lastChunk)->await();
10171017
} else {
1018-
$this->writeFrame(Http2Parser::HEADERS, $flag, $streamId, $headers)->await();
1018+
$this->writeFrame(Http2Parser::HEADERS, $flag, $http2stream->id, $headers)->await();
10191019
}
10201020

1021+
$http2stream->requestHeaderCompletion->complete();
1022+
10211023
events()->requestHeaderEnd($request, $stream);
10221024

10231025
events()->requestBodyStart($request, $stream);
10241026

10251027
if ($chunk === null) {
10261028
$http2stream->requestBodyCompletion->complete();
10271029
} else {
1028-
$buffer = $chunk;
10291030
$writeFuture = Future::complete();
10301031
do {
1031-
$chunk = $body->read($cancellation);
1032-
1033-
if (!isset($this->streams[$streamId])) {
1034-
// Request stream closed, so this await will throw.
1035-
return $responseFuture->await();
1036-
}
1037-
10381032
// Wait for prior write to complete if we've buffered too much of the request body.
10391033
if (\strlen($http2stream->requestBodyBuffer) >= self::DEFAULT_MAX_FRAME_SIZE) {
10401034
$writeFuture->await($cancellation);
10411035
}
10421036

1043-
if ($chunk === null) {
1044-
// Don't move this out of the loop, this needs to be set before calling writeData
1045-
$http2stream->requestBodyCompletion->complete();
1046-
}
1047-
1048-
$writeFuture = $this->writeData($http2stream, $buffer);
1037+
$writeFuture = $this->writeData($http2stream, $chunk);
10491038
events()->requestBodyProgress($request, $stream);
1050-
$buffer = $chunk;
1051-
} while ($buffer !== null);
1039+
1040+
$chunk = $body->read($cancellation);
1041+
} while ($chunk !== null);
1042+
1043+
$http2stream->requestBodyCompletion->complete();
10521044

10531045
$writeFuture->await($cancellation);
1046+
$this->writeBufferedData($http2stream)->await($cancellation);
10541047
}
10551048

10561049
events()->requestBodyEnd($request, $stream);
1057-
} catch (\Throwable $exception) {
1050+
})->catch(function (\Throwable $exception) use ($http2stream, $cancellation, $cancellationId): void {
10581051
$cancellation->unsubscribe($cancellationId);
10591052

1060-
$exception = $this->wrapException($exception, "Failed to write request (stream {$streamId}) to socket");
1053+
$exception = $this->wrapException(
1054+
$exception,
1055+
"Failed to write request (stream {$http2stream->id}) to socket",
1056+
);
1057+
1058+
if (!$http2stream->requestHeaderCompletion->isComplete()) {
1059+
$http2stream->requestHeaderCompletion->error($exception);
1060+
}
10611061

10621062
if (!$http2stream->requestBodyCompletion->isComplete()) {
10631063
$http2stream->requestBodyCompletion->error($exception);
10641064
}
10651065

1066-
$this->releaseStream($streamId, $exception, false);
1067-
}
1066+
$this->releaseStream($http2stream->id, $exception, false);
1067+
});
10681068

10691069
return $responseFuture->await();
10701070
}
@@ -1229,7 +1229,7 @@ private function applySetting(int $setting, int $value): void
12291229

12301230
private function writeBufferedData(Http2Stream $stream): Future
12311231
{
1232-
if ($stream->requestBodyCompletion->isComplete() && $stream->requestBodyBuffer === '') {
1232+
if ($stream->ended) {
12331233
return Future::complete();
12341234
}
12351235

@@ -1261,6 +1261,8 @@ private function writeBufferedData(Http2Stream $stream): Future
12611261
$stream->id,
12621262
$stream->requestBodyBuffer
12631263
);
1264+
1265+
$stream->ended = true;
12641266
} else {
12651267
$future = $this->writeFrame(
12661268
Http2Parser::DATA,

src/Connection/Internal/Http2Stream.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,12 @@ final class Http2Stream
4444

4545
public int $bufferSize = 0;
4646

47+
public bool $ended = false;
48+
4749
public string $requestBodyBuffer = '';
4850

51+
public readonly DeferredFuture $requestHeaderCompletion;
52+
4953
public readonly DeferredFuture $requestBodyCompletion;
5054

5155
/** @var int Integer between 1 and 256 */
@@ -72,6 +76,7 @@ public function __construct(
7276
public int $clientWindow,
7377
) {
7478
$this->pendingResponse = new DeferredFuture();
79+
$this->requestHeaderCompletion = new DeferredFuture();
7580
$this->requestBodyCompletion = new DeferredFuture();
7681
$this->body = new Queue();
7782

src/Internal/EventInvoker.php

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,12 @@ public function requestBodyStart(Request $request, Stream $stream): void
159159
public function requestBodyProgress(Request $request, Stream $stream): void
160160
{
161161
$previousPhase = self::getPhase($request);
162-
if ($previousPhase !== Phase::RequestBody) {
162+
if (!\in_array($previousPhase, [
163+
Phase::RequestBody,
164+
Phase::ServerProcessing,
165+
Phase::ResponseHeaders,
166+
Phase::ResponseBody,
167+
], true)) {
163168
throw new \Error('Invalid request phase: ' . $previousPhase->name);
164169
}
165170

@@ -169,11 +174,18 @@ public function requestBodyProgress(Request $request, Stream $stream): void
169174
public function requestBodyEnd(Request $request, Stream $stream): void
170175
{
171176
$previousPhase = self::getPhase($request);
172-
if ($previousPhase !== Phase::RequestBody) {
177+
if (!\in_array($previousPhase, [
178+
Phase::RequestBody,
179+
Phase::ServerProcessing,
180+
Phase::ResponseHeaders,
181+
Phase::ResponseBody,
182+
], true)) {
173183
throw new \Error('Invalid request phase transition from ' . $previousPhase->name . ' to ServerProcessing');
174184
}
175185

176-
$this->requestPhase[$request] = Phase::ServerProcessing;
186+
if ($previousPhase === Phase::RequestBody) {
187+
$this->requestPhase[$request] = Phase::ServerProcessing;
188+
}
177189

178190
$this->invoke($request, fn (EventListener $eventListener) => $eventListener->requestBodyEnd($request, $stream));
179191
}

test/Connection/Http1ConnectionTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public function testInactivityTimeout(): void
214214

215215
public function testWritingRequestWithRelativeUriPathFails(): void
216216
{
217-
[$client] = Socket\createSocketPair();
217+
[$server, $client] = Socket\createSocketPair();
218218

219219
$connection = new Http1Connection($client, 0, null, 5);
220220

test/Connection/Http2ConnectionTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Amp\Http\Client\Connection;
44

5+
use Amp\ByteStream\ReadableIterableStream;
56
use Amp\ByteStream\StreamException;
67
use Amp\CancelledException;
78
use Amp\Future;
@@ -11,13 +12,15 @@
1112
use Amp\Http\Client\Request;
1213
use Amp\Http\Client\Response;
1314
use Amp\Http\Client\SocketException;
15+
use Amp\Http\Client\StreamedContent;
1416
use Amp\Http\Client\Trailers;
1517
use Amp\Http\HPack;
1618
use Amp\Http\Http2\Http2Parser;
1719
use Amp\Http\Http2\Http2Processor;
1820
use Amp\Http\HttpStatus;
1921
use Amp\NullCancellation;
2022
use Amp\PHPUnit\AsyncTestCase;
23+
use Amp\Pipeline\Queue;
2124
use Amp\Socket;
2225
use Amp\Socket\ResourceSocket;
2326
use Amp\TimeoutCancellation;
@@ -607,4 +610,48 @@ public function testServerEarlyResponse(): void
607610
$response = $responseFuture->await();
608611
self::assertSame(HttpStatus::PAYLOAD_TOO_LARGE, $response->getStatus());
609612
}
613+
614+
public function testResponseIsFullyAsynchronous(): void
615+
{
616+
$writeStream = new Queue();
617+
$writeContent = StreamedContent::fromStream(new ReadableIterableStream($writeStream->iterate()));
618+
$request = new Request('http://localhost/', 'POST', $writeContent);
619+
620+
$writeStream->pushAsync('something');
621+
622+
events()->requestStart($request);
623+
624+
$stream = $this->connection->getStream($request);
625+
626+
$response = async(fn () => $stream->request($request, new NullCancellation));
627+
628+
EventLoop::queue(function (): void {
629+
delay(0.1);
630+
631+
$this->server->write(self::packFrame($this->hpack->encode([
632+
[":status", (string) HttpStatus::OK],
633+
]), Http2Parser::HEADERS, Http2Parser::END_HEADERS, 1));
634+
635+
delay(0.1);
636+
637+
$this->server->write(self::packFrame('test', Http2Parser::DATA, 0, 1));
638+
639+
delay(0.1);
640+
});
641+
642+
$response = $response->await();
643+
self::assertSame(200, $response->getStatus());
644+
645+
$foo = $response->getBody()->read();
646+
self::assertSame('test', $foo);
647+
648+
$writeStream->pushAsync('Some more content to the request');
649+
650+
$this->server->write(self::packFrame('test2', Http2Parser::DATA, 0, 1));
651+
652+
$foo = $response->getBody()->read();
653+
self::assertSame('test2', $foo);
654+
655+
$writeStream->complete();
656+
}
610657
}

test/Http2IntegrationTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ protected function setUp(): void
2727

2828
$this->httpServer->expose('127.0.0.1:0');
2929
$this->httpServer->start(new ClosureRequestHandler(function (Request $request): Response {
30-
return new Response(200, [], (string) \strlen($request->getBody()->buffer()));
30+
$body = $request->getBody()->buffer();
31+
return new Response(200, [], (string) \strlen($body));
3132
}), new DefaultErrorHandler());
3233

3334
$this->httpClient = (new HttpClientBuilder())->build();

0 commit comments

Comments
 (0)