|
| 1 | +# Kotlin Event Streaming Design |
| 2 | + |
| 3 | +* **Type**: Design |
| 4 | +* **Author(s)**: Aaron Todd |
| 5 | + |
| 6 | +# Abstract |
| 7 | + |
| 8 | +This document covers the client interfaces to be generated for requests/responses with event stream payloads |
| 9 | +(`@streaming` Smithy trait applied to a `union` shape). See the [event stream spec](https://awslabs.github.io/smithy/1.0/spec/core/stream-traits.html#event-streams). |
| 10 | + |
| 11 | +Reference the additional documents listed in the Appendix for surrounding context on Smithy. |
| 12 | + |
| 13 | + |
| 14 | +# Design |
| 15 | + |
| 16 | +## Client Interface |
| 17 | + |
| 18 | +### Model |
| 19 | + |
| 20 | +We will use the examples from the event stream spec to explore the client interfaces to be generated: |
| 21 | + |
| 22 | + |
| 23 | +**Event stream on an input** |
| 24 | + |
| 25 | +``` |
| 26 | +namespace smithy.example |
| 27 | +
|
| 28 | +operation PublishMessages { |
| 29 | + input: PublishMessagesInput |
| 30 | +} |
| 31 | +
|
| 32 | +@input |
| 33 | +structure PublishMessagesInput { |
| 34 | + room: String, |
| 35 | + messages: PublishEvents, |
| 36 | +} |
| 37 | +
|
| 38 | +@streaming |
| 39 | +union PublishEvents { |
| 40 | + message: Message, |
| 41 | + leave: LeaveEvent, |
| 42 | +} |
| 43 | +
|
| 44 | +structure Message { |
| 45 | + message: String, |
| 46 | +} |
| 47 | +
|
| 48 | +structure LeaveEvent {} |
| 49 | +``` |
| 50 | + |
| 51 | +**Event stream on an output** |
| 52 | + |
| 53 | +``` |
| 54 | +namespace smithy.example |
| 55 | +
|
| 56 | +operation SubscribeToMovements { |
| 57 | + input: SubscribeToMovementsInput, |
| 58 | + output: SubscribeToMovementsOutput |
| 59 | +} |
| 60 | +
|
| 61 | +@input |
| 62 | +structure SubscribeToMovementsInput {} |
| 63 | +
|
| 64 | +@output |
| 65 | +structure SubscribeToMovementsOutput { |
| 66 | + movements: MovementEvents, |
| 67 | +} |
| 68 | +
|
| 69 | +@streaming |
| 70 | +union MovementEvents { |
| 71 | + up: Movement, |
| 72 | + down: Movement, |
| 73 | + left: Movement, |
| 74 | + right: Movement, |
| 75 | + throttlingError: ThrottlingError |
| 76 | +} |
| 77 | +
|
| 78 | +structure Movement { |
| 79 | + velocity: Float, |
| 80 | +} |
| 81 | +
|
| 82 | +/// An example error emitted when the client is throttled |
| 83 | +/// and should terminate the event stream. |
| 84 | +@error("client") |
| 85 | +@retryable(throttling: true) |
| 86 | +structure ThrottlingError {} |
| 87 | +``` |
| 88 | + |
| 89 | + |
| 90 | +### Event Stream Type Representation |
| 91 | + |
| 92 | +The members of an operation input or output that target a stream will be represented with an asynchronous [Flow](https://kotlinlang.org/docs/reference/coroutines/flow.html) |
| 93 | +from the `kotlinx-coroutines-core` library. `Flow` is a natural fit for representing asynchronous streams. |
| 94 | + |
| 95 | +`Flow` was chosen for pagination and already in use as part of our public API contract. Any alternative to this would require a custom but similar type that doesn't play well with |
| 96 | +the rest of the coroutine ecosystem. There is also prior art for representing streaming requests and responses, see [gRPC Kotlin](https://github.com/grpc/grpc-kotlin). |
| 97 | + |
| 98 | +The following types and service would be generated. |
| 99 | + |
| 100 | +NOTE: only the input and output types are shown, the other structures or unions in the model would be generated as described in [Kotlin Smithy Design](kotlin-smithy-sdk.md). |
| 101 | + |
| 102 | +#### Input Event Streams |
| 103 | + |
| 104 | + |
| 105 | +```kt |
| 106 | +class PublishMessagesRequest private constructor(builder: Builder){ |
| 107 | + val room: String? = builder.room |
| 108 | + val messages: Flow<PublishEvents>? = builder.messages |
| 109 | + |
| 110 | + ... |
| 111 | + |
| 112 | + public class Builder { |
| 113 | + var room: String? = null |
| 114 | + var messages: Flow<PublishEvents>? = null |
| 115 | + fun build(): PublishMessagesRequest = PublishMessagesRequest(this) |
| 116 | + } |
| 117 | +} |
| 118 | + |
| 119 | +``` |
| 120 | + |
| 121 | +#### Output Event Streams |
| 122 | + |
| 123 | +Output event streams would be modeled the same way as input streams. The response object would have a `Flow<T>` field that represents the response stream. |
| 124 | + |
| 125 | +```kt |
| 126 | +class SubscribeToMovementsResponse private constructor(builder: Builder){ |
| 127 | + val movements: Flow<PublishEvents>? = builder.movements |
| 128 | + |
| 129 | + ... |
| 130 | + |
| 131 | + public class Builder { |
| 132 | + var movements: Flow<MovementEvents>? = null |
| 133 | + fun build(): SubscribeToMovementsResponse = SubscribeToMovementsResponse(this) |
| 134 | + } |
| 135 | +} |
| 136 | + |
| 137 | +``` |
| 138 | + |
| 139 | + |
| 140 | +Modeling the event stream as a field of the request or response allows for [initial messages](https://awslabs.github.io/smithy/1.0/spec/core/stream-traits.html#initial-messages) |
| 141 | +to be implemented. If we directly returned or took a `Flow<T>` as the input or output type we would not be able to represent the initial request or response fields when present. |
| 142 | + |
| 143 | + |
| 144 | +### **Service and Usage** |
| 145 | + |
| 146 | +NOTE: There are types and internal details here not important to the design of how customers will interact with |
| 147 | +streaming requests/responses (e.g. serialization/deserialization). |
| 148 | +Those details are subject to change and not part of this design document. The focus here is on the way |
| 149 | +streaming is exposed to a customer. |
| 150 | + |
| 151 | + |
| 152 | +The signatures generated match that of binary streaming requests and responses. Notably that output streams take a lambda instead of returning the response directly (see [binary-streaming design](binary-streaming.md) which discusses this pattern). |
| 153 | +The response (and event stream) are only valid in that scope, after which the resources consumed by the stream are closed and no longer valid. |
| 154 | + |
| 155 | + |
| 156 | +```kt |
| 157 | +package aws.sdk.kotlin.service.Example |
| 158 | + |
| 159 | +interface ExampleClient: SdkClient { |
| 160 | + |
| 161 | + // input event stream signature |
| 162 | + suspend fun publishMessages(input: PublishMessagesRequest): PublishMessagesResponse |
| 163 | + |
| 164 | + // output event stream signature |
| 165 | + suspend fun <T> subscribeToMovements(input: SubscribeToMovementsRequest, block: suspend (SubscribeToMovementsResponse) -> T): T |
| 166 | +} |
| 167 | +``` |
| 168 | + |
| 169 | + |
| 170 | +Example Usage |
| 171 | + |
| 172 | +```kt |
| 173 | + |
| 174 | +// NOTE: Flows are cold, they do nothing until collected. They BODY will not be ran until then. |
| 175 | +suspend fun generateMessages(): Flow<PublishEvents> = flow { |
| 176 | + // BODY |
| 177 | + repeat(5) { |
| 178 | + emit(PublishEvents.Message("message-$it")) |
| 179 | + } |
| 180 | + |
| 181 | + emit(PublishEvents.LeaveEvent()) |
| 182 | +} |
| 183 | + |
| 184 | + |
| 185 | +fun main() = runBlocking{ |
| 186 | + val client = ExampleClient() |
| 187 | + |
| 188 | + // STREAMING REQUEST BODY EXAMPLE |
| 189 | + val publishRequest = PublishMessagesRequest { |
| 190 | + room = "test-room" |
| 191 | + messages = generateMessages() |
| 192 | + |
| 193 | + } |
| 194 | + |
| 195 | + client.publishMessages(publishRequest) |
| 196 | + |
| 197 | + |
| 198 | + // STREAMING RESPONSE BODY EXAMPLE |
| 199 | + |
| 200 | + val subscribeRequest = SubscribeToMovementsRequest { } |
| 201 | + |
| 202 | + client.subscribeToMovements(subscribeRequest) { resp -> |
| 203 | + resp.movements.collect { event -> |
| 204 | + when(event) { |
| 205 | + is MovementEvents.Up, |
| 206 | + is MovementEvents.Down, |
| 207 | + is MovementEvents.Left, |
| 208 | + is MovementEvents.Right -> handleMovement(event) |
| 209 | + is MovementEvents.ThrottlingError -> throw event.throttlingError |
| 210 | + else -> error("unknown event type: $event") |
| 211 | + } |
| 212 | + } |
| 213 | + |
| 214 | + } // the response/stream will no longer be valid at the end of this block though |
| 215 | + |
| 216 | +} |
| 217 | + |
| 218 | +private fun handleMovement(event: MovementEvents) { ... } |
| 219 | +``` |
| 220 | + |
| 221 | +Accepting a lambda matches what is generated for binary streams (see [binary-streaming design](binary-streaming.md)) and will provide a consistent API experience as well |
| 222 | +as the same benefits to the SDK (properly scoped lifetime for resources). |
| 223 | + |
| 224 | + |
| 225 | +# Appendix |
| 226 | + |
| 227 | + |
| 228 | +## Java Interop |
| 229 | + |
| 230 | +`Flow<T>` is not easily consumable directly from Java due to the `suspend` nature of it. JetBrains provides |
| 231 | +[reactive adapters](https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive) that can be used to convert rxJava and JDK-9 |
| 232 | +reactive streams to or from an equivalent `Flow`. Users would be responsible for creating a shim layer using these primitives provided |
| 233 | +by JetBrains which would allow them to expose the Kotlin functions however they see fit to their applications. |
| 234 | + |
| 235 | + |
| 236 | +## Additional References |
| 237 | + |
| 238 | +* [Smithy Core Spec](https://awslabs.github.io/smithy/1.0/spec/core/shapes.html) |
| 239 | +* [Event Stream Spec](https://awslabs.github.io/smithy/1.0/spec/core/stream-traits.html#event-streams) |
| 240 | +* [Kotlin Asynchronous Flow](https://kotlinlang.org/docs/reference/coroutines/flow.html) |
| 241 | +* [Kotlin Smithy SDK](kotlin-smithy-sdk.md) |
| 242 | +* [Binary Streaming](binary-streaming.md) |
| 243 | + |
| 244 | +# Revision history |
| 245 | + |
| 246 | +* 01/19/2022 - Created |
0 commit comments