Skip to content

Commit 23791f8

Browse files
authored
fix: merging Flows (#1032)
1 parent 177f282 commit 23791f8

File tree

7 files changed

+87
-2
lines changed

7 files changed

+87
-2
lines changed

aws-runtime/aws-core/api/aws-core.api

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ public class aws/sdk/kotlin/runtime/ConfigurationException : aws/sdk/kotlin/runt
2626
public fun <init> (Ljava/lang/Throwable;)V
2727
}
2828

29+
public final class aws/sdk/kotlin/runtime/FlowUtilKt {
30+
}
31+
2932
public abstract interface annotation class aws/sdk/kotlin/runtime/InternalSdkApi : java/lang/annotation/Annotation {
3033
}
3134

aws-runtime/aws-core/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ extra["displayName"] = "AWS :: SDK :: Kotlin :: Client Runtime"
88
extra["moduleName"] = "aws.sdk.kotlin.runtime"
99

1010
val smithyKotlinVersion: String by project
11+
val coroutinesVersion: String by project
1112

1213
kotlin {
1314
sourceSets {
@@ -18,6 +19,12 @@ kotlin {
1819
}
1920
}
2021

22+
commonTest {
23+
dependencies {
24+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion")
25+
}
26+
}
27+
2128
all {
2229
languageSettings.optIn("aws.smithy.kotlin.runtime.InternalApi")
2330
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.sdk.kotlin.runtime
6+
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.emitAll
9+
import kotlinx.coroutines.flow.flow
10+
11+
// FIXME relocate to smithy-kotlin
12+
@InternalSdkApi
13+
public fun <T> mergeSequential(vararg flows: Flow<T>): Flow<T> = flow {
14+
flows.forEach { flow -> emitAll(flow) }
15+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
import aws.sdk.kotlin.runtime.InternalSdkApi
6+
import aws.sdk.kotlin.runtime.mergeSequential
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.flowOf
9+
import kotlinx.coroutines.flow.toList
10+
import kotlinx.coroutines.test.runTest
11+
import kotlin.test.Test
12+
import kotlin.test.assertEquals
13+
14+
@OptIn(InternalSdkApi::class)
15+
class FlowUtilTest {
16+
17+
@Test
18+
fun testMergingFlows() = runTest {
19+
val a = flowOf(1)
20+
val b = flowOf(2, 3, 4)
21+
val merged = mergeSequential(a, b).toList()
22+
assertEquals(listOf(1, 2, 3, 4), merged)
23+
}
24+
25+
@Test
26+
fun testMergingEmptyFlow() = runTest {
27+
val a: Flow<Int> = flowOf()
28+
val b: Flow<Int> = flowOf(4, 5, 6)
29+
30+
val merged = mergeSequential(a, b).toList()
31+
assertEquals(listOf(4, 5, 6), merged)
32+
}
33+
34+
@Test
35+
fun testMergingOneFlow() = runTest {
36+
val a = flowOf(1, 2, 3)
37+
val merged = mergeSequential(a).toList()
38+
39+
assertEquals(listOf(1, 2, 3), merged)
40+
}
41+
42+
@Test
43+
fun testMergingSameFlow() = runTest {
44+
val a = flowOf(1, 2, 3)
45+
val merged = mergeSequential(a, a).toList()
46+
assertEquals(listOf(1, 2, 3, 1, 2, 3), merged)
47+
}
48+
49+
@Test
50+
fun testMergingMoreThanTwoFlows() = runTest {
51+
val a = flowOf(1)
52+
val b = flowOf(2)
53+
val c = flowOf(3)
54+
55+
val merged = mergeSequential(a, b, c).toList()
56+
assertEquals(listOf(1, 2, 3), merged)
57+
}
58+
}

codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/AwsRuntimeTypes.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ object AwsRuntimeTypes {
1515
object Core : RuntimeTypePackage(AwsKotlinDependency.AWS_CORE) {
1616
val AwsErrorMetadata = symbol("AwsErrorMetadata")
1717
val ClientException = symbol("ClientException")
18+
val mergeSequential = symbol("mergeSequential")
1819

1920
object Client : RuntimeTypePackage(AwsKotlinDependency.AWS_CORE, "client") {
2021
val AwsClientOption = symbol("AwsClientOption")

codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/eventstream/EventStreamParserGenerator.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ class EventStreamParserGenerator(
218218
.closeAndOpenBlock("} else {")
219219
.write(
220220
"#T(#T(firstMessage), frames)",
221-
RuntimeTypes.KotlinxCoroutines.Flow.merge,
221+
AwsRuntimeTypes.Core.mergeSequential,
222222
RuntimeTypes.KotlinxCoroutines.Flow.flowOf,
223223
)
224224
.closeBlock("}")

codegen/smithy-aws-kotlin-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/protocols/eventstream/EventStreamSerializerGenerator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package aws.sdk.kotlin.codegen.protocols.eventstream
77

8+
import aws.sdk.kotlin.codegen.AwsRuntimeTypes
89
import software.amazon.smithy.codegen.core.CodegenException
910
import software.amazon.smithy.codegen.core.Symbol
1011
import software.amazon.smithy.kotlin.codegen.core.*
@@ -93,7 +94,7 @@ class EventStreamSerializerGenerator(
9394
writer.withBlock(
9495
"val messages = #T(#T(initialRequest), stream.#T(::#T))",
9596
"",
96-
RuntimeTypes.KotlinxCoroutines.Flow.merge,
97+
AwsRuntimeTypes.Core.mergeSequential,
9798
RuntimeTypes.KotlinxCoroutines.Flow.flowOf,
9899
RuntimeTypes.KotlinxCoroutines.Flow.map,
99100
encodeFn,

0 commit comments

Comments
 (0)