Skip to content

Commit 624411b

Browse files
authored
feat(rt): add support for HTTP_REQUEST_EVENT signing (#644)
* feat(rt): add support for HTTP_REQUEST_EVENT signing * fix: chunk signature uses raw signature bytes not hex encoding * add sigv4 model dependency * only add signer to context when signer is expected to exist * simplify error check
1 parent 02739b5 commit 624411b

File tree

7 files changed

+77
-29
lines changed

7 files changed

+77
-29
lines changed

aws-runtime/protocols/aws-event-stream/common/src/aws/sdk/kotlin/runtime/protocol/eventstream/EventStreamSigning.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import aws.smithy.kotlin.runtime.io.bytes
1212
import aws.smithy.kotlin.runtime.time.Clock
1313
import aws.smithy.kotlin.runtime.time.Instant
1414
import aws.smithy.kotlin.runtime.util.InternalApi
15+
import aws.smithy.kotlin.runtime.util.decodeHexBytes
1516
import aws.smithy.kotlin.runtime.util.get
1617
import kotlinx.coroutines.flow.Flow
1718
import kotlinx.coroutines.flow.flow
@@ -30,8 +31,6 @@ public fun Flow<Message>.sign(
3031
): Flow<Message> = flow {
3132
val messages = this@sign
3233

33-
// FIXME Nothing actually populates this context attribute yet. It's possible we'll need some middleware or an
34-
// alternate way of passing signers to this method.
3534
val signer = context.getOrNull(AwsSigningAttributes.Signer) ?: error("No signer was found in context")
3635

3736
// NOTE: We need the signature of the initial HTTP request to seed the event stream signatures
@@ -70,10 +69,12 @@ internal suspend fun AwsSigner.signPayload(
7069

7170
val result = signChunk(messagePayload, prevSignature, config)
7271
val signature = result.signature
72+
// TODO - consider adding a direct Bytes -> Bytes hex decode rather than having to go through string
73+
val binarySignature = signature.decodeToString().decodeHexBytes()
7374

7475
val signedMessage = buildMessage {
7576
addHeader(":date", HeaderValue.Timestamp(dt))
76-
addHeader(":chunk-signature", HeaderValue.ByteArray(signature))
77+
addHeader(":chunk-signature", HeaderValue.ByteArray(binarySignature))
7778
payload = messagePayload
7879
}
7980

@@ -92,7 +93,7 @@ private fun Instant.truncateSubsecs(): Instant = Instant.fromEpochSeconds(epochS
9293
@InternalApi
9394
public fun ExecutionContext.newEventStreamSigningConfig(): AwsSigningConfig = AwsSigningConfig {
9495
algorithm = AwsSigningAlgorithm.SIGV4
95-
signatureType = AwsSignatureType.HTTP_REQUEST_CHUNK
96+
signatureType = AwsSignatureType.HTTP_REQUEST_EVENT
9697
region = this@newEventStreamSigningConfig[AwsSigningAttributes.SigningRegion]
9798
service = this@newEventStreamSigningConfig[AwsSigningAttributes.SigningService]
9899
credentialsProvider = this@newEventStreamSigningConfig[AwsSigningAttributes.CredentialsProvider]

aws-runtime/protocols/aws-event-stream/common/src/aws/sdk/kotlin/runtime/protocol/eventstream/FrameDecoder.kt

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,8 @@ public class EventStreamFramingException(message: String, cause: Throwable? = nu
2323
public suspend fun decodeFrames(chan: SdkByteReadChannel): Flow<Message> = flow {
2424
while (!chan.isClosedForRead) {
2525
// get the prelude to figure out how much is left to read of the message
26-
val preludeBytes = ByteArray(PRELUDE_BYTE_LEN_WITH_CRC)
27-
28-
try {
29-
chan.readFully(preludeBytes)
30-
} catch (ex: Exception) {
31-
throw EventStreamFramingException("failed to read message prelude from channel", ex)
32-
}
26+
// null indicates the channel was closed and that no more messages are coming
27+
val preludeBytes = readPrelude(chan) ?: return@flow
3328

3429
val preludeBuf = SdkByteBuffer.of(preludeBytes).apply { advance(preludeBytes.size.toULong()) }
3530
val prelude = Prelude.decode(preludeBuf)
@@ -52,3 +47,27 @@ public suspend fun decodeFrames(chan: SdkByteReadChannel): Flow<Message> = flow
5247
emit(message)
5348
}
5449
}
50+
51+
/**
52+
* Read the message prelude from the channel.
53+
* @return prelude bytes or null if the channel is closed and no additional prelude is coming
54+
*/
55+
private suspend fun readPrelude(chan: SdkByteReadChannel): ByteArray? {
56+
val dest = ByteArray(PRELUDE_BYTE_LEN_WITH_CRC)
57+
var remaining = dest.size
58+
var offset = 0
59+
while (remaining > 0 && !chan.isClosedForRead) {
60+
val rc = chan.readAvailable(dest, offset, remaining)
61+
if (rc == -1) break
62+
offset += rc
63+
remaining -= rc
64+
}
65+
66+
// 0 bytes read and channel closed indicates no messages remaining -> null
67+
if (remaining == PRELUDE_BYTE_LEN_WITH_CRC && chan.isClosedForRead) return null
68+
69+
// partial read -> failure
70+
if (remaining > 0) throw EventStreamFramingException("failed to read event stream message prelude from channel: read: $offset bytes, expected $remaining more bytes")
71+
72+
return dest
73+
}

aws-runtime/protocols/aws-event-stream/common/test/aws/sdk/kotlin/runtime/protocol/eventstream/EventStreamSigningTest.kt

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,27 @@ package aws.sdk.kotlin.runtime.protocol.eventstream
77

88
import aws.smithy.kotlin.runtime.auth.awscredentials.Credentials
99
import aws.smithy.kotlin.runtime.auth.awscredentials.CredentialsProvider
10-
import aws.smithy.kotlin.runtime.auth.awssigning.AwsSignatureType
11-
import aws.smithy.kotlin.runtime.auth.awssigning.AwsSigningConfig
12-
import aws.smithy.kotlin.runtime.auth.awssigning.DefaultAwsSigner
10+
import aws.smithy.kotlin.runtime.auth.awssigning.*
11+
import aws.smithy.kotlin.runtime.client.ExecutionContext
1312
import aws.smithy.kotlin.runtime.hashing.sha256
1413
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
1514
import aws.smithy.kotlin.runtime.io.bytes
1615
import aws.smithy.kotlin.runtime.time.Instant
1716
import aws.smithy.kotlin.runtime.time.ManualClock
1817
import aws.smithy.kotlin.runtime.util.encodeToHex
1918
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.flow.flowOf
20+
import kotlinx.coroutines.flow.toList
2021
import kotlinx.coroutines.test.runTest
21-
import kotlin.test.Ignore
2222
import kotlin.test.Test
2323
import kotlin.test.assertEquals
2424

2525
@OptIn(ExperimentalCoroutinesApi::class)
2626
class EventStreamSigningTest {
27+
private val testCredentialsProvider = object : CredentialsProvider {
28+
override suspend fun getCredentials() = Credentials("fake access key", "fake secret key")
29+
}
2730

28-
// FIXME - see https://github.com/awslabs/aws-sdk-kotlin/issues/543
29-
@Ignore
3031
@Test
3132
fun testSignPayload() = runTest {
3233
val messageToSign = buildMessage {
@@ -37,12 +38,10 @@ class EventStreamSigningTest {
3738
val epoch = Instant.fromEpochSeconds(123_456_789L, 1234)
3839
val testClock = ManualClock(epoch)
3940
val signingConfig = AwsSigningConfig.Builder().apply {
40-
credentialsProvider = object : CredentialsProvider {
41-
override suspend fun getCredentials() = Credentials("fake access key", "fake secret key")
42-
}
41+
credentialsProvider = testCredentialsProvider
4342
region = "us-east-1"
4443
service = "testservice"
45-
signatureType = AwsSignatureType.HTTP_REQUEST_CHUNK
44+
signatureType = AwsSignatureType.HTTP_REQUEST_EVENT
4645
}
4746

4847
val prevSignature = "last message sts".encodeToByteArray().sha256().encodeToHex().encodeToByteArray()
@@ -58,12 +57,32 @@ class EventStreamSigningTest {
5857
assertEquals(0, dateHeader.nanosecondsOfSecond)
5958

6059
assertEquals(":chunk-signature", result.output.headers[1].name)
61-
val expectedSignature = result.signature.encodeToHex()
60+
// signature is hex encoded string bytes, the header value is the raw bytes
61+
val expectedSignature = result.signature.decodeToString()
6262
val actualSignature = result.output.headers[1].value.expectByteArray().encodeToHex()
6363
assertEquals(expectedSignature, actualSignature)
6464

65-
// FIXME - based on Rust test: https://github.com/awslabs/smithy-rs/blob/v0.38.0/aws/rust-runtime/aws-sigv4/src/event_stream.rs#L166
6665
val expected = "1ea04a4f6becd85ae3e38e379ffaf4bb95042603f209512476cc6416868b31ee"
6766
assertEquals(expected, actualSignature)
6867
}
68+
69+
@Test
70+
fun testEmptyEndFrameSent() = runTest {
71+
val messageToSign = buildMessage {
72+
addHeader("some-header", HeaderValue.String("value"))
73+
payload = "test payload".encodeToByteArray()
74+
}
75+
76+
val context = ExecutionContext()
77+
context[AwsSigningAttributes.Signer] = DefaultAwsSigner
78+
context[AwsSigningAttributes.RequestSignature] = HashSpecification.EmptyBody.hash.encodeToByteArray()
79+
context[AwsSigningAttributes.SigningRegion] = "us-east-2"
80+
context[AwsSigningAttributes.SigningService] = "test"
81+
context[AwsSigningAttributes.CredentialsProvider] = testCredentialsProvider
82+
83+
val config = context.newEventStreamSigningConfig()
84+
val signedEvents = flowOf(messageToSign).sign(context, config).toList()
85+
// 1 message + empty signed frame
86+
assertEquals(2, signedEvents.size)
87+
}
6988
}

aws-runtime/protocols/aws-event-stream/common/test/aws/sdk/kotlin/runtime/protocol/eventstream/FrameDecoderTest.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66
package aws.sdk.kotlin.runtime.protocol.eventstream
77

88
import aws.smithy.kotlin.runtime.io.*
9+
import io.kotest.matchers.string.shouldContain
910
import kotlinx.coroutines.ExperimentalCoroutinesApi
1011
import kotlinx.coroutines.flow.*
1112
import kotlinx.coroutines.test.runTest
12-
import kotlin.test.Ignore
1313
import kotlin.test.Test
1414
import kotlin.test.assertEquals
15+
import kotlin.test.assertFailsWith
1516

1617
@OptIn(ExperimentalCoroutinesApi::class)
1718
class FrameDecoderTest {
@@ -52,9 +53,14 @@ class FrameDecoderTest {
5253
assertEquals(expected3, actual[2])
5354
}
5455

55-
@Ignore
5656
@Test
5757
fun testChannelClosed() = runTest {
58-
TODO("not implemented yet: need to add test for channel closed normally while waiting on prelude")
58+
// contents don't matter
59+
val partialPrelude = ByteArray(PRELUDE_BYTE_LEN_WITH_CRC - 4)
60+
val chan = SdkByteReadChannel(partialPrelude)
61+
62+
assertFailsWith<EventStreamFramingException> {
63+
decodeFrames(chan).collect()
64+
}.message.shouldContain("failed to read event stream message prelude from channel: read: 8 bytes, expected 4 more bytes")
5965
}
6066
}

codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/core/AwsHttpProtocolClientGenerator.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ open class AwsHttpProtocolClientGenerator(
7676
if (AwsSignatureVersion4.isSupportedAuthentication(ctx.model, ctx.service)) {
7777
val signingServiceName = AwsSignatureVersion4.signingServiceName(ctx.service)
7878
write("ctx.#T(#T.SigningService, #S)", putIfAbsentSym, RuntimeTypes.Auth.Signing.AwsSigningCommon.AwsSigningAttributes, signingServiceName)
79+
write("ctx.#T(#T.Signer, config.signer)", putIfAbsentSym, RuntimeTypes.Auth.Signing.AwsSigningCommon.AwsSigningAttributes)
7980
}
8081
write("ctx.#T(#T.SigningRegion, config.region)", putIfAbsentSym, RuntimeTypes.Auth.Signing.AwsSigningCommon.AwsSigningAttributes)
8182
write("ctx.#T(#T.CredentialsProvider, config.credentialsProvider)", putIfAbsentSym, RuntimeTypes.Auth.Signing.AwsSigningCommon.AwsSigningAttributes)

codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/eventstream/EventStreamSerializerGenerator.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ class EventStreamSerializerGenerator(
6363

6464
writer.write("val stream = input.#L ?: return #T.Empty", streamingMember.defaultName(), RuntimeTypes.Http.HttpBody)
6565
writer.write("val signingConfig = context.#T()", AwsRuntimeTypes.AwsEventStream.newEventStreamSigningConfig)
66-
// FIXME - needs to be set on the operation for initial request
67-
// context[AwsSigningAttributes.SignedBodyHeader] = AwsSignedBodyHeader.X_AMZ_CONTENT_SHA256.name
68-
// context[AwsSigningAttributes.HashSpecification] = HashSpecification.StreamingAws4HmacSha256Events
66+
67+
// initial HTTP request should use an empty body hash since the actual body is the event stream
68+
writer.write("context[#T.HashSpecification] = #T.EmptyBody", RuntimeTypes.Auth.Signing.AwsSigningCommon.AwsSigningAttributes, RuntimeTypes.Auth.Signing.AwsSigningCommon.HashSpecification)
6969

7070
val encodeFn = encodeEventStreamMessage(ctx, op, streamShape)
7171
writer.withBlock("val messages = stream", "") {

tests/codegen/event-stream/event-stream-model-template.smithy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ namespace aws.sdk.kotlin.test.eventstream
22

33
use aws.protocols#{protocol-name}
44
use aws.api#service
5+
use aws.auth#sigv4
56

67
@{protocol-name}
8+
@sigv4(name: "event-stream-test")
79
@service(sdkId: "EventStreamTest")
810
service TestService { version: "123", operations: [TestStreamOp] }
911

0 commit comments

Comments
 (0)