Skip to content

Commit 18e5f71

Browse files
jderusseNyholm
andauthored
Stream (#226)
* Add a Stream to handle requests * Update src/Core/CHANGELOG.md Co-Authored-By: Tobias Nyholm <[email protected]> Co-authored-by: Tobias Nyholm <[email protected]>
1 parent c044a2e commit 18e5f71

16 files changed

+804
-138
lines changed

AbstractApi.php

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use AsyncAws\Core\Signer\Request;
1515
use AsyncAws\Core\Signer\Signer;
1616
use AsyncAws\Core\Signer\SignerV4;
17+
use AsyncAws\Core\Stream\StreamFactory;
1718
use Psr\Log\LoggerInterface;
1819
use Psr\Log\NullLogger;
1920
use Symfony\Component\HttpClient\HttpClient;
@@ -74,8 +75,8 @@ public function __construct($configuration = [], ?CredentialProvider $credential
7475
}
7576

7677
/**
77-
* @param string[]|string[][] $headers headers names provided as keys or as part of values
78-
* @param array|string|resource|callable $body
78+
* @param string[]|string[][] $headers headers names provided as keys or as part of values
79+
* @param string|resource|callable|iterable|null $body
7980
*/
8081
final public function request(string $method, $body = '', $headers = [], ?string $endpoint = null): Result
8182
{
@@ -91,18 +92,24 @@ abstract protected function getSignatureVersion(): string;
9192
abstract protected function getSignatureScopeName(): string;
9293

9394
/**
94-
* @param string[]|string[][] $headers headers names provided as keys or as part of values
95-
* @param string|resource|callable $body
95+
* @param string[]|string[][] $headers headers names provided as keys or as part of values
96+
* @param string|resource|callable|iterable|null $body
9697
*/
9798
final protected function getResponse(string $method, $body, $headers = [], ?string $endpoint = null): ResponseInterface
9899
{
99100
if (!isset($headers['content-type'])) {
100101
$headers['content-type'] = 'text/plain';
101102
}
102103

103-
$request = new Request($method, $this->fillEndpoint($endpoint), $headers, $body);
104+
$stream = StreamFactory::create($body);
105+
106+
$request = new Request($method, $this->fillEndpoint($endpoint), $headers, $stream);
104107
$this->getSigner()->sign($request, $this->credentialProvider->getCredentials($this->configuration));
105108

109+
if (!$request->hasHeader('content-length') && null !== $length = $request->getBody()->length()) {
110+
$request->setHeader('content-length', $length);
111+
}
112+
106113
return $this->httpClient->request($request->getMethod(), $request->getUrl(), ['headers' => $request->getHeaders(), 'body' => $request->getBody()]);
107114
}
108115

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
- Methods `AwsClient::cloudFormation()`, `AwsClient::lambda()`, `AwsClient::sns()`
99
- Protected methods `Result::registerPrefetch()` and `Result::unregisterPrefetch()`
1010
- Timeout parameter to `InstanceProvider::__construct()`
11+
- Requests can now be streamed
12+
- Streamable request accepts iterable alongside string, callable, resource
1113

1214
### Changed
1315

1416
- Removed `AwsClient` and replaced it with `AwsClientFactory`
15-
- Class `AsuncAws\Core\Signer\Request` is marked as internal
16-
- Make sure behavior of calling `Result::resolve()` is consistent.
17+
- Class `AsyncAws\Core\Signer\Request` is marked as internal
18+
- Make sure behavior of calling `Result::resolve()` is consistent
1719
- Rename namespace `Signers` into `Signer`.
1820

1921
## 0.1.0

Signer/Request.php

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace AsyncAws\Core\Signer;
44

5+
use AsyncAws\Core\Stream\Stream;
6+
57
/**
68
* Dummy object to store a Request.
79
*
@@ -20,10 +22,9 @@ final class Request
2022
private $body;
2123

2224
/**
23-
* @param string[]|string[][] $headers
24-
* @param string|resource|callable $body
25+
* @param string[]|string[][] $headers
2526
*/
26-
public function __construct(string $method, string $url, array $headers, $body)
27+
public function __construct(string $method, string $url, array $headers, Stream $body)
2728
{
2829
$this->method = $method;
2930
$this->url = $url;
@@ -66,18 +67,12 @@ public function getHeader(string $name)
6667
return $this->headers[strtolower($name)] ?? null;
6768
}
6869

69-
/**
70-
* @return string|resource|callable
71-
*/
72-
public function getBody()
70+
public function getBody(): Stream
7371
{
7472
return $this->body;
7573
}
7674

77-
/**
78-
* @param string|resource|callable $body
79-
*/
80-
public function setBody($body)
75+
public function setBody(Stream $body)
8176
{
8277
$this->body = $body;
8378
}

Signer/SignerV4.php

Lines changed: 55 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
use AsyncAws\Core\Credentials\Credentials;
66
use AsyncAws\Core\Exception\InvalidArgument;
7-
use AsyncAws\Core\Exception\RuntimeException;
7+
use AsyncAws\Core\Stream\FixedSizeStream;
8+
use AsyncAws\Core\Stream\IterableStream;
9+
use AsyncAws\Core\Stream\Stream;
10+
use AsyncAws\Core\Stream\StringStream;
811

912
/**
1013
* Version4 of signer.
@@ -54,14 +57,7 @@ public function __construct(string $scopeName, string $region)
5457

5558
public function sign(Request $request, ?Credentials $credentials): void
5659
{
57-
$body = $request->getBody() ?? '';
58-
if (\is_resource($body) && -1 === fseek($body, 0)) {
59-
throw new RuntimeException('Unable to seek the resource');
60-
}
61-
6260
if (null === $credentials) {
63-
$request->setHeader('content-length', $this->getContentLength($request));
64-
6561
return;
6662
}
6763

@@ -86,30 +82,22 @@ public function sign(Request $request, ?Credentials $credentials): void
8682

8783
$canonicalHeaders = $this->getCanonicalizedHeaders($request);
8884

89-
$canonicalRequest = implode(
90-
"\n",
91-
[
92-
$request->getMethod(),
93-
$this->getCanonicalizedPath($parsedUrl),
94-
$this->getCanonicalizedQuery($parsedUrl),
95-
\implode("\n", array_values($canonicalHeaders)),
96-
'', // empty line after headers
97-
implode(';', \array_keys($canonicalHeaders)),
98-
$request->getHeader('x-amz-content-sha256'),
99-
]
100-
);
101-
102-
$stringToSign = implode(
103-
"\n",
104-
[
105-
self::ALGORITHM_REQUEST,
106-
$amzDate,
107-
implode('/', $credentialScope),
108-
hash('sha256', $canonicalRequest),
109-
]
110-
);
85+
$canonicalRequest = implode("\n", [
86+
$request->getMethod(),
87+
$this->getCanonicalizedPath($parsedUrl),
88+
$this->getCanonicalizedQuery($parsedUrl),
89+
\implode("\n", array_values($canonicalHeaders)),
90+
'', // empty line after headers
91+
implode(';', \array_keys($canonicalHeaders)),
92+
$request->getHeader('x-amz-content-sha256'),
93+
]);
11194

112-
$signature = hash_hmac('sha256', $stringToSign, $signingKey);
95+
$signature = hash_hmac('sha256', implode("\n", [
96+
self::ALGORITHM_REQUEST,
97+
$amzDate,
98+
implode('/', $credentialScope),
99+
hash('sha256', $canonicalRequest),
100+
]), $signingKey);
113101

114102
$authorizationHeader = sprintf(
115103
'%s Credential=%s/%s, SignedHeaders=%s, Signature=%s',
@@ -123,122 +111,69 @@ public function sign(Request $request, ?Credentials $credentials): void
123111
$request->setHeader('authorization', $authorizationHeader);
124112
}
125113

126-
private function getContentLength(Request $request): ?int
114+
private function prepareBody(Request $request, string $amzDate, string $credentialScope, string &$signature, string $signingKey): void
127115
{
128-
if ($request->hasHeader('content-length')) {
129-
return (int) ((array) $request->getHeader('content-length'))[0];
130-
}
131-
132116
$body = $request->getBody();
133-
if (\is_string($body)) {
134-
return \strlen($body);
135-
}
136117

137-
if (\is_resource($body)) {
138-
return fstat($body)['size'] ?? null;
118+
if ($request->hasHeader('content-length')) {
119+
$contentLength = (int) ((array) $request->getHeader('content-length'))[0];
120+
} else {
121+
$contentLength = $body->length();
139122
}
140123

141-
return null;
142-
}
143-
144-
private function prepareBody(Request $request, string $amzDate, string $credentialScope, string &$signature, string $signingKey): void
145-
{
146-
$body = $request->getBody();
147-
148-
// we can't manage signature of undefined length closure
149-
$contentLength = $this->getContentLength($request);
124+
// we can't manage signature of undefined length. Let's convert it to string
150125
if (null === $contentLength) {
151-
if (\is_callable($body)) {
152-
$buffer = '';
153-
while (true) {
154-
if (!\is_string($data = $body(self::CHUNK_SIZE))) {
155-
throw new InvalidArgument(sprintf('The return value of the "body" option callback must be a string, %s returned.', \gettype($data)));
156-
}
157-
158-
if ('' == $data) {
159-
break;
160-
}
161-
162-
$buffer .= $data;
163-
}
164-
165-
$request->setBody($body = $buffer);
166-
} elseif (\is_resource($body)) {
167-
$request->setBody($body = \stream_get_contents($body));
168-
}
126+
$request->setBody($body = StringStream::create($body));
127+
$contentLength = $body->length();
169128
}
170129

171-
if (\is_string($body)) {
172-
$request->setHeader('x-amz-content-sha256', hash('sha256', $body));
130+
// no need to stream small body
131+
if ($contentLength < self::CHUNK_SIZE) {
132+
$request->setBody($body = StringStream::create($body));
133+
$request->setHeader('x-amz-content-sha256', hash('sha256', $body->stringify()));
173134
$request->setHeader('content-length', $contentLength);
174135

175136
return;
176137
}
177138

178-
$streamReader = null;
179-
if (\is_callable($body)) {
180-
$eof = false;
181-
$buffer = '';
182-
$streamReader = static function () use ($body, &$buffer, &$eof): string {
183-
while (!$eof && \strlen($buffer) < self::CHUNK_SIZE) {
184-
if (!\is_string($data = $body(self::CHUNK_SIZE))) {
185-
throw new InvalidArgument(sprintf('The return value of the "body" option callback must be a string, %s returned.', \gettype($data)));
186-
}
187-
188-
$buffer .= $data;
189-
$eof = '' === $data;
190-
}
191-
192-
$data = substr($buffer, 0, self::CHUNK_SIZE);
193-
$buffer = substr($buffer, self::CHUNK_SIZE);
194-
195-
return $data;
196-
};
197-
} elseif (\is_resource($body)) {
198-
$streamReader = static function () use ($body): string {
199-
return \fread($body, self::CHUNK_SIZE);
200-
};
201-
}
202-
if (null === $contentLength) {
203-
throw new RuntimeException('Unable to get resource size');
204-
}
205-
if (null === $streamReader) {
206-
throw new InvalidArgument(\sprintf('Unexpected body "%s".', \is_object($body) ? \get_class($body) : \gettype($body)));
207-
}
208-
209139
$request->setHeader('content-encoding', 'aws-chunked');
210140
$request->setHeader('x-amz-decoded-content-length', $contentLength);
211141
$request->setHeader('x-amz-content-sha256', 'STREAMING-' . self::ALGORITHM_CHUNK);
212142

143+
// Compute size of content + metadata used sign each Chunk
213144
$chunkCount = (int) ceil($contentLength / self::CHUNK_SIZE);
214145
$fullChunkCount = $chunkCount * self::CHUNK_SIZE === $contentLength ? $chunkCount : ($chunkCount - 1);
215-
216146
$metaLength = \strlen(";chunk-signature=\r\n\r\n") + 64;
217-
$contentLength = $contentLength + $fullChunkCount * ($metaLength + \strlen((string) dechex(self::CHUNK_SIZE))) + ($chunkCount - $fullChunkCount) * ($metaLength + \strlen((string) dechex($contentLength % self::CHUNK_SIZE))) + $metaLength + 1;
218-
$request->setHeader('content-length', $contentLength);
219-
220-
$last = false;
221-
$streamBody = static function () use ($streamReader, $amzDate, $credentialScope, &$signature, &$last, $signingKey): string {
222-
if ($last) {
223-
return '';
224-
}
225-
226-
if ('' === $data = $streamReader()) {
227-
$last = true;
147+
$request->setHeader('content-length', $contentLength + $fullChunkCount * ($metaLength + \strlen((string) dechex(self::CHUNK_SIZE))) + ($chunkCount - $fullChunkCount) * ($metaLength + \strlen((string) dechex($contentLength % self::CHUNK_SIZE))) + $metaLength + 1);
148+
149+
$body = IterableStream::create((static function (Stream $body) use ($amzDate, $credentialScope, $signingKey, &$signature): iterable {
150+
$emptyHash = hash('sha256', '');
151+
foreach (FixedSizeStream::create($body, self::CHUNK_SIZE) as $chunk) {
152+
$signature = hash_hmac('sha256', implode("\n", [
153+
self::ALGORITHM_CHUNK,
154+
$amzDate,
155+
$credentialScope,
156+
$signature,
157+
$emptyHash,
158+
hash('sha256', $chunk),
159+
]), $signingKey);
160+
161+
yield sprintf("%s;chunk-signature=%s\r\n", dechex(\strlen($chunk)), $signature) . "$chunk\r\n";
228162
}
229163

230-
$stringToSign = implode("\n", [
164+
$signature = hash_hmac('sha256', implode("\n", [
231165
self::ALGORITHM_CHUNK,
232166
$amzDate,
233167
$credentialScope,
234168
$signature,
235-
hash('sha256', ''),
236-
hash('sha256', $data),
237-
]);
169+
$emptyHash,
170+
$emptyHash,
171+
]), $signingKey);
172+
173+
yield sprintf("%s;chunk-signature=%s\r\n\r\n", dechex(0), $signature);
174+
})($body));
238175

239-
return sprintf('%s;chunk-signature=%s' . "\r\n", dechex(\strlen($data)), $signature = hash_hmac('sha256', $stringToSign, $signingKey)) . $data . "\r\n";
240-
};
241-
$request->setBody($streamBody);
176+
$request->setBody($body);
242177
}
243178

244179
private function getCanonicalizedQuery(array $parseUrl): string

0 commit comments

Comments
 (0)