Skip to content

Commit 0f15c3f

Browse files
committed
Buffer implementation
1 parent f5c5af0 commit 0f15c3f

File tree

8 files changed

+95
-346
lines changed

8 files changed

+95
-346
lines changed

runtime/runtime-core/api/runtime-core.api

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,9 @@ public final class aws/smithy/kotlin/runtime/io/SdkSourceJVMKt {
10311031

10321032
public final class aws/smithy/kotlin/runtime/io/SdkSourceKt {
10331033
public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
1034+
public static final fun readToByteArray (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1035+
public static final fun toSdkByteReadChannel (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlinx/coroutines/CoroutineScope;)Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;
1036+
public static synthetic fun toSdkByteReadChannel$default (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlinx/coroutines/CoroutineScope;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;
10341037
}
10351038

10361039
public final class aws/smithy/kotlin/runtime/io/internal/ConvertKt {

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkSource.kt

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,16 @@
66
package aws.smithy.kotlin.runtime.io
77

88
import aws.smithy.kotlin.runtime.InternalApi
9+
import aws.smithy.kotlin.runtime.io.internal.JobChannel
10+
import kotlinx.coroutines.CoroutineName
911
import kotlinx.coroutines.CoroutineScope
12+
import kotlinx.coroutines.DelicateCoroutinesApi
13+
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.GlobalScope
15+
import kotlinx.coroutines.IO
16+
import kotlinx.coroutines.ensureActive
17+
import kotlinx.coroutines.launch
18+
import kotlinx.coroutines.withContext
1019

1120
/**
1221
* A source for reading a stream of bytes (e.g. from file, network, or in-memory buffer). Sources may
@@ -43,16 +52,42 @@ public interface SdkSource : Closeable {
4352
* Consume the [SdkSource] and pull the entire contents into memory as a [ByteArray].
4453
*/
4554
@InternalApi
46-
public expect suspend fun SdkSource.readToByteArray(): ByteArray
55+
public suspend fun SdkSource.readToByteArray(): ByteArray = withContext(Dispatchers.IO) {
56+
use { it.buffer().readByteArray() }
57+
}
4758

4859
/**
4960
* Convert the [SdkSource] to an [SdkByteReadChannel]. Content is read from the source and forwarded
5061
* to the channel.
5162
* @param coroutineScope the coroutine scope to use to launch a background reader channel responsible for propagating data
5263
* between source and the returned channel
5364
*/
65+
@OptIn(DelicateCoroutinesApi::class)
5466
@InternalApi
55-
public expect fun SdkSource.toSdkByteReadChannel(coroutineScope: CoroutineScope? = null): SdkByteReadChannel
67+
public fun SdkSource.toSdkByteReadChannel(coroutineScope: CoroutineScope? = null): SdkByteReadChannel {
68+
val source = this
69+
val ch = JobChannel()
70+
val scope = coroutineScope ?: GlobalScope
71+
val job = scope.launch(Dispatchers.IO + CoroutineName("sdk-source-reader")) {
72+
val buffer = SdkBuffer()
73+
val result = runCatching {
74+
source.use {
75+
while (true) {
76+
ensureActive()
77+
val rc = source.read(buffer, DEFAULT_BYTE_CHANNEL_MAX_BUFFER_SIZE.toLong())
78+
if (rc == -1L) break
79+
ch.write(buffer)
80+
}
81+
}
82+
}
83+
84+
ch.close(result.exceptionOrNull())
85+
}
86+
87+
ch.attachJob(job)
88+
89+
return ch
90+
}
5691

5792
/**
5893
* Remove exactly [byteCount] bytes from this source and appends them to [sink].

runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/io/SdkBufferedSinkTest.kt

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ abstract class AbstractBufferedSinkTest(
2525
private val data = SdkBuffer()
2626
private val sink = factory(data)
2727

28-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
2928
@Test
3029
fun testWriteByte() {
3130
sink.writeByte(0xDE.toByte())
@@ -36,7 +35,6 @@ abstract class AbstractBufferedSinkTest(
3635
assertEquals("[hex=deadbeef]", data.toString())
3736
}
3837

39-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
4038
@Test
4139
fun testWriteShort() {
4240
sink.writeShort(0xdead.toShort())
@@ -45,7 +43,6 @@ abstract class AbstractBufferedSinkTest(
4543
assertEquals("[hex=deadbeef]", data.toString())
4644
}
4745

48-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
4946
@Test
5047
fun testWriteShortLe() {
5148
sink.writeShortLe(0xdead.toShort())
@@ -54,55 +51,48 @@ abstract class AbstractBufferedSinkTest(
5451
assertEquals("[hex=addeefbe]", data.toString())
5552
}
5653

57-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
5854
@Test
5955
fun testWriteInt() {
6056
sink.writeInt(0xdeadbeef.toInt())
6157
sink.flush()
6258
assertEquals("[hex=deadbeef]", data.toString())
6359
}
6460

65-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
6661
@Test
6762
fun testWriteLe() {
6863
sink.writeIntLe(0xdeadbeef.toInt())
6964
sink.flush()
7065
assertEquals("[hex=efbeadde]", data.toString())
7166
}
7267

73-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
7468
@Test
7569
fun testWriteLong() {
7670
sink.writeLong(-2401053092341600192)
7771
sink.flush()
7872
assertEquals("[hex=deadbeef10203040]", data.toString())
7973
}
8074

81-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
8275
@Test
8376
fun testWriteLongLe() {
8477
sink.writeLongLe(4625232074423315934)
8578
sink.flush()
8679
assertEquals("[hex=deadbeef10203040]", data.toString())
8780
}
8881

89-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
9082
@Test
9183
fun testWriteString() {
9284
sink.writeUtf8("レップはボールです")
9385
sink.flush()
9486
assertEquals("[text=レップはボールです]", data.toString())
9587
}
9688

97-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
9889
@Test
9990
fun testWriteSubstring() {
10091
sink.writeUtf8("a lep is a ball", start = 2, endExclusive = 10)
10192
sink.flush()
10293
assertEquals("lep is a", data.readUtf8())
10394
}
10495

105-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
10696
@Test
10797
fun testWriteAll() {
10898
val contents = "a tay is a hammer"
@@ -113,7 +103,6 @@ abstract class AbstractBufferedSinkTest(
113103
assertEquals(contents.length.toLong(), rc)
114104
}
115105

116-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
117106
@Test
118107
fun testReadSourceFully() {
119108
val source = object : SdkSource by SdkBuffer() {
@@ -128,7 +117,7 @@ abstract class AbstractBufferedSinkTest(
128117
assertEquals("12341234", data.readUtf8())
129118
}
130119

131-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
120+
@IgnoreNative // FIXME "Expected an exception of aws.smithy.kotlin.runtime.io.EOFException to be thrown, but was okio.EOFException"
132121
@Test
133122
fun testWriteEof() {
134123
val source: SdkSource = SdkBuffer().apply { writeUtf8("1234") }
@@ -137,15 +126,13 @@ abstract class AbstractBufferedSinkTest(
137126
assertEquals("1234", data.readUtf8())
138127
}
139128

140-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
141129
@Test
142130
fun testWriteExhausted() {
143131
val source: SdkSource = SdkBuffer()
144132
assertEquals(0, sink.writeAll(source))
145133
assertEquals(0, data.size)
146134
}
147135

148-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
149136
@Test
150137
fun testWriteExplicitZero() {
151138
val source = object : SdkSource by SdkBuffer() {
@@ -156,15 +143,13 @@ abstract class AbstractBufferedSinkTest(
156143
assertEquals(0, data.size)
157144
}
158145

159-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
160146
@Test
161147
fun testCloseFlushes() {
162148
sink.writeUtf8("a flix is a comb")
163149
sink.close()
164150
assertEquals("a flix is a comb", data.readUtf8())
165151
}
166152

167-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
168153
@Test
169154
fun testWriteByteArray() {
170155
val expected = bytes(0xde, 0xad, 0xbe, 0xef)
@@ -174,7 +159,6 @@ abstract class AbstractBufferedSinkTest(
174159
assertContentEquals(expected, actual)
175160
}
176161

177-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
178162
@Test
179163
fun testWriteByteArrayOffset() {
180164
val expected = bytes(0xde, 0xad, 0xbe, 0xef)
@@ -184,7 +168,6 @@ abstract class AbstractBufferedSinkTest(
184168
assertContentEquals(expected.sliceArray(2..3), actual)
185169
}
186170

187-
@IgnoreNative // FIXME Re-enable after Kotlin/Native implementation
188171
@Test
189172
fun testWriteByteArrayOffsetAndLimit() {
190173
val expected = bytes(0xde, 0xad, 0xbe, 0xef)

runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/io/SdkSourceJVM.kt

Lines changed: 0 additions & 42 deletions
This file was deleted.

runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/io/BufferedSinkAdapterNative.kt

Lines changed: 1 addition & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -4,67 +4,4 @@
44
*/
55
package aws.smithy.kotlin.runtime.io
66

7-
internal actual class BufferedSinkAdapter actual constructor(sink: okio.BufferedSink) : SdkBufferedSink {
8-
actual override val buffer: SdkBuffer
9-
get() = TODO("Not yet implemented")
10-
11-
actual override fun write(source: ByteArray, offset: Int, limit: Int) {
12-
TODO("Not yet implemented")
13-
}
14-
15-
actual override fun writeAll(source: SdkSource): Long {
16-
TODO("Not yet implemented")
17-
}
18-
19-
actual override fun write(source: SdkSource, byteCount: Long) {
20-
TODO("Not yet implemented")
21-
}
22-
23-
actual override fun writeUtf8(string: String, start: Int, endExclusive: Int) {
24-
TODO("Not yet implemented")
25-
}
26-
27-
actual override fun writeByte(x: Byte) {
28-
TODO("Not yet implemented")
29-
}
30-
31-
actual override fun writeShort(x: Short) {
32-
TODO("Not yet implemented")
33-
}
34-
35-
actual override fun writeShortLe(x: Short) {
36-
TODO("Not yet implemented")
37-
}
38-
39-
actual override fun writeInt(x: Int) {
40-
TODO("Not yet implemented")
41-
}
42-
43-
actual override fun writeIntLe(x: Int) {
44-
TODO("Not yet implemented")
45-
}
46-
47-
actual override fun writeLong(x: Long) {
48-
TODO("Not yet implemented")
49-
}
50-
51-
actual override fun writeLongLe(x: Long) {
52-
TODO("Not yet implemented")
53-
}
54-
55-
actual override fun flush() {
56-
TODO("Not yet implemented")
57-
}
58-
59-
actual override fun emit() {
60-
TODO("Not yet implemented")
61-
}
62-
63-
actual override fun write(source: SdkBuffer, byteCount: Long) {
64-
TODO("Not yet implemented")
65-
}
66-
67-
actual override fun close() {
68-
TODO("Not yet implemented")
69-
}
70-
}
7+
internal actual class BufferedSinkAdapter actual constructor(sink: okio.BufferedSink) : SdkBufferedSink, AbstractBufferedSinkAdapter(sink)

0 commit comments

Comments
 (0)