Skip to content

Commit 34c9244

Browse files
Jozott00Mr3zee
authored andcommitted
grpc-native: Add packed Fixed32 encoding and decoding
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 47e7eff commit 34c9244

File tree

7 files changed

+122
-10
lines changed

7 files changed

+122
-10
lines changed

grpc/grpc-core/build.gradle.kts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ kotlin {
6161
}
6262
}
6363

64+
nativeMain {
65+
dependencies {
66+
implementation(libs.kotlinx.collections.immutable)
67+
}
68+
}
69+
6470
nativeTest {
6571
dependencies {
6672
implementation(kotlin("test"))

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/WireDecoder.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ internal interface WireDecoder: AutoCloseable {
4646
fun readEnum(): Int?
4747
fun readString(): String?
4848
fun readBytes(): ByteArray?
49+
fun readPackedFixed32(): List<UInt>?
4950
}
5051

5152
/**

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/WireEncoder.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ internal interface WireEncoder {
3232
fun writeString(fieldNr: Int, value: String): Boolean
3333
fun flush()
3434
fun writeBytes(fieldNr: Int, value: ByteArray): Boolean
35+
fun writePackedFixed32(fieldNr: Int, value: UIntArray): Boolean
3536
}
3637

3738

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/WireDecoder.native.kt

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55
package kotlinx.rpc.grpc.internal
66

77
import kotlinx.cinterop.*
8+
import kotlinx.collections.immutable.persistentListOf
89
import kotlinx.io.Buffer
910
import libprotowire.*
1011
import kotlin.experimental.ExperimentalNativeApi
12+
import kotlin.math.min
13+
14+
private const val MAX_PACKED_BULK_SIZE: Int = 1_000_000
1115

1216
@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
13-
internal class WireDecoderNative(source: Buffer): WireDecoder {
17+
internal class WireDecoderNative(private val source: Buffer): WireDecoder {
1418

1519
// wraps the source in a class that allows to pass data from the source buffer to the C++ encoder
1620
// without copying it to an intermediate byte array.
@@ -180,18 +184,67 @@ internal class WireDecoderNative(source: Buffer): WireDecoder {
180184
}
181185
}
182186

183-
// TODO: Should readBytes return a buffer? The current approach is dangerous as one could send a
184-
// huge length (max 2GB) and we would just allocate the array of that length.
187+
// TODO: Should readBytes return a buffer, to prevent allocation of large contiguous memory blocks ? KRPC-182
185188
override fun readBytes(): ByteArray? {
186189
val length = readInt32() ?: return null
187190
if (length < 0) return null
191+
// check if the remaining buffer size is less than the set length,
192+
// we can early abort, without allocating unnecessary memory
193+
if (source.size < length) return null
188194
if (length == 0) return ByteArray(0)
189195
val bytes = ByteArray(length)
196+
var ok = true
190197
bytes.usePinned {
191-
pw_decoder_read_raw_bytes(raw, it.addressOf(0), length)
198+
ok = pw_decoder_read_raw_bytes(raw, it.addressOf(0), length)
192199
}
200+
if (!ok) return null
193201
return bytes
194202
}
203+
204+
/*
205+
* Based on the length of the packed repeated field, one of two list strategies is chosen.
206+
* If the length is less or equal a specific threshold (MAX_PACKED_BULK_SIZE),
207+
* a single array list is filled with the buffer-packed value (two copies).
208+
* Otherwise, a kotlinx.collections.immutable.PersistentList is used to split allocation in several chunks.
209+
* To build the persistent list, a buffer array is allocated that is used for fast copy from C++ to Kotlin.
210+
*
211+
* Note that this implementation assumes a little endian memory order.
212+
*/
213+
override fun readPackedFixed32(): List<UInt>? {
214+
var byteLen = readInt32() ?: return null
215+
if (byteLen < 0) return null
216+
if (source.size < byteLen) return null
217+
if (byteLen % UInt.SIZE_BYTES != 0 ) return null
218+
val count = byteLen / UInt.SIZE_BYTES
219+
if (byteLen == 0) return emptyList()
220+
221+
if (count <= MAX_PACKED_BULK_SIZE) {
222+
// this implementation assumes that the program is running on little endian machines.
223+
val arr = UIntArray(count)
224+
arr.usePinned {
225+
pw_decoder_read_raw_bytes(raw, it.addressOf(0), byteLen)
226+
}
227+
return ArrayList(arr)
228+
} else {
229+
val bufByteLen = MAX_PACKED_BULK_SIZE
230+
val bufLen = bufByteLen / UInt.SIZE_BYTES
231+
val buffer = UIntArray(bufLen)
232+
var list = persistentListOf<UInt>()
233+
buffer.usePinned {
234+
while (byteLen > 0) {
235+
val written = min(bufByteLen, byteLen)
236+
pw_decoder_read_raw_bytes(raw, it.addressOf(0), written)
237+
list = if (written == bufByteLen) {
238+
list.addAll(buffer)
239+
} else {
240+
list.addAll(buffer.copyOfRange(0, written / UInt.SIZE_BYTES))
241+
}
242+
byteLen -= written
243+
}
244+
}
245+
return list
246+
}
247+
}
195248
}
196249

197250
internal actual fun WireDecoder(source: Buffer): WireDecoder = WireDecoderNative(source)

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/WireEncoder.native.kt

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.rpc.grpc.internal
77
import kotlinx.cinterop.*
88
import kotlinx.io.Sink
99
import libprotowire.*
10+
import kotlin.collections.isEmpty
1011
import kotlin.experimental.ExperimentalNativeApi
1112
import kotlin.native.ref.createCleaner
1213
import kotlin.text.isEmpty
@@ -48,6 +49,10 @@ internal class WireEncoderNative(private val sink: Sink): WireEncoder {
4849
pw_encoder_delete(it)
4950
}
5051

52+
override fun flush() {
53+
pw_encoder_flush(raw)
54+
}
55+
5156
override fun writeBool(field: Int, value: Boolean): Boolean {
5257
return pw_encoder_write_bool(raw, field, value)
5358
}
@@ -104,13 +109,12 @@ internal class WireEncoderNative(private val sink: Sink): WireEncoder {
104109
return pw_encoder_write_enum(raw, fieldNr, value)
105110
}
106111

107-
override fun writeString(fieldNr: Int, value: String): Boolean {
112+
override fun writeString(fieldNr: Int, value: String): Boolean = memScoped {
108113
if (value.isEmpty()) {
109114
return pw_encoder_write_string(raw, fieldNr, null, 0)
110115
}
111-
return value.usePinned {
112-
pw_encoder_write_string(raw, fieldNr, it.addressOf(0).reinterpret(), value.length)
113-
}
116+
val cStr = value.cstr
117+
return pw_encoder_write_string(raw, fieldNr, cStr.ptr, cStr.size)
114118
}
115119

116120
override fun writeBytes(fieldNr: Int, value: ByteArray): Boolean {
@@ -122,9 +126,16 @@ internal class WireEncoderNative(private val sink: Sink): WireEncoder {
122126
}
123127
}
124128

125-
override fun flush() {
126-
pw_encoder_flush(raw)
129+
override fun writePackedFixed32(fieldNr: Int, value: UIntArray): Boolean {
130+
if (value.isEmpty()) {
131+
return pw_encoder_write_bytes(raw, fieldNr, null, 0)
132+
}
133+
val bytes = value.size * UInt.SIZE_BYTES
134+
return value.usePinned {
135+
pw_encoder_write_bytes(raw, fieldNr, it.addressOf(0), bytes)
136+
}
127137
}
138+
128139
}
129140

130141
internal actual fun WireEncoder(sink: Sink): WireEncoder = WireEncoderNative(sink)

grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/internal/WireCodecTest.kt

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
package kotlinx.rpc.grpc.internal
66

77
import kotlinx.cinterop.ExperimentalForeignApi
8+
import kotlinx.cinterop.addressOf
9+
import kotlinx.cinterop.convert
10+
import kotlinx.cinterop.readBytes
11+
import kotlinx.cinterop.usePinned
812
import kotlinx.io.Buffer
13+
import platform.posix.memcpy
914
import kotlin.experimental.ExperimentalNativeApi
1015
import kotlin.test.Test
1116
import kotlin.test.assertEquals
@@ -609,6 +614,39 @@ class WireCodecTest {
609614
assertNotNull(tag)
610615
assertEquals(1, tag.fieldNr)
611616
assertEquals(WireType.LENGTH_DELIMITED, tag.wireType)
617+
618+
val actualBytes = decoder.readBytes()
619+
assertNotNull(actualBytes)
620+
assertEquals(1000000, actualBytes.size)
621+
assertTrue(bytes.contentEquals(actualBytes))
622+
623+
decoder.close()
624+
assertTrue(buffer.exhausted())
625+
}
626+
627+
@Test
628+
fun testPackedFixed32EncodeDecode() {
629+
val buffer = Buffer()
630+
val encoder = WireEncoder(buffer)
631+
632+
val ints = UIntArray(100000) { it.toUInt() }
633+
634+
assertTrue(encoder.writePackedFixed32(1, ints))
635+
encoder.flush()
636+
637+
val decoder = WireDecoder(buffer)
638+
val tag = decoder.readTag()
639+
assertNotNull(tag)
640+
assertEquals(1, tag.fieldNr)
641+
assertEquals(WireType.LENGTH_DELIMITED, tag.wireType)
642+
643+
val actualInts = decoder.readPackedFixed32()
644+
assertNotNull(actualInts)
645+
assertEquals(ints.size, actualInts.size)
646+
assertEquals(ints.toList(), actualInts)
647+
648+
decoder.close()
649+
assertTrue(buffer.exhausted())
612650
}
613651

614652
@Test

versions-root/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ intellij = "241.19416.19"
2121
gradle-doctor = "0.11.0"
2222
kotlinx-browser = "0.3"
2323
kotlinx-io = "0.8.0"
24+
kotlinx-collections = "0.4.0"
2425
dokka = "2.0.0"
2526
puppeteer = "24.9.0"
2627
atomicfu = "0.29.0"
@@ -61,6 +62,7 @@ serialization-plugin = { module = "org.jetbrains.kotlin:kotlin-serialization-com
6162
serialization-plugin-forIde = { module = "org.jetbrains.kotlin:kotlinx-serialization-compiler-plugin-for-ide", version.ref = "kotlin-compiler" }
6263
kotlinx-browser = { module = "org.jetbrains.kotlinx:kotlinx-browser", version.ref = "kotlinx-browser" }
6364
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "kotlinx-io" }
65+
kotlinx-collections-immutable = { module = "org.jetbrains.kotlinx:kotlinx-collections-immutable", version.ref = "kotlinx-collections"}
6466

6567
# serialization
6668
serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "serialization" }

0 commit comments

Comments
 (0)