Skip to content

Commit 6fac0b2

Browse files
committed
grpc-native: Implement all packed fixed sized field decoding
Signed-off-by: Johannes Zottele <[email protected]>
1 parent f382589 commit 6fac0b2

File tree

5 files changed

+208
-76
lines changed

5 files changed

+208
-76
lines changed

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/WireDecoder.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import kotlinx.io.Buffer
2828
* }
2929
* ```
3030
*/
31-
internal interface WireDecoder: AutoCloseable {
31+
internal interface WireDecoder : AutoCloseable {
3232
fun readTag(): KTag?
3333
fun readBool(): Boolean?
3434
fun readInt32(): Int?
@@ -47,6 +47,11 @@ internal interface WireDecoder: AutoCloseable {
4747
fun readString(): String?
4848
fun readBytes(): ByteArray?
4949
fun readPackedFixed32(): List<UInt>?
50+
fun readPackedFixed64(): List<ULong>?
51+
fun readPackedSFixed32(): List<Int>?
52+
fun readPackedSFixed64(): List<Long>?
53+
fun readPackedFloat(): List<Float>?
54+
fun readPackedDouble(): List<Double>?
5055
}
5156

5257
/**

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/WireEncoder.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import kotlinx.io.Sink
1414
*
1515
* [flush] must be called to ensure that all data is written to the [Sink].
1616
*/
17+
@OptIn(ExperimentalUnsignedTypes::class)
1718
internal interface WireEncoder {
19+
fun flush()
1820
fun writeBool(field: Int, value: Boolean): Boolean
1921
fun writeInt32(fieldNr: Int, value: Int): Boolean
2022
fun writeInt64(fieldNr: Int, value: Long): Boolean
@@ -29,10 +31,14 @@ internal interface WireEncoder {
2931
fun writeFloat(fieldNr: Int, value: Float): Boolean
3032
fun writeDouble(fieldNr: Int, value: Double): Boolean
3133
fun writeEnum(fieldNr: Int, value: Int): Boolean
32-
fun writeString(fieldNr: Int, value: String): Boolean
33-
fun flush()
3434
fun writeBytes(fieldNr: Int, value: ByteArray): Boolean
35+
fun writeString(fieldNr: Int, value: String): Boolean
3536
fun writePackedFixed32(fieldNr: Int, value: UIntArray): Boolean
37+
fun writePackedFixed64(fieldNr: Int, value: ULongArray): Boolean
38+
fun writePackedSFixed32(fieldNr: Int, value: IntArray): Boolean
39+
fun writePackedSFixed64(fieldNr: Int, value: LongArray): Boolean
40+
fun writePackedFloat(fieldNr: Int, value: FloatArray): Boolean
41+
fun writePackedDouble(fieldNr: Int, value: DoubleArray): Boolean
3642
}
3743

3844

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/WireDecoder.native.kt

Lines changed: 80 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ import libprotowire.*
1111
import kotlin.experimental.ExperimentalNativeApi
1212
import kotlin.math.min
1313

14+
// maximum buffer size to allocate as contiguous memory in bytes
1415
private const val MAX_PACKED_BULK_SIZE: Int = 1_000_000
1516

1617
@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
17-
internal class WireDecoderNative(private val source: Buffer): WireDecoder {
18+
internal class WireDecoderNative(private val source: Buffer) : WireDecoder {
1819

1920
// wraps the source in a class that allows to pass data from the source buffer to the C++ encoder
2021
// without copying it to an intermediate byte array.
@@ -201,6 +202,48 @@ internal class WireDecoderNative(private val source: Buffer): WireDecoder {
201202
return bytes
202203
}
203204

205+
override fun readPackedFixed32() = readPackedFixedInternal(
206+
UInt.SIZE_BYTES,
207+
::UIntArray,
208+
Pinned<UIntArray>::addressOf,
209+
UIntArray::asList,
210+
)
211+
212+
override fun readPackedFixed64() = readPackedFixedInternal(
213+
ULong.SIZE_BYTES,
214+
::ULongArray,
215+
Pinned<ULongArray>::addressOf,
216+
ULongArray::asList,
217+
)
218+
219+
override fun readPackedSFixed32() = readPackedFixedInternal(
220+
Int.SIZE_BYTES,
221+
::IntArray,
222+
Pinned<IntArray>::addressOf,
223+
IntArray::asList,
224+
)
225+
226+
override fun readPackedSFixed64() = readPackedFixedInternal(
227+
Long.SIZE_BYTES,
228+
::LongArray,
229+
Pinned<LongArray>::addressOf,
230+
LongArray::asList,
231+
)
232+
233+
override fun readPackedFloat() = readPackedFixedInternal(
234+
Float.SIZE_BYTES,
235+
::FloatArray,
236+
Pinned<FloatArray>::addressOf,
237+
FloatArray::asList,
238+
)
239+
240+
override fun readPackedDouble() = readPackedFixedInternal(
241+
Double.SIZE_BYTES,
242+
::DoubleArray,
243+
Pinned<DoubleArray>::addressOf,
244+
DoubleArray::asList,
245+
)
246+
204247
/*
205248
* Based on the length of the packed repeated field, one of two list strategies is chosen.
206249
* If the length is less or equal a specific threshold (MAX_PACKED_BULK_SIZE),
@@ -210,39 +253,52 @@ internal class WireDecoderNative(private val source: Buffer): WireDecoder {
210253
*
211254
* Note that this implementation assumes a little endian memory order.
212255
*/
213-
override fun readPackedFixed32(): List<UInt>? {
256+
private inline fun <T : Any, R : Any> readPackedFixedInternal(
257+
sizeBytes: Int,
258+
crossinline createArray: (Int) -> R,
259+
crossinline getAddress: Pinned<R>.(Int) -> COpaquePointer,
260+
crossinline asList: (R) -> List<T>
261+
): List<T>? {
262+
// fetch the size of the packed repeated field
214263
var byteLen = readInt32() ?: return null
215264
if (byteLen < 0) return null
216265
if (source.size < byteLen) return null
217-
if (byteLen % UInt.SIZE_BYTES != 0 ) return null
218-
val count = byteLen / UInt.SIZE_BYTES
266+
if (byteLen % sizeBytes != 0) return null
219267
if (byteLen == 0) return emptyList()
220268

221-
if (count <= MAX_PACKED_BULK_SIZE) {
222-
// this implementation assumes that the program is running on little endian machines.
223-
val arr = UIntArray(count)
224-
arr.usePinned {
225-
pw_decoder_read_raw_bytes(raw, it.addressOf(0), byteLen)
226-
}
227-
return ArrayList(arr)
228-
} else {
229-
val bufByteLen = MAX_PACKED_BULK_SIZE
230-
val bufLen = bufByteLen / UInt.SIZE_BYTES
231-
val buffer = UIntArray(bufLen)
232-
var list = persistentListOf<UInt>()
233-
buffer.usePinned {
269+
// allocate the buffer array (has at most MAX_PACKED_BULK_SIZE bytes)
270+
val bufByteLen = minOf(byteLen, MAX_PACKED_BULK_SIZE)
271+
val bufElemCount = bufByteLen / sizeBytes
272+
val buffer = createArray(bufElemCount)
273+
274+
buffer.usePinned {
275+
val bufAddr = it.getAddress(0)
276+
277+
if (byteLen == bufByteLen) {
278+
// the whole packed field fits into the buffer -> copy into buffer and returns it as a list.
279+
pw_decoder_read_raw_bytes(raw, bufAddr, byteLen)
280+
return asList(buffer)
281+
} else {
282+
// the packed field is too large for the buffer, so we load it into a persistent list
283+
var chunkedList = persistentListOf<T>()
284+
234285
while (byteLen > 0) {
235-
val written = min(bufByteLen, byteLen)
236-
pw_decoder_read_raw_bytes(raw, it.addressOf(0), written)
237-
list = if (written == bufByteLen) {
238-
list.addAll(buffer)
286+
// copy data into the buffer.
287+
val copySize = min(bufByteLen, byteLen)
288+
pw_decoder_read_raw_bytes(raw, bufAddr, copySize)
289+
290+
// add buffer to the chunked list
291+
chunkedList = if (copySize == bufByteLen) {
292+
chunkedList.addAll(asList(buffer))
239293
} else {
240-
list.addAll(buffer.copyOfRange(0, written / UInt.SIZE_BYTES))
294+
chunkedList.addAll(asList(buffer).subList(0, copySize / sizeBytes))
241295
}
242-
byteLen -= written
296+
297+
byteLen -= copySize
243298
}
299+
300+
return chunkedList
244301
}
245-
return list
246302
}
247303
}
248304
}

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/WireEncoder.native.kt

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@ package kotlinx.rpc.grpc.internal
77
import kotlinx.cinterop.*
88
import kotlinx.io.Sink
99
import libprotowire.*
10-
import kotlin.collections.isEmpty
1110
import kotlin.experimental.ExperimentalNativeApi
1211
import kotlin.native.ref.createCleaner
13-
import kotlin.text.isEmpty
1412

1513

1614
// TODO: Evaluate if we should implement a ZeroCopyOutputSink (similar to the ZeroCopyInputSource)
1715
// to reduce the number of copies during encoding.
1816
@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
19-
internal class WireEncoderNative(private val sink: Sink): WireEncoder {
17+
internal class WireEncoderNative(private val sink: Sink) : WireEncoder {
2018
/**
2119
* The context object provides a stable reference to the kotlin context.
2220
* This is required, as functions must be static and cannot capture environment references.
@@ -32,6 +30,7 @@ internal class WireEncoderNative(private val sink: Sink): WireEncoder {
3230

3331
// create context as a stable reference that can be passed to static function callback
3432
private val context = StableRef.create(this.Ctx())
33+
3534
// construct encoder with a callback that calls write() on this.context
3635
internal val raw = run {
3736
pw_encoder_new(context.asCPointer(), staticCFunction { ctx, buf, size ->
@@ -126,16 +125,46 @@ internal class WireEncoderNative(private val sink: Sink): WireEncoder {
126125
}
127126
}
128127

129-
override fun writePackedFixed32(fieldNr: Int, value: UIntArray): Boolean {
130-
if (value.isEmpty()) {
131-
return pw_encoder_write_bytes(raw, fieldNr, null, 0)
132-
}
133-
val bytes = value.size * UInt.SIZE_BYTES
134-
return value.usePinned {
135-
pw_encoder_write_bytes(raw, fieldNr, it.addressOf(0), bytes)
136-
}
137-
}
128+
override fun writePackedFixed32(fieldNr: Int, value: UIntArray) =
129+
writePackedInternal(fieldNr, value, UIntArray::size, UInt.SIZE_BYTES)
130+
{ it.addressOf(0) }
138131

132+
override fun writePackedFixed64(fieldNr: Int, value: ULongArray) =
133+
writePackedInternal(fieldNr, value, ULongArray::size, ULong.SIZE_BYTES)
134+
{ it.addressOf(0) }
135+
136+
override fun writePackedSFixed32(fieldNr: Int, value: IntArray) =
137+
writePackedInternal(fieldNr, value, IntArray::size, Int.SIZE_BYTES)
138+
{ it.addressOf(0) }
139+
140+
override fun writePackedSFixed64(fieldNr: Int, value: LongArray) =
141+
writePackedInternal(fieldNr, value, LongArray::size, Long.SIZE_BYTES)
142+
{ it.addressOf(0) }
143+
144+
override fun writePackedFloat(fieldNr: Int, value: FloatArray) =
145+
writePackedInternal(fieldNr, value, FloatArray::size, Float.SIZE_BYTES)
146+
{ it.addressOf(0) }
147+
148+
override fun writePackedDouble(fieldNr: Int, value: DoubleArray) =
149+
writePackedInternal(fieldNr, value, DoubleArray::size, Double.SIZE_BYTES)
150+
{ it.addressOf(0) }
139151
}
140152

141-
internal actual fun WireEncoder(sink: Sink): WireEncoder = WireEncoderNative(sink)
153+
internal actual fun WireEncoder(sink: Sink): WireEncoder = WireEncoderNative(sink)
154+
155+
156+
@OptIn(ExperimentalForeignApi::class)
157+
private inline fun <A : Any> WireEncoderNative.writePackedInternal(
158+
fieldNr: Int,
159+
value: A,
160+
crossinline sizeOf: A.() -> Int,
161+
sizeBytes: Int,
162+
crossinline ptr: (Pinned<A>) -> COpaquePointer
163+
): Boolean {
164+
val len = sizeOf(value)
165+
if (len == 0) return pw_encoder_write_bytes(raw, fieldNr, null, 0)
166+
val bytes = len * sizeBytes
167+
return value.usePinned { pinned ->
168+
pw_encoder_write_bytes(raw, fieldNr, ptr(pinned), bytes)
169+
}
170+
}

grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/internal/WireCodecTest.kt

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,9 @@
55
package kotlinx.rpc.grpc.internal
66

77
import kotlinx.cinterop.ExperimentalForeignApi
8-
import kotlinx.cinterop.addressOf
9-
import kotlinx.cinterop.convert
10-
import kotlinx.cinterop.readBytes
11-
import kotlinx.cinterop.usePinned
128
import kotlinx.io.Buffer
13-
import platform.posix.memcpy
149
import kotlin.experimental.ExperimentalNativeApi
15-
import kotlin.test.Test
16-
import kotlin.test.assertEquals
17-
import kotlin.test.assertFalse
18-
import kotlin.test.assertNotNull
19-
import kotlin.test.assertNull
20-
import kotlin.test.assertTrue
10+
import kotlin.test.*
2111

2212
// TODO: Move this to the commonTest
2313
@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
@@ -624,31 +614,6 @@ class WireCodecTest {
624614
assertTrue(buffer.exhausted())
625615
}
626616

627-
@Test
628-
fun testPackedFixed32EncodeDecode() {
629-
val buffer = Buffer()
630-
val encoder = WireEncoder(buffer)
631-
632-
val ints = UIntArray(100000) { it.toUInt() }
633-
634-
assertTrue(encoder.writePackedFixed32(1, ints))
635-
encoder.flush()
636-
637-
val decoder = WireDecoder(buffer)
638-
val tag = decoder.readTag()
639-
assertNotNull(tag)
640-
assertEquals(1, tag.fieldNr)
641-
assertEquals(WireType.LENGTH_DELIMITED, tag.wireType)
642-
643-
val actualInts = decoder.readPackedFixed32()
644-
assertNotNull(actualInts)
645-
assertEquals(ints.size, actualInts.size)
646-
assertEquals(ints.toList(), actualInts)
647-
648-
decoder.close()
649-
assertTrue(buffer.exhausted())
650-
}
651-
652617
@Test
653618
fun testDoubleEncodeDecode() {
654619
val fieldNr = 21
@@ -698,4 +663,75 @@ class WireCodecTest {
698663
decoder.close()
699664
assertTrue(buffer.exhausted())
700665
}
666+
667+
668+
private inline fun <A, T> runPackedTest(
669+
arr: A,
670+
crossinline write: WireEncoder.(Int, A) -> Boolean,
671+
crossinline read: WireDecoder.() -> List<T>?,
672+
crossinline asList: (A) -> List<T>
673+
) {
674+
val buf = Buffer()
675+
with(WireEncoder(buf)) {
676+
assertTrue(write(1, arr))
677+
flush()
678+
}
679+
WireDecoder(buf).use { dec ->
680+
dec.readTag()!!.apply {
681+
assertEquals(1, fieldNr)
682+
assertEquals(WireType.LENGTH_DELIMITED, wireType)
683+
}
684+
val test = dec.read()
685+
assertEquals(asList(arr), test)
686+
}
687+
assertTrue(buf.exhausted())
688+
}
689+
690+
@Test
691+
fun testPackedFixed32() = runPackedTest(
692+
UIntArray(1_000_000) { UInt.MAX_VALUE + it.toUInt() },
693+
WireEncoder::writePackedFixed32,
694+
WireDecoder::readPackedFixed32,
695+
UIntArray::asList
696+
)
697+
698+
@Test
699+
fun testPackedFixed64() = runPackedTest(
700+
ULongArray(1_000_000) { UInt.MAX_VALUE + it.toULong() },
701+
WireEncoder::writePackedFixed64,
702+
WireDecoder::readPackedFixed64,
703+
ULongArray::asList
704+
)
705+
706+
@Test
707+
fun testPackedSFixed32() = runPackedTest(
708+
IntArray(1_000_000) { Int.MAX_VALUE + it },
709+
WireEncoder::writePackedSFixed32,
710+
WireDecoder::readPackedSFixed32,
711+
IntArray::asList
712+
)
713+
714+
@Test
715+
fun testPackedSFixed64() = runPackedTest(
716+
LongArray(1_000_000) { Long.MAX_VALUE + it },
717+
WireEncoder::writePackedSFixed64,
718+
WireDecoder::readPackedSFixed64,
719+
LongArray::asList
720+
)
721+
722+
@Test
723+
fun testPackedFloat() = runPackedTest(
724+
FloatArray(1_000_000) { it.toFloat() / 3.3f * ((it and 1) * 2 - 1) },
725+
WireEncoder::writePackedFloat,
726+
WireDecoder::readPackedFloat,
727+
FloatArray::asList
728+
)
729+
730+
@Test
731+
fun testPackedDouble() = runPackedTest(
732+
DoubleArray(1_000_000) { it.toDouble() / 3.3 * ((it and 1) * 2 - 1) },
733+
WireEncoder::writePackedDouble,
734+
WireDecoder::readPackedDouble,
735+
DoubleArray::asList
736+
)
701737
}

0 commit comments

Comments
 (0)