Skip to content

Commit 5ba0d30

Browse files
authored
Revert "fix: correctly return number of bytes read from chunked streams (#1386)" (#1397)
1 parent ef2e5b2 commit 5ba0d30

File tree

9 files changed

+324
-183
lines changed

9 files changed

+324
-183
lines changed

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package aws.smithy.kotlin.runtime.auth.awssigning
77

88
import aws.smithy.kotlin.runtime.InternalApi
9-
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
9+
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
1010
import aws.smithy.kotlin.runtime.http.DeferredHeaders
1111
import aws.smithy.kotlin.runtime.io.SdkBuffer
1212
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
@@ -30,17 +30,34 @@ public class AwsChunkedByteReadChannel(
3030
private var previousSignature: ByteArray,
3131
private val trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
3232
) : SdkByteReadChannel by delegate {
33-
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)
33+
34+
private val chunkReader = AwsChunkedReader(
35+
delegate.asStream(),
36+
signer,
37+
signingConfig,
38+
previousSignature,
39+
trailingHeaders,
40+
)
41+
42+
override val isClosedForRead: Boolean
43+
get() = chunkReader.chunk.size == 0L && chunkReader.hasLastChunkBeenSent && delegate.isClosedForRead
44+
45+
override val availableForRead: Int
46+
get() = chunkReader.chunk.size.toInt() + delegate.availableForRead
3447

3548
override suspend fun read(sink: SdkBuffer, limit: Long): Long {
3649
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
37-
return chunker.readAndMaybeWrite(sink, limit)
50+
if (!chunkReader.ensureValidChunk()) return -1L
51+
return chunkReader.chunk.read(sink, limit)
3852
}
3953
}
4054

41-
private fun SdkByteReadChannel.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
42-
val delegate = this@adapt
55+
private fun SdkByteReadChannel.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
56+
private val delegate = this@asStream
57+
58+
override fun isClosedForRead(): Boolean =
59+
delegate.isClosedForRead
4360

44-
override val eof: Boolean get() = delegate.isClosedForRead
45-
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
61+
override suspend fun read(sink: SdkBuffer, limit: Long): Long =
62+
delegate.read(sink, limit)
4663
}

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package aws.smithy.kotlin.runtime.auth.awssigning
66

77
import aws.smithy.kotlin.runtime.InternalApi
8-
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
8+
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
99
import aws.smithy.kotlin.runtime.http.DeferredHeaders
1010
import aws.smithy.kotlin.runtime.io.SdkBuffer
1111
import aws.smithy.kotlin.runtime.io.SdkSource
@@ -32,22 +32,35 @@ public class AwsChunkedSource(
3232
previousSignature: ByteArray,
3333
trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
3434
) : SdkSource {
35-
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)
35+
private val chunkReader = AwsChunkedReader(
36+
delegate.asStream(),
37+
signer,
38+
signingConfig,
39+
previousSignature,
40+
trailingHeaders,
41+
)
3642

3743
override fun read(sink: SdkBuffer, limit: Long): Long {
3844
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
3945
// COROUTINE SAFETY: runBlocking is allowed here because SdkSource is a synchronous blocking interface
40-
return runBlocking { chunker.readAndMaybeWrite(sink, limit) }
46+
val isChunkValid = runBlocking {
47+
chunkReader.ensureValidChunk()
48+
}
49+
if (!isChunkValid) return -1L
50+
return chunkReader.chunk.read(sink, limit)
4151
}
4252

4353
override fun close() {
4454
delegate.close()
4555
}
4656
}
4757

48-
private fun SdkSource.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
49-
private val delegate = this@adapt.buffer()
58+
private fun SdkSource.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
59+
private val delegate = this@asStream.buffer()
5060

51-
override val eof: Boolean get() = delegate.exhausted()
52-
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
61+
override fun isClosedForRead(): Boolean =
62+
delegate.exhausted()
63+
64+
override suspend fun read(sink: SdkBuffer, limit: Long): Long =
65+
delegate.read(sink, limit)
5366
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package aws.smithy.kotlin.runtime.auth.awssigning.internal
7+
8+
import aws.smithy.kotlin.runtime.auth.awssigning.AwsSignatureType
9+
import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigner
10+
import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigningConfig
11+
import aws.smithy.kotlin.runtime.auth.awssigning.HashSpecification
12+
import aws.smithy.kotlin.runtime.http.DeferredHeaders
13+
import aws.smithy.kotlin.runtime.http.Headers
14+
import aws.smithy.kotlin.runtime.http.toHeaders
15+
import aws.smithy.kotlin.runtime.io.SdkBuffer
16+
17+
/**
18+
* Common implementation of aws-chunked content encoding. Operations on this class cannot be invoked concurrently.
19+
* This class wraps a [Stream] which actually provides the raw bytes.
20+
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html">SigV4 Streaming</a>
21+
* @param stream the underlying IO abstraction which will have its data encoded in aws-chunked format
22+
* @param signer the signer to use to sign chunks and (optionally) chunk trailer
23+
* @param signingConfig the config to use for signing
24+
* @param previousSignature the previous signature to use for signing. In most cases, this should be the seed signature.
25+
* @param trailingHeaders the optional trailing headers to include in the final chunk
26+
*/
27+
internal class AwsChunkedReader(
28+
private val stream: Stream,
29+
private val signer: AwsSigner,
30+
private val signingConfig: AwsSigningConfig,
31+
private var previousSignature: ByteArray,
32+
private val trailingHeaders: DeferredHeaders,
33+
) {
34+
35+
/**
36+
* Common interface abstracting over [SdkSource] and [SdkByteReadChannel]
37+
*/
38+
internal interface Stream {
39+
fun isClosedForRead(): Boolean
40+
41+
/**
42+
* Read data from the underlying IO source.
43+
* NOTE: Implementations may or may not suspend/block. The suspend coloring of this function
44+
* is to gloss over differences between underlying IO abstractions and share aws-chunked encoding
45+
* internals.
46+
*/
47+
suspend fun read(sink: SdkBuffer, limit: Long): Long
48+
}
49+
50+
/**
51+
* The current chunk to read from
52+
*/
53+
internal val chunk: SdkBuffer = SdkBuffer()
54+
55+
/**
56+
* Flag indicating if the last chunk (empty + trailers) has been sent
57+
*/
58+
internal var hasLastChunkBeenSent: Boolean = false
59+
60+
/**
61+
* Ensures that the internal [chunk] is valid for reading. If it's not valid, try to load the next chunk. Note that
62+
* this function will suspend until the whole chunk has been loaded.
63+
*
64+
* @return true if the [chunk] is valid for reading, false if it's invalid (chunk data is exhausted)
65+
*/
66+
internal suspend fun ensureValidChunk(): Boolean {
67+
// check if the current chunk is still valid
68+
if (chunk.size > 0L) return true
69+
70+
// if not, try to fetch a new chunk
71+
val nextChunk = when {
72+
stream.isClosedForRead() && hasLastChunkBeenSent -> null
73+
else -> {
74+
var next = if (signingConfig.isUnsigned) getUnsignedChunk() else getSignedChunk()
75+
if (next == null) {
76+
check(stream.isClosedForRead()) { "Expected underlying reader to be closed" }
77+
next = getFinalChunk()
78+
hasLastChunkBeenSent = true
79+
}
80+
next
81+
}
82+
}
83+
84+
nextChunk?.writeUtf8("\r\n") // terminating CRLF to signal end of chunk
85+
86+
// transfer all segments to the working chunk
87+
nextChunk?.let { chunk.writeAll(it) }
88+
89+
return chunk.size > 0L
90+
}
91+
92+
/**
93+
* Get the last chunk that will be sent which consists of an empty signed chunk + any
94+
* trailers
95+
*/
96+
private suspend fun getFinalChunk(): SdkBuffer {
97+
// empty chunk
98+
val lastChunk = checkNotNull(if (signingConfig.isUnsigned) getUnsignedChunk(SdkBuffer()) else getSignedChunk(SdkBuffer()))
99+
100+
// + any trailers
101+
if (!trailingHeaders.isEmpty()) {
102+
val trailingHeaderChunk = getTrailingHeadersChunk(trailingHeaders.toHeaders())
103+
lastChunk.writeAll(trailingHeaderChunk)
104+
}
105+
return lastChunk
106+
}
107+
108+
/**
109+
* Read a chunk from the underlying [stream], suspending until a whole chunk has been read OR the channel is exhausted.
110+
* @return an SdkBuffer containing a chunk of data, or null if the channel is exhausted.
111+
*/
112+
private suspend fun Stream.readChunk(): SdkBuffer? {
113+
val sink = SdkBuffer()
114+
115+
// fill up to chunk size bytes
116+
var remaining = CHUNK_SIZE_BYTES.toLong()
117+
while (remaining > 0L) {
118+
val rc = read(sink, remaining)
119+
if (rc == -1L) break
120+
remaining -= rc
121+
}
122+
123+
return when (sink.size) {
124+
0L -> null // delegate closed without reading any data
125+
else -> sink
126+
}
127+
}
128+
129+
/**
130+
* Get a signed aws-chunked encoding of [data].
131+
* If [data] is not set, read the next chunk from [stream] and add hex-formatted chunk size and chunk signature to the front.
132+
* Note that this function will suspend until the whole chunk has been read OR the channel is exhausted.
133+
* The chunk structure is: `string(IntHexBase(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n`
134+
*
135+
* @param data the data which will be encoded to aws-chunked. if not provided, will default to
136+
* reading up to [CHUNK_SIZE_BYTES] from [stream].
137+
* @return a buffer containing the chunked data or null if no data is available (channel is closed)
138+
*/
139+
private suspend fun getSignedChunk(data: SdkBuffer? = null): SdkBuffer? {
140+
val bodyBuffer = data ?: stream.readChunk()
141+
142+
// signer takes a ByteArray unfortunately...
143+
val chunkBody = bodyBuffer?.readByteArray() ?: return null
144+
145+
val chunkSignature = signer.signChunk(chunkBody, previousSignature, signingConfig.toChunkSigningConfig()).signature
146+
previousSignature = chunkSignature
147+
148+
val signedChunk = SdkBuffer()
149+
150+
// headers
151+
signedChunk.apply {
152+
writeUtf8(chunkBody.size.toString(16))
153+
writeUtf8(";")
154+
writeUtf8("chunk-signature=")
155+
write(chunkSignature)
156+
writeUtf8("\r\n")
157+
}
158+
159+
// append the body
160+
signedChunk.write(chunkBody)
161+
162+
return signedChunk
163+
}
164+
165+
/**
166+
* Get an unsigned aws-chunked encoding of [data].
167+
* If [data] is not set, read the next chunk from [stream] and add hex-formatted chunk size to the front.
168+
* Note that this function will suspend until the whole chunk has been read OR the channel is exhausted.
169+
* The unsigned chunk structure is: `string(IntHexBase(chunk-size)) + \r\n + chunk-data + \r\n`
170+
*
171+
* @param data the data which will be encoded to aws-chunked. if not provided, will default to
172+
* reading up to [CHUNK_SIZE_BYTES] from [stream].
173+
* @return a buffer containing the chunked data or null if no data is available (channel is closed)
174+
*/
175+
private suspend fun getUnsignedChunk(data: SdkBuffer? = null): SdkBuffer? {
176+
val bodyBuffer = data ?: stream.readChunk() ?: return null
177+
178+
val unsignedChunk = SdkBuffer()
179+
180+
// headers
181+
unsignedChunk.apply {
182+
writeUtf8(bodyBuffer.size.toString(16))
183+
writeUtf8("\r\n")
184+
writeAll(bodyBuffer) // append the body
185+
}
186+
187+
return unsignedChunk
188+
}
189+
190+
/**
191+
* Get the trailing headers chunk. The grammar for trailing headers is:
192+
* trailing-header-A:value CRLF
193+
* trailing-header-B:value CRLF
194+
* ...
195+
* x-amz-trailer-signature:signature_value CRLF
196+
*
197+
* @param trailingHeaders the [Headers] which will be sent as trailers
198+
* @return a [SdkBuffer] containing the trailing headers in aws-chunked encoding, ready to send on the wire
199+
*/
200+
private suspend fun getTrailingHeadersChunk(trailingHeaders: Headers): SdkBuffer {
201+
val trailerSignature = signer.signChunkTrailer(trailingHeaders, previousSignature, signingConfig.toTrailingHeadersSigningConfig()).signature
202+
previousSignature = trailerSignature
203+
204+
val trailerBody = SdkBuffer()
205+
trailerBody.writeTrailers(trailingHeaders)
206+
if (!signingConfig.isUnsigned) {
207+
trailerBody.writeTrailerSignature(trailerSignature.decodeToString())
208+
}
209+
210+
return trailerBody
211+
}
212+
213+
/**
214+
* Make a copy of the signing config, changing the signatureType and hashSpecification configuration values
215+
* to specify chunk signing.
216+
* @return an [AwsSigningConfig] which can be used by a signer to sign chunks
217+
*/
218+
private fun AwsSigningConfig.toChunkSigningConfig(): AwsSigningConfig = this.toBuilder().apply {
219+
signatureType = AwsSignatureType.HTTP_REQUEST_CHUNK // signature is for a chunk
220+
hashSpecification = HashSpecification.CalculateFromPayload // calculate the hash from the chunk payload
221+
}.build()
222+
223+
/**
224+
* Make a copy of the signing config, changing the signatureType and hashSpecification configuration values
225+
* to specify trailing headers signing.
226+
* @return an [AwsSigningConfig] which can be used by a signer to sign trailing headers
227+
*/
228+
private fun AwsSigningConfig.toTrailingHeadersSigningConfig(): AwsSigningConfig = this.toBuilder().apply {
229+
signatureType = AwsSignatureType.HTTP_REQUEST_TRAILING_HEADERS // signature is for trailing headers
230+
hashSpecification = HashSpecification.CalculateFromPayload // calculate the hash from the trailing headers payload
231+
}.build()
232+
233+
private val AwsSigningConfig.isUnsigned: Boolean get() = hashSpecification == HashSpecification.StreamingUnsignedPayloadWithTrailers
234+
}

0 commit comments

Comments
 (0)