Skip to content

Commit 1ffaf31

Browse files
authored
refactor(rt): use explicit scope for event stream collection (#689)
1 parent d586c51 commit 1ffaf31

File tree

4 files changed

+24
-15
lines changed

4 files changed

+24
-15
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id": "354c9857-8af1-43b7-9bcf-bff91aa32bc5",
3+
"type": "misc",
4+
"description": "Use explict CoroutineScope for consuming event stream flow"
5+
}

aws-runtime/protocols/aws-event-stream/common/src/aws/sdk/kotlin/runtime/protocol/eventstream/FrameEncoder.kt

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import aws.smithy.kotlin.runtime.io.SdkByteChannel
1212
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
1313
import aws.smithy.kotlin.runtime.io.bytes
1414
import kotlinx.coroutines.CoroutineScope
15+
import kotlinx.coroutines.Job
1516
import kotlinx.coroutines.flow.Flow
1617
import kotlinx.coroutines.flow.map
1718
import kotlinx.coroutines.launch
18-
import kotlin.coroutines.coroutineContext
1919

2020
/**
2121
* Transform the stream of messages into a stream of raw bytes. Each
@@ -31,33 +31,37 @@ public fun Flow<Message>.encode(): Flow<ByteArray> = map {
3131

3232
/**
3333
* Transform a stream of encoded messages into an [HttpBody].
34+
* @param scope parent scope to launch a coroutine in that consumes the flow and populates a [SdkByteReadChannel]
3435
*/
3536
@InternalSdkApi
36-
public suspend fun Flow<ByteArray>.asEventStreamHttpBody(): HttpBody {
37+
public suspend fun Flow<ByteArray>.asEventStreamHttpBody(scope: CoroutineScope): HttpBody {
3738
val encodedMessages = this
3839
val ch = SdkByteChannel(true)
3940

40-
// FIXME - we should probably tie this to our own scope (off ExecutionContext) but for now
41-
// tie it to whatever arbitrary scope we are in
42-
val scope = CoroutineScope(coroutineContext)
43-
4441
return object : HttpBody.Streaming() {
4542
override val contentLength: Long? = null
4643
override val isReplayable: Boolean = false
4744
override val isDuplex: Boolean = true
45+
46+
private var job: Job? = null
47+
4848
override fun readFrom(): SdkByteReadChannel {
4949
// FIXME - delaying launch here until the channel is consumed from the HTTP engine is a hacky way
5050
// of enforcing ordering to ensure the ExecutionContext is updated with the
5151
// AwsSigningAttributes.RequestSignature by the time the messages are collected and sign() is called
52-
val job = scope.launch {
53-
encodedMessages.collect {
54-
ch.writeFully(it)
52+
53+
// Although rare, nothing stops downstream consumers from invoking readFrom() more than once.
54+
// Only launch background collection task on first call
55+
if (job == null) {
56+
job = scope.launch {
57+
encodedMessages.collect {
58+
ch.writeFully(it)
59+
}
5560
}
56-
}
5761

58-
job.invokeOnCompletion { cause ->
59-
cause?.let { it.printStackTrace() }
60-
ch.close(cause)
62+
job?.invokeOnCompletion { cause ->
63+
ch.close(cause)
64+
}
6165
}
6266

6367
return ch

aws-runtime/protocols/aws-event-stream/common/test/aws/sdk/kotlin/runtime/protocol/eventstream/FrameEncoderTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class FrameEncoderTest {
5252
"baz",
5353
).map { it.encodeToByteArray() }
5454

55-
val body = messages.asEventStreamHttpBody()
55+
val body = messages.asEventStreamHttpBody(this)
5656
val actual = body.readAll()
5757
val expected = "foobarbaz"
5858
assertEquals(expected, actual?.decodeToString())

codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/eventstream/EventStreamSerializerGenerator.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class EventStreamSerializerGenerator(
7575
}
7676

7777
writer.write("")
78-
writer.write("return messages.#T()", AwsRuntimeTypes.AwsEventStream.asEventStreamHttpBody)
78+
writer.write("return messages.#T(context)", AwsRuntimeTypes.AwsEventStream.asEventStreamHttpBody)
7979
}
8080

8181
private fun encodeEventStreamMessage(

0 commit comments

Comments
 (0)