Skip to content

Commit 762d583

Browse files
authored
feat(rt): add InputStream adapter for ByteStream (#945)
1 parent d741069 commit 762d583

File tree

7 files changed

+186
-17
lines changed

7 files changed

+186
-17
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "d47756d3-1127-4ed0-a71f-44ca2daebf9a",
3+
"type": "feature",
4+
"description": "Add conversion to InputStream from ByteStream",
5+
"issues": [
6+
"awslabs/aws-sdk-kotlin#617"
7+
]
8+
}

runtime/runtime-core/api/runtime-core.api

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public final class aws/smithy/kotlin/runtime/content/ByteStreamJVMKt {
122122
public static synthetic fun asByteStream$default (Ljava/io/File;JJILjava/lang/Object;)Laws/smithy/kotlin/runtime/content/ByteStream;
123123
public static synthetic fun asByteStream$default (Ljava/nio/file/Path;JJILjava/lang/Object;)Laws/smithy/kotlin/runtime/content/ByteStream;
124124
public static final fun fromFile (Laws/smithy/kotlin/runtime/content/ByteStream$Companion;Ljava/io/File;)Laws/smithy/kotlin/runtime/content/ByteStream;
125+
public static final fun toInputStream (Laws/smithy/kotlin/runtime/content/ByteStream;)Ljava/io/InputStream;
125126
public static final fun writeToFile (Laws/smithy/kotlin/runtime/content/ByteStream;Ljava/io/File;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
126127
public static final fun writeToFile (Laws/smithy/kotlin/runtime/content/ByteStream;Ljava/nio/file/Path;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
127128
}
@@ -512,6 +513,10 @@ public abstract interface class aws/smithy/kotlin/runtime/io/SdkByteReadChannel
512513
public abstract fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
513514
}
514515

516+
public final class aws/smithy/kotlin/runtime/io/SdkByteReadChannelJVMKt {
517+
public static final fun toInputStream (Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;)Ljava/io/InputStream;
518+
}
519+
515520
public final class aws/smithy/kotlin/runtime/io/SdkByteReadChannelKt {
516521
public static final fun readAll (Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;Laws/smithy/kotlin/runtime/io/SdkSink;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
517522
public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;Laws/smithy/kotlin/runtime/io/SdkBuffer;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;

runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ package aws.smithy.kotlin.runtime.content
88
import aws.smithy.kotlin.runtime.io.*
99
import kotlinx.coroutines.Dispatchers
1010
import kotlinx.coroutines.withContext
11+
import java.io.ByteArrayInputStream
1112
import java.io.File
13+
import java.io.InputStream
1214
import java.nio.file.Path
1315
import kotlin.io.use
1416

@@ -86,3 +88,12 @@ private suspend fun File.writeAll(chan: SdkByteReadChannel): Long =
8688
* @return the number of bytes written
8789
*/
8890
public suspend fun ByteStream.writeToFile(path: Path): Long = writeToFile(path.toFile())
91+
92+
/**
93+
* Create a blocking [InputStream] that reads from the underlying [ByteStream].
94+
*/
95+
public fun ByteStream.toInputStream(): InputStream = when (this) {
96+
is ByteStream.Buffer -> ByteArrayInputStream(bytes())
97+
is ByteStream.ChannelStream -> readFrom().toInputStream()
98+
is ByteStream.SourceStream -> readFrom().buffer().inputStream()
99+
}

runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/io/SdkBufferedSourceJVM.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public actual sealed interface SdkBufferedSource : SdkSource, ReadableByteChanne
109109
public actual fun readUtf8(byteCount: Long): String
110110

111111
/**
112-
* Get an input stream that writes to this source
112+
* Get an input stream that reads from this source
113113
*/
114114
public fun inputStream(): InputStream
115115

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.smithy.kotlin.runtime.io
6+
7+
import kotlinx.coroutines.runBlocking
8+
import java.io.InputStream
9+
import java.util.*
10+
11+
/**
12+
* Create a blocking [InputStream] that blocks everytime the channel suspends at [SdkByteReadChannel.read]
13+
*/
14+
public fun SdkByteReadChannel.toInputStream(): InputStream = InputAdapter(this)
15+
16+
private const val DEFAULT_READ_BYTES = 8192L
17+
private class InputAdapter(private val ch: SdkByteReadChannel) : InputStream() {
18+
19+
private val buffer = SdkBuffer()
20+
21+
override fun read(): Int {
22+
if (ch.isClosedForRead && buffer.size == 0L) return -1
23+
24+
if (buffer.size == 0L) {
25+
val rc = readBlocking()
26+
if (rc == -1L) return -1
27+
}
28+
29+
return buffer.readByte().toInt() and 0xff
30+
}
31+
32+
override fun read(b: ByteArray, off: Int, len: Int): Int {
33+
if (off < 0 || len < 0 || len > b.size - off) {
34+
throw IndexOutOfBoundsException()
35+
} else if (len == 0) {
36+
return 0
37+
}
38+
39+
if (ch.isClosedForRead && buffer.size == 0L) return -1
40+
if (buffer.size == 0L) {
41+
val rc = readBlocking()
42+
if (rc == -1L) return -1
43+
}
44+
45+
return buffer.read(b, off, len)
46+
}
47+
48+
private fun readBlocking(): Long =
49+
runBlocking {
50+
ch.read(buffer, DEFAULT_READ_BYTES)
51+
}
52+
53+
override fun available(): Int = ch.availableForRead
54+
55+
override fun close() {
56+
super.close()
57+
ch.cancel(null)
58+
}
59+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.smithy.kotlin.runtime.content
6+
7+
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
8+
import aws.smithy.kotlin.runtime.io.SdkSource
9+
import aws.smithy.kotlin.runtime.io.source
10+
import java.io.InputStream
11+
import kotlin.test.Test
12+
import kotlin.test.assertContentEquals
13+
import kotlin.test.assertEquals
14+
15+
fun interface ByteStreamFactory {
16+
fun inputStream(input: ByteArray): InputStream
17+
companion object {
18+
val BYTE_ARRAY: ByteStreamFactory = ByteStreamFactory { input -> ByteStream.fromBytes(input).toInputStream() }
19+
20+
val SDK_SOURCE: ByteStreamFactory = ByteStreamFactory { input ->
21+
object : ByteStream.SourceStream() {
22+
override fun readFrom(): SdkSource = input.source()
23+
override val contentLength: Long = input.size.toLong()
24+
}.toInputStream()
25+
}
26+
27+
val SDK_CHANNEL: ByteStreamFactory = ByteStreamFactory { input ->
28+
object : ByteStream.ChannelStream() {
29+
override fun readFrom(): SdkByteReadChannel = SdkByteReadChannel(input)
30+
override val contentLength: Long = input.size.toLong()
31+
}.toInputStream()
32+
}
33+
}
34+
}
35+
36+
class ByteStreamBufferInputStreamTest : ByteStreamInputStreamTest(ByteStreamFactory.BYTE_ARRAY)
37+
class ByteStreamSourceStreamInputStreamTest : ByteStreamInputStreamTest(ByteStreamFactory.SDK_SOURCE)
38+
class ByteStreamChannelSourceInputStreamTest : ByteStreamInputStreamTest(ByteStreamFactory.SDK_CHANNEL)
39+
40+
abstract class ByteStreamInputStreamTest(
41+
private val factory: ByteStreamFactory,
42+
) {
43+
@Test
44+
fun testReadOneByteAtATime() {
45+
val expected = "a lep is a ball".repeat(1024).encodeToByteArray()
46+
val istream = factory.inputStream(expected)
47+
val bytes = mutableListOf<Byte>()
48+
do {
49+
val next = istream.read()
50+
if (next >= 0) {
51+
bytes.add(next.toByte())
52+
}
53+
} while (next >= 0)
54+
55+
val actual = bytes.toByteArray()
56+
assertEquals(0, istream.available())
57+
assertEquals(-1, istream.read())
58+
assertContentEquals(expected, actual)
59+
}
60+
61+
@Test
62+
fun testReadFully() {
63+
val expected = "a tay is a hammer".repeat(768).encodeToByteArray()
64+
val istream = factory.inputStream(expected)
65+
val actual = istream.readBytes()
66+
assertEquals(0, istream.available())
67+
assertEquals(-1, istream.read())
68+
assertContentEquals(expected, actual)
69+
}
70+
71+
@Test
72+
fun testReadOffset() {
73+
val expected = "a flix is a comb".repeat(1024).encodeToByteArray()
74+
val istream = factory.inputStream(expected)
75+
var offset = 0
76+
val actual = ByteArray(expected.size)
77+
while (offset < actual.size) {
78+
val len = minOf(16, actual.size - offset)
79+
val rc = istream.read(actual, offset, len)
80+
if (rc == -1) break
81+
offset += rc
82+
}
83+
84+
assertEquals(0, istream.available())
85+
assertEquals(-1, istream.read())
86+
assertContentEquals(expected, actual)
87+
}
88+
}

runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
package aws.smithy.kotlin.runtime.content
77

88
import aws.smithy.kotlin.runtime.testing.RandomTempFile
9-
import kotlinx.coroutines.ExperimentalCoroutinesApi
109
import kotlinx.coroutines.test.runTest
1110
import java.nio.file.Files
1211
import kotlin.test.*
1312

14-
@OptIn(ExperimentalCoroutinesApi::class)
1513
class ByteStreamJVMTest {
1614
@Test
17-
fun `file as byte stream validates start`() = runTest {
15+
fun testFileAsByteStreamValidatesStart() = runTest {
1816
val file = RandomTempFile(1024)
1917
val e = assertFailsWith<Throwable> {
2018
file.asByteStream(-1)
@@ -23,7 +21,7 @@ class ByteStreamJVMTest {
2321
}
2422

2523
@Test
26-
fun `file as byte stream validates end`() = runTest {
24+
fun testFileAsAByteStreamValidatesEnd() = runTest {
2725
val file = RandomTempFile(1024)
2826
val e = assertFailsWith<Throwable> {
2927
file.asByteStream(endInclusive = 1024)
@@ -32,7 +30,7 @@ class ByteStreamJVMTest {
3230
}
3331

3432
@Test
35-
fun `file as byte stream validates start and end`() = runTest {
33+
fun testFileAsByteStreamValidatesStartAndEnd() = runTest {
3634
val file = RandomTempFile(1024)
3735
val e = assertFailsWith<Throwable> {
3836
file.asByteStream(5, 1)
@@ -41,31 +39,31 @@ class ByteStreamJVMTest {
4139
}
4240

4341
@Test
44-
fun `file as byte stream has contentLength`() = runTest {
42+
fun testFileAsByteStreamHasContentLength() = runTest {
4543
val file = RandomTempFile(1024)
4644
val stream = file.asByteStream()
4745

4846
assertEquals(1024, stream.contentLength)
4947
}
5048

5149
@Test
52-
fun `partial file as byte stream has contentLength`() = runTest {
50+
fun testPartialFileAsByteStreamHasContentLength() = runTest {
5351
val file = RandomTempFile(1024)
5452
val stream = file.asByteStream(1, 1023)
5553

5654
assertEquals(1023, stream.contentLength)
5755
}
5856

5957
@Test
60-
fun `partial file as byte stream has contentLength with implicit end`() = runTest {
58+
fun testPartialFileAsByteStreamHasImplicitEnd() = runTest {
6159
val file = RandomTempFile(1024)
6260
val stream = file.asByteStream(1)
6361

6462
assertEquals(1023, stream.contentLength)
6563
}
6664

6765
@Test
68-
fun `file as byte stream matches read`() = runTest {
66+
fun testFileAsByteStreamRead() = runTest {
6967
val file = RandomTempFile(1024)
7068

7169
val expected = file.readBytes()
@@ -75,7 +73,7 @@ class ByteStreamJVMTest {
7573
}
7674

7775
@Test
78-
fun `partial file as byte stream matches read`() = runTest {
76+
fun testPartialFileAsByteStreamRead() = runTest {
7977
val file = RandomTempFile(1024)
8078

8179
val expected = file.readBytes()
@@ -87,7 +85,7 @@ class ByteStreamJVMTest {
8785
}
8886

8987
@Test
90-
fun `partial file as byte stream using range`() = runTest {
88+
fun testPartialFileRangeAsByteStreamRead() = runTest {
9189
val file = RandomTempFile(1024)
9290

9391
val expected = file.readBytes()
@@ -99,7 +97,7 @@ class ByteStreamJVMTest {
9997
}
10098

10199
@Test
102-
fun `partial path as byte stream`() = runTest {
100+
fun testPartialPathAsByteStreamRead() = runTest {
103101
val file = RandomTempFile(1024)
104102
val path = file.toPath()
105103

@@ -112,7 +110,7 @@ class ByteStreamJVMTest {
112110
}
113111

114112
@Test
115-
fun `partial path as byte stream using range`() = runTest {
113+
fun testPartialPathRangeAsByteStreamRead() = runTest {
116114
val file = RandomTempFile(1024)
117115
val path = file.toPath()
118116

@@ -125,15 +123,15 @@ class ByteStreamJVMTest {
125123
}
126124

127125
@Test
128-
fun `path as byte stream has contentLength`() = runTest {
126+
fun testPathAsByteStreamHasContentLength() = runTest {
129127
val path = RandomTempFile(1024).toPath()
130128
val stream = path.asByteStream()
131129

132130
assertEquals(1024, stream.contentLength)
133131
}
134132

135133
@Test
136-
fun `can create byte stream from empty file and path using createTempFile`() = runTest {
134+
fun testCanCreateByteStreamFromEmptyFileAndPathUsingTempFile() = runTest {
137135
val file = Files.createTempFile(null, null)
138136
val byteStream = file.asByteStream()
139137
assertEquals(0, byteStream.contentLength)
@@ -143,7 +141,7 @@ class ByteStreamJVMTest {
143141
}
144142

145143
@Test
146-
fun `can create byte stream from empty file and path using RandomTempFile`() = runTest {
144+
fun testCanCreateByteStreamFromEmptyFileAndPathUsingRandomFile() = runTest {
147145
val file = RandomTempFile(sizeInBytes = 0)
148146
val byteStream = file.asByteStream()
149147
assertEquals(0, byteStream.contentLength)

0 commit comments

Comments
 (0)