Skip to content

Commit 870a64b

Browse files
authored
refactor: bind default HTTP engine to CRT (#148)
1 parent 2ca4e3f commit 870a64b

File tree

24 files changed

+1844
-7
lines changed

24 files changed

+1844
-7
lines changed

builder.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
"name": "smithy-kotlin"
3838
},
3939
{
40-
"name": "aws-crt-kotlin",
41-
"revision": "bootstrap-jvm"
40+
"name": "aws-crt-kotlin"
4241
}
4342
]
4443
}

client-runtime/crt-util/common/src/aws/sdk/kotlin/crt/SdkDefaultIO.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import aws.sdk.kotlin.crt.io.EventLoopGroup
1010
import aws.sdk.kotlin.crt.io.HostResolver
1111
import aws.sdk.kotlin.runtime.InternalSdkApi
1212

13+
// FIXME - this should default to number of processors
1314
private const val DEFAULT_EVENT_LOOP_THREAD_COUNT: Int = 1
1415

1516
/**
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
buildscript {
6+
val atomicFuVersion: String by project
7+
repositories {
8+
mavenCentral()
9+
}
10+
11+
dependencies {
12+
classpath("org.jetbrains.kotlinx:atomicfu-gradle-plugin:$atomicFuVersion")
13+
}
14+
}
15+
16+
apply(plugin = "kotlinx-atomicfu")
17+
18+
description = "HTTP client engine backed by CRT"
19+
extra["displayName"] = "Software :: AWS :: Kotlin SDK :: HTTP"
20+
extra["moduleName"] = "aws.sdk.kotlin.runtime.http.engine.crt"
21+
22+
val smithyKotlinVersion: String by project
23+
val coroutinesVersion: String by project
24+
val atomicFuVersion: String by project
25+
26+
kotlin {
27+
sourceSets {
28+
commonMain {
29+
dependencies {
30+
api(project(":client-runtime:aws-client-rt"))
31+
api("software.aws.smithy.kotlin:http:$smithyKotlinVersion")
32+
implementation("software.aws.smithy.kotlin:logging:$smithyKotlinVersion")
33+
implementation(project(":client-runtime:crt-util"))
34+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
35+
36+
implementation("org.jetbrains.kotlinx:atomicfu:$atomicFuVersion")
37+
}
38+
}
39+
40+
commonTest {
41+
dependencies {
42+
implementation(project(":client-runtime:testing"))
43+
}
44+
}
45+
}
46+
}
47+
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package aws.sdk.kotlin.runtime.http.engine.crt
7+
8+
import aws.sdk.kotlin.crt.io.Buffer
9+
import kotlinx.atomicfu.AtomicRef
10+
import kotlinx.atomicfu.atomic
11+
import kotlinx.atomicfu.update
12+
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.channels.Channel
14+
import kotlinx.coroutines.channels.ClosedReceiveChannelException
15+
import software.aws.clientrt.io.SdkBuffer
16+
import software.aws.clientrt.io.bytes
17+
import kotlin.coroutines.resume
18+
import kotlin.coroutines.resumeWithException
19+
20+
internal data class ClosedSentinel(val cause: Throwable?)
21+
22+
/**
23+
* Abstract base class that platform implementations should inherit from
24+
*/
25+
@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
26+
internal abstract class AbstractBufferedReadChannel(
27+
// function invoked every time n bytes are read
28+
private val onBytesRead: (n: Int) -> Unit
29+
) : BufferedReadChannel {
30+
31+
// NOTE: the channel is configured as unlimited but will always be constrained by the window size such
32+
// that there are only ever WINDOW_SIZE _bytes_ in-flight at any given time
33+
private val segments = Channel<Segment>(Channel.UNLIMITED)
34+
35+
private val currSegment: AtomicRef<Segment?> = atomic(null)
36+
37+
private val readOp: AtomicRef<CancellableContinuation<Boolean>?> = atomic(null)
38+
39+
private val _closed: AtomicRef<ClosedSentinel?> = atomic(null)
40+
protected val closed: ClosedSentinel?
41+
get() = _closed.value
42+
43+
private val _availableForRead = atomic(0)
44+
45+
override val isClosedForWrite: Boolean
46+
get() = segments.isClosedForSend
47+
48+
override val isClosedForRead: Boolean
49+
get() = closed != null && segments.isClosedForReceive
50+
51+
override val availableForRead: Int
52+
get() = _availableForRead.value
53+
54+
/**
55+
* Suspend reading until at least [requested] bytes are available to read or the channel is closed.
56+
* If the requested amount can be fulfilled immediately this function will return without suspension.
57+
*/
58+
protected suspend fun readSuspend(requested: Int): Boolean {
59+
// can fulfill immediately without suspension
60+
if (availableForRead >= requested) return true
61+
62+
closed?.let { closed ->
63+
// if already closed - rethrow
64+
closed.cause?.let { rethrowClosed(it) }
65+
66+
// no more data is coming
67+
return availableForRead >= requested
68+
}
69+
70+
return suspendCancellableCoroutine { cont ->
71+
setReadContinuation(cont)
72+
}
73+
}
74+
75+
private fun setReadContinuation(cont: CancellableContinuation<Boolean>) {
76+
val success = readOp.compareAndSet(null, cont)
77+
check(success) { "Read operation already in progress" }
78+
}
79+
80+
private fun resumeRead() {
81+
readOp.getAndSet(null)?.resume(true)
82+
}
83+
84+
/**
85+
* Decrease the amount of bytes available for reading and notify the callback
86+
*/
87+
@Suppress("NOTHING_TO_INLINE")
88+
private inline fun markBytesConsumed(size: Int) {
89+
// NOTE: +/- operators ARE atomic
90+
_availableForRead -= size
91+
onBytesRead(size)
92+
}
93+
94+
override suspend fun readRemaining(limit: Int): ByteArray {
95+
val buffer = SdkBuffer(minOf(availableForRead, limit))
96+
97+
val consumed = readAsMuchAsPossible(buffer, limit)
98+
99+
return if (consumed >= limit) {
100+
buffer.bytes()
101+
} else {
102+
readRemainingSuspend(buffer, limit - consumed)
103+
}
104+
}
105+
106+
protected fun readAsMuchAsPossible(dest: SdkBuffer, limit: Int): Int {
107+
var consumed = 0
108+
var remaining = limit
109+
110+
while (availableForRead > 0 && remaining > 0) {
111+
val segment = currSegment.getAndSet(null) ?: segments.tryReceive().getOrNull() ?: break
112+
113+
val rc = segment.copyTo(dest, remaining)
114+
consumed += rc
115+
remaining = limit - consumed
116+
117+
markBytesConsumed(rc)
118+
119+
if (segment.readRemaining > 0) {
120+
currSegment.update { segment }
121+
}
122+
}
123+
124+
return consumed
125+
}
126+
127+
private suspend fun readRemainingSuspend(buffer: SdkBuffer, limit: Int): ByteArray {
128+
check(currSegment.value == null) { "current segment should be drained already" }
129+
130+
var consumed = 0
131+
132+
for (segment in segments) {
133+
val remaining = limit - consumed
134+
val rc = segment.copyTo(buffer, remaining)
135+
consumed += rc
136+
137+
markBytesConsumed(rc)
138+
139+
if (remaining <= 0) {
140+
if (segment.readRemaining > 0) {
141+
currSegment.update { segment }
142+
}
143+
break
144+
}
145+
}
146+
147+
return buffer.bytes()
148+
}
149+
150+
private fun readAsMuchAsPossible(dest: ByteArray, offset: Int, length: Int): Int {
151+
var consumed = 0
152+
var currOffset = offset
153+
var remaining = length
154+
155+
while (availableForRead > 0 && remaining > 0) {
156+
val segment = currSegment.getAndSet(null) ?: segments.tryReceive().getOrNull() ?: break
157+
158+
val rc = segment.copyTo(dest, currOffset, remaining)
159+
consumed += rc
160+
currOffset += rc
161+
remaining = length - consumed
162+
163+
markBytesConsumed(rc)
164+
165+
if (segment.readRemaining > 0) {
166+
currSegment.update { segment }
167+
}
168+
}
169+
170+
return consumed
171+
}
172+
173+
override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) {
174+
val rc = readAsMuchAsPossible(sink, offset, length)
175+
if (rc < length) {
176+
readFullySuspend(sink, offset + rc, length - rc)
177+
}
178+
}
179+
180+
private suspend fun readFullySuspend(dest: ByteArray, offset: Int, length: Int) {
181+
var consumed = 0
182+
var currOffset = offset
183+
var remaining = length
184+
185+
do {
186+
if (!readSuspend(1)) {
187+
throw ClosedReceiveChannelException("Unexpeced EOF: expected $remaining more bytes")
188+
}
189+
190+
consumed += readAsMuchAsPossible(dest, currOffset, remaining)
191+
currOffset += consumed
192+
remaining -= consumed
193+
} while (remaining > 0)
194+
}
195+
196+
override suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int {
197+
val consumed = readAsMuchAsPossible(sink, offset, length)
198+
return when {
199+
consumed == 0 && closed != null -> -1
200+
consumed > 0 || length == 0 -> consumed
201+
else -> readAvailableSuspend(sink, offset, length)
202+
}
203+
}
204+
205+
private suspend fun readAvailableSuspend(dest: ByteArray, offset: Int, length: Int): Int {
206+
if (!readSuspend(1)) {
207+
return -1
208+
}
209+
return readAvailable(dest, offset, length)
210+
}
211+
212+
override fun write(data: Buffer) {
213+
// TODO - we could pool these allocations
214+
val bytesIn = ByteArray(data.len)
215+
val wc = data.copyTo(bytesIn)
216+
check(wc == bytesIn.size) { "short read: copied $wc; expected: ${bytesIn.size} " }
217+
218+
// TODO - only emit full segments or partial when closed?
219+
220+
val segment = newReadableSegment(bytesIn)
221+
val result = segments.trySend(segment)
222+
check(result.isSuccess) { "failed to queue segment" }
223+
224+
// advertise bytes available
225+
_availableForRead.getAndAdd(bytesIn.size)
226+
227+
resumeRead()
228+
}
229+
230+
override suspend fun awaitContent() {
231+
readSuspend(1)
232+
}
233+
234+
override fun cancel(cause: Throwable?): Boolean {
235+
val success = _closed.compareAndSet(null, ClosedSentinel(cause))
236+
if (!success) return false
237+
238+
segments.close()
239+
240+
readOp.getAndSet(null)?.let { cont ->
241+
if (cause != null) {
242+
cont.resumeWithException(cause)
243+
} else {
244+
cont.resume(availableForRead > 0)
245+
}
246+
}
247+
248+
return true
249+
}
250+
251+
override fun close() {
252+
cancel(null)
253+
}
254+
255+
private fun rethrowClosed(cause: Throwable): Nothing {
256+
throw cause
257+
}
258+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package aws.sdk.kotlin.runtime.http.engine.crt
7+
8+
import aws.sdk.kotlin.crt.io.Buffer
9+
import software.aws.clientrt.io.SdkByteReadChannel
10+
11+
/**
12+
* Create a new [BufferedReadChannel] that invokes [onBytesRead] as data is consumed
13+
*/
14+
internal expect fun bufferedReadChannel(onBytesRead: (n: Int) -> Unit): BufferedReadChannel
15+
16+
/**
17+
* A buffered [SdkByteReadChannel] that can always satisfy writing without blocking / suspension
18+
*/
19+
internal interface BufferedReadChannel : SdkByteReadChannel {
20+
/**
21+
* Write the data from the buffer to the channel IMMEDIATELY without blocking or suspension
22+
*/
23+
fun write(data: Buffer)
24+
}

0 commit comments

Comments
 (0)