Skip to content

Commit 8bdb71e

Browse files
authored
fix: correctly return number of bytes read from chunked streams (#1386)
1 parent 2542cd9 commit 8bdb71e

File tree

10 files changed

+191
-324
lines changed

10 files changed

+191
-324
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "7251f5e7-0e9a-4ea4-b4c7-30dad31f4622",
3+
"type": "bugfix",
4+
"description": "⚠️ **IMPORTANT**: Correctly return number of bytes read from chunked streams",
5+
"issues": [
6+
"awslabs/smithy-kotlin#1285"
7+
]
8+
}

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedByteReadChannel.kt

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package aws.smithy.kotlin.runtime.auth.awssigning
77

88
import aws.smithy.kotlin.runtime.InternalApi
9-
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
9+
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
1010
import aws.smithy.kotlin.runtime.http.DeferredHeaders
1111
import aws.smithy.kotlin.runtime.io.SdkBuffer
1212
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
@@ -30,34 +30,17 @@ public class AwsChunkedByteReadChannel(
3030
private var previousSignature: ByteArray,
3131
private val trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
3232
) : SdkByteReadChannel by delegate {
33-
34-
private val chunkReader = AwsChunkedReader(
35-
delegate.asStream(),
36-
signer,
37-
signingConfig,
38-
previousSignature,
39-
trailingHeaders,
40-
)
41-
42-
override val isClosedForRead: Boolean
43-
get() = chunkReader.chunk.size == 0L && chunkReader.hasLastChunkBeenSent && delegate.isClosedForRead
44-
45-
override val availableForRead: Int
46-
get() = chunkReader.chunk.size.toInt() + delegate.availableForRead
33+
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)
4734

4835
override suspend fun read(sink: SdkBuffer, limit: Long): Long {
4936
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
50-
if (!chunkReader.ensureValidChunk()) return -1L
51-
return chunkReader.chunk.read(sink, limit)
37+
return chunker.readAndMaybeWrite(sink, limit)
5238
}
5339
}
5440

55-
private fun SdkByteReadChannel.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
56-
private val delegate = this@asStream
57-
58-
override fun isClosedForRead(): Boolean =
59-
delegate.isClosedForRead
41+
private fun SdkByteReadChannel.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
42+
val delegate = this@adapt
6043

61-
override suspend fun read(sink: SdkBuffer, limit: Long): Long =
62-
delegate.read(sink, limit)
44+
override val eof: Boolean get() = delegate.isClosedForRead
45+
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
6346
}

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource.kt

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package aws.smithy.kotlin.runtime.auth.awssigning
66

77
import aws.smithy.kotlin.runtime.InternalApi
8-
import aws.smithy.kotlin.runtime.auth.awssigning.internal.AwsChunkedReader
8+
import aws.smithy.kotlin.runtime.auth.awssigning.internal.StreamChunker
99
import aws.smithy.kotlin.runtime.http.DeferredHeaders
1010
import aws.smithy.kotlin.runtime.io.SdkBuffer
1111
import aws.smithy.kotlin.runtime.io.SdkSource
@@ -32,35 +32,22 @@ public class AwsChunkedSource(
3232
previousSignature: ByteArray,
3333
trailingHeaders: DeferredHeaders = DeferredHeaders.Empty,
3434
) : SdkSource {
35-
private val chunkReader = AwsChunkedReader(
36-
delegate.asStream(),
37-
signer,
38-
signingConfig,
39-
previousSignature,
40-
trailingHeaders,
41-
)
35+
private val chunker = StreamChunker(delegate.adapt(), signer, signingConfig, previousSignature, trailingHeaders)
4236

4337
override fun read(sink: SdkBuffer, limit: Long): Long {
4438
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
4539
// COROUTINE SAFETY: runBlocking is allowed here because SdkSource is a synchronous blocking interface
46-
val isChunkValid = runBlocking {
47-
chunkReader.ensureValidChunk()
48-
}
49-
if (!isChunkValid) return -1L
50-
return chunkReader.chunk.read(sink, limit)
40+
return runBlocking { chunker.readAndMaybeWrite(sink, limit) }
5141
}
5242

5343
override fun close() {
5444
delegate.close()
5545
}
5646
}
5747

58-
private fun SdkSource.asStream(): AwsChunkedReader.Stream = object : AwsChunkedReader.Stream {
59-
private val delegate = this@asStream.buffer()
48+
private fun SdkSource.adapt(): StreamChunker.Adapter = object : StreamChunker.Adapter {
49+
private val delegate = this@adapt.buffer()
6050

61-
override fun isClosedForRead(): Boolean =
62-
delegate.exhausted()
63-
64-
override suspend fun read(sink: SdkBuffer, limit: Long): Long =
65-
delegate.read(sink, limit)
51+
override val eof: Boolean get() = delegate.exhausted()
52+
override suspend fun read(buffer: SdkBuffer, limit: Long): Long = delegate.read(buffer, limit)
6653
}

runtime/auth/aws-signing-common/common/src/aws/smithy/kotlin/runtime/auth/awssigning/internal/AwsChunkedReader.kt

Lines changed: 0 additions & 234 deletions
This file was deleted.

0 commit comments

Comments
 (0)