Skip to content

Commit cf91733

Browse files
authored
feat(rt): event stream framing support (#320)
1 parent 8ca710b commit cf91733

File tree

15 files changed

+877
-30
lines changed

15 files changed

+877
-30
lines changed

aws-runtime/crt-util/common/src/aws/sdk/kotlin/runtime/crt/ReadChannelBodyStream.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ package aws.sdk.kotlin.runtime.crt
88
import aws.sdk.kotlin.crt.http.HttpRequestBodyStream
99
import aws.sdk.kotlin.crt.io.MutableBuffer
1010
import aws.sdk.kotlin.runtime.InternalSdkApi
11-
import aws.smithy.kotlin.runtime.io.SdkBuffer
11+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
1212
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
1313
import aws.smithy.kotlin.runtime.io.readAvailable
1414
import kotlinx.atomicfu.atomic
@@ -19,7 +19,7 @@ import kotlin.coroutines.CoroutineContext
1919
/**
2020
* write as much of [outgoing] to [dest] as possible
2121
*/
22-
internal expect fun transferRequestBody(outgoing: SdkBuffer, dest: MutableBuffer)
22+
internal expect fun transferRequestBody(outgoing: SdkByteBuffer, dest: MutableBuffer)
2323

2424
/**
2525
* Implement's [HttpRequestBodyStream] which proxies an SDK request body channel [SdkByteReadChannel]
@@ -34,8 +34,8 @@ public class ReadChannelBodyStream(
3434
private val producerJob = Job(callContext.job)
3535
override val coroutineContext: CoroutineContext = callContext + producerJob
3636

37-
private val currBuffer = atomic<SdkBuffer?>(null)
38-
private val bufferChan = Channel<SdkBuffer>(Channel.UNLIMITED)
37+
private val currBuffer = atomic<SdkByteBuffer?>(null)
38+
private val bufferChan = Channel<SdkByteBuffer>(Channel.UNLIMITED)
3939

4040
init {
4141
producerJob.invokeOnCompletion { cause ->
@@ -74,7 +74,7 @@ public class ReadChannelBodyStream(
7474
// immediately in the current thread. The coroutine will fill the buffer but won't suspend because
7575
// we know data is available.
7676
launch(start = CoroutineStart.UNDISPATCHED) {
77-
val sdkBuffer = SdkBuffer(bodyChan.availableForRead)
77+
val sdkBuffer = SdkByteBuffer(bodyChan.availableForRead.toULong())
7878
bodyChan.readAvailable(sdkBuffer)
7979
bufferChan.send(sdkBuffer)
8080
}.invokeOnCompletion { cause ->
@@ -99,7 +99,7 @@ public class ReadChannelBodyStream(
9999

100100
transferRequestBody(outgoing, buffer)
101101

102-
if (outgoing.readRemaining > 0) {
102+
if (outgoing.readRemaining > 0u) {
103103
currBuffer.value = outgoing
104104
}
105105

aws-runtime/crt-util/jvm/src/aws/sdk/kotlin/runtime/crt/RequestUtilsJVM.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
package aws.sdk.kotlin.runtime.crt
77

88
import aws.sdk.kotlin.crt.io.MutableBuffer
9-
import aws.smithy.kotlin.runtime.io.SdkBuffer
9+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
1010
import aws.smithy.kotlin.runtime.io.readAvailable
1111

12-
internal actual fun transferRequestBody(outgoing: SdkBuffer, dest: MutableBuffer) {
12+
internal actual fun transferRequestBody(outgoing: SdkByteBuffer, dest: MutableBuffer) {
1313
outgoing.readAvailable(dest.buffer)
1414
}

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package aws.sdk.kotlin.runtime.http.engine.crt
77

88
import aws.sdk.kotlin.crt.io.Buffer
9-
import aws.smithy.kotlin.runtime.io.SdkBuffer
9+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
1010
import aws.smithy.kotlin.runtime.io.bytes
1111
import kotlinx.atomicfu.AtomicRef
1212
import kotlinx.atomicfu.atomic
@@ -92,7 +92,7 @@ internal abstract class AbstractBufferedReadChannel(
9292
}
9393

9494
override suspend fun readRemaining(limit: Int): ByteArray {
95-
val buffer = SdkBuffer(minOf(availableForRead, limit))
95+
val buffer = SdkByteBuffer(minOf(availableForRead, limit).toULong())
9696

9797
val consumed = readAsMuchAsPossible(buffer, limit)
9898

@@ -103,7 +103,7 @@ internal abstract class AbstractBufferedReadChannel(
103103
}
104104
}
105105

106-
protected fun readAsMuchAsPossible(dest: SdkBuffer, limit: Int): Int {
106+
protected fun readAsMuchAsPossible(dest: SdkByteBuffer, limit: Int): Int {
107107
var consumed = 0
108108
var remaining = limit
109109

@@ -116,15 +116,15 @@ internal abstract class AbstractBufferedReadChannel(
116116

117117
markBytesConsumed(rc)
118118

119-
if (segment.readRemaining > 0) {
119+
if (segment.readRemaining > 0u) {
120120
currSegment.update { segment }
121121
}
122122
}
123123

124124
return consumed
125125
}
126126

127-
private suspend fun readRemainingSuspend(buffer: SdkBuffer, limit: Int): ByteArray {
127+
private suspend fun readRemainingSuspend(buffer: SdkByteBuffer, limit: Int): ByteArray {
128128
check(currSegment.value == null) { "current segment should be drained already" }
129129

130130
var consumed = 0
@@ -137,7 +137,7 @@ internal abstract class AbstractBufferedReadChannel(
137137
markBytesConsumed(rc)
138138

139139
if (remaining <= 0) {
140-
if (segment.readRemaining > 0) {
140+
if (segment.readRemaining > 0u) {
141141
currSegment.update { segment }
142142
}
143143
break
@@ -162,7 +162,7 @@ internal abstract class AbstractBufferedReadChannel(
162162

163163
markBytesConsumed(rc)
164164

165-
if (segment.readRemaining > 0) {
165+
if (segment.readRemaining > 0u) {
166166
currSegment.update { segment }
167167
}
168168
}

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/Segment.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@
55

66
package aws.sdk.kotlin.runtime.http.engine.crt
77

8-
import aws.smithy.kotlin.runtime.io.SdkBuffer
8+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
99
import aws.smithy.kotlin.runtime.io.readFully
1010

11-
internal typealias Segment = SdkBuffer
11+
internal typealias Segment = SdkByteBuffer
1212

1313
/**
1414
* Create a segment from the given src [ByteArray] and mark the entire contents readable
1515
*/
16-
internal fun newReadableSegment(src: ByteArray): Segment = Segment.of(src).apply { commitWritten(src.size) }
16+
internal fun newReadableSegment(src: ByteArray): Segment = Segment.of(src).apply { advance(src.size.toULong()) }
1717

18-
internal fun Segment.copyTo(dest: SdkBuffer, limit: Int = Int.MAX_VALUE): Int {
19-
check(readRemaining > 0) { "nothing left to read from segment" }
20-
val wc = minOf(readRemaining, limit)
18+
internal fun Segment.copyTo(dest: SdkByteBuffer, limit: Int = Int.MAX_VALUE): Int {
19+
check(readRemaining > 0u) { "nothing left to read from segment" }
20+
val wc = minOf(readRemaining, limit.toULong())
2121
readFully(dest, wc)
22-
return wc
22+
return wc.toInt()
2323
}
2424

2525
internal fun Segment.copyTo(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset): Int {
26-
check(readRemaining > 0) { "nothing left to read from segment" }
27-
val wc = minOf(length, readRemaining)
26+
check(readRemaining > 0u) { "nothing left to read from segment" }
27+
val wc = minOf(length.toULong(), readRemaining).toInt()
2828
readFully(dest, offset, wc)
2929
return wc
3030
}

aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/SegmentTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package aws.sdk.kotlin.runtime.http.engine.crt
77

8-
import aws.smithy.kotlin.runtime.io.SdkBuffer
8+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
99
import aws.smithy.kotlin.runtime.io.decodeToString
1010
import kotlin.test.Test
1111
import kotlin.test.assertEquals
@@ -23,11 +23,11 @@ class SegmentTest {
2323
@Test
2424
fun testCopyToSdkBuffer() {
2525
val segment = newReadableSegment("1234".encodeToByteArray())
26-
val dest = SdkBuffer(16)
26+
val dest = SdkByteBuffer(16u)
2727
val rc = segment.copyTo(dest)
2828
assertEquals(4, rc)
29-
assertEquals(4, dest.writePosition)
30-
assertEquals(4, dest.readRemaining)
29+
assertEquals(4u, dest.writePosition)
30+
assertEquals(4u, dest.readRemaining)
3131
assertEquals("1234", dest.decodeToString())
3232
}
3333
}

aws-runtime/http-client-engine-crt/jvm/src/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelJVM.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package aws.sdk.kotlin.runtime.http.engine.crt
77

8-
import aws.smithy.kotlin.runtime.io.SdkBuffer
8+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
99
import aws.smithy.kotlin.runtime.io.of
1010
import java.nio.ByteBuffer
1111

@@ -18,7 +18,7 @@ internal class BufferedReadChannelImpl(
1818

1919
override suspend fun readAvailable(sink: ByteBuffer): Int {
2020
if (sink.remaining() == 0) return 0
21-
val sdkSink = SdkBuffer.of(sink)
21+
val sdkSink = SdkByteBuffer.of(sink)
2222
val consumed = readAsMuchAsPossible(sdkSink, sink.remaining())
2323
return when {
2424
consumed == 0 && closed != null -> -1
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
description = "Support for the vnd.amazon.event-stream content type"
7+
extra["displayName"] = "AWS :: SDK :: Kotlin :: Protocols :: Event Stream"
8+
extra["moduleName"] = "aws.sdk.kotlin.runtime.protocol.eventstream"
9+
10+
kotlin {
11+
sourceSets {
12+
commonMain {
13+
dependencies {
14+
api(project(":aws-runtime:aws-core"))
15+
}
16+
}
17+
18+
commonTest {
19+
dependencies {
20+
implementation(project(":aws-runtime:testing"))
21+
}
22+
}
23+
}
24+
}
25+
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package aws.sdk.kotlin.runtime.protocol.eventstream
7+
8+
import aws.sdk.kotlin.runtime.InternalSdkApi
9+
import aws.smithy.kotlin.runtime.io.Buffer
10+
import aws.smithy.kotlin.runtime.io.SdkByteBuffer
11+
import aws.smithy.kotlin.runtime.io.readFully
12+
13+
@InternalSdkApi
14+
public class FrameDecoder {
15+
private var prelude: Prelude? = null
16+
17+
/**
18+
* Reset the decoder discarding any intermediate state
19+
*/
20+
public fun reset() { prelude = null }
21+
22+
private fun isFrameAvailable(buffer: Buffer): Boolean {
23+
val totalLen = prelude?.totalLen ?: return false
24+
val remaining = totalLen - PRELUDE_BYTE_LEN_WITH_CRC
25+
return buffer.readRemaining >= remaining.toULong()
26+
}
27+
28+
/**
29+
* Attempt to decode a [Message] from the buffer. This function expects to be called over and over again
30+
* with more data in the buffer each time its called. When there is not enough data to decode this function
31+
* returns null.
32+
* The decoder will consume the prelude when enough data is available. When it is invoked with enough
33+
* data it will consume the remaining message bytes.
34+
*/
35+
public fun decodeFrame(buffer: Buffer): Message? {
36+
if (prelude == null && buffer.readRemaining >= PRELUDE_BYTE_LEN_WITH_CRC.toULong()) {
37+
prelude = Prelude.decode(buffer)
38+
}
39+
40+
return when (isFrameAvailable(buffer)) {
41+
true -> {
42+
val currPrelude = checkNotNull(prelude)
43+
val messageBuf = SdkByteBuffer(currPrelude.totalLen.toULong())
44+
currPrelude.encode(messageBuf)
45+
buffer.readFully(messageBuf)
46+
reset()
47+
Message.decode(messageBuf)
48+
}
49+
else -> null
50+
}
51+
}
52+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package aws.sdk.kotlin.runtime.protocol.eventstream
7+
8+
import aws.sdk.kotlin.runtime.InternalSdkApi
9+
import aws.smithy.kotlin.runtime.io.*
10+
11+
private const val MIN_HEADER_LEN = 2
12+
private const val MAX_HEADER_NAME_LEN = 255
13+
14+
/*
15+
Header Wire Format
16+
17+
+--------------------+
18+
|Hdr Name Len (8) |
19+
+--------------------+-----------------------------------------------+
20+
| Header Name (*) ... |
21+
+--------------------+-----------------------------------------------+
22+
|Hdr Value Type (8) |
23+
+--------------------+-----------------------------------------------+
24+
| Header Value (*) ... |
25+
+--------------------------------------------------------------------+
26+
*/
27+
28+
/**
29+
* An event stream frame header
30+
*/
31+
@InternalSdkApi
32+
public data class Header(val name: String, val value: HeaderValue) {
33+
public companion object {
34+
/**
35+
* Read an encoded header from the [buffer]
36+
*/
37+
public fun decode(buffer: Buffer): Header {
38+
check(buffer.readRemaining >= MIN_HEADER_LEN.toULong()) { "Invalid frame header; require at least $MIN_HEADER_LEN bytes" }
39+
val nameLen = buffer.readByte().toInt()
40+
check(nameLen > 0) { "Invalid header name length: $nameLen" }
41+
val nameBytes = ByteArray(nameLen)
42+
buffer.readFully(nameBytes)
43+
val value = HeaderValue.decode(buffer)
44+
return Header(nameBytes.decodeToString(), value)
45+
}
46+
}
47+
48+
/**
49+
* Encode a header to [dest] buffer
50+
*/
51+
public fun encode(dest: MutableBuffer) {
52+
val bytes = name.encodeToByteArray()
53+
check(bytes.size < MAX_HEADER_NAME_LEN) { "Header name too long" }
54+
dest.writeByte(bytes.size.toByte())
55+
dest.writeFully(bytes)
56+
value.encode(dest)
57+
}
58+
}

0 commit comments

Comments
 (0)