Skip to content

Commit 1128209

Browse files
committed
grpc-native: Fixes after rebase
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 64afb87 commit 1128209

File tree

3 files changed

+25
-18
lines changed

3 files changed

+25
-18
lines changed

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,10 @@
44

55
package kotlinx.rpc.grpc.internal
66

7-
import kotlinx.io.Source
87
import kotlinx.rpc.grpc.codec.MessageCodec
98
import kotlinx.rpc.internal.utils.InternalRpcApi
109
import kotlinx.rpc.protobuf.input.stream.InputStream
1110

12-
@InternalRpcApi
13-
internal actual val MethodDescriptor<*, *>.type: MethodType
14-
get() = TODO("Not yet implemented")
15-
1611
@InternalRpcApi
1712
public actual class MethodDescriptor<Request, Response> internal constructor(
1813
private val fullMethodName: String,
@@ -72,23 +67,21 @@ public actual fun <Request, Response> methodDescriptor(
7267
): MethodDescriptor<Request, Response> {
7368
val requestMarshaller = object : MethodDescriptor.Marshaller<Request> {
7469
override fun stream(value: Request): InputStream {
75-
val source = requestCodec.encode(value)
76-
return object : InputStream(source) {}
70+
return requestCodec.encode(value)
7771
}
7872

7973
override fun parse(stream: InputStream): Request {
80-
return requestCodec.decode(stream.source)
74+
return requestCodec.decode(stream)
8175
}
8276
}
8377

8478
val responseMarshaller = object : MethodDescriptor.Marshaller<Response> {
8579
override fun stream(value: Response): InputStream {
86-
val source = responseCodec.encode(value)
87-
return object : InputStream(source) {}
80+
return responseCodec.encode(value)
8881
}
8982

9083
override fun parse(stream: InputStream): Response {
91-
return responseCodec.decode(stream.source)
84+
return responseCodec.decode(stream)
9285
}
9386
}
9487

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ import kotlinx.atomicfu.atomic
1111
import kotlinx.cinterop.*
1212
import kotlinx.coroutines.CancellationException
1313
import kotlinx.coroutines.CompletableJob
14-
import kotlinx.io.Buffer
1514
import kotlinx.rpc.grpc.GrpcTrailers
1615
import kotlinx.rpc.grpc.Status
1716
import kotlinx.rpc.grpc.StatusCode
17+
import kotlinx.rpc.protobuf.input.stream.asInputStream
18+
import kotlinx.rpc.protobuf.input.stream.asSource
1819
import libgrpcpp_c.*
1920
import kotlin.experimental.ExperimentalNativeApi
2021
import kotlin.native.ref.createCleaner
@@ -228,7 +229,7 @@ internal class NativeClientCall<Request, Response>(
228229

229230
val arena = Arena()
230231
val inputStream = methodDescriptor.getRequestMarshaller().stream(message)
231-
val byteBuffer = (inputStream.source as Buffer).toGrpcByteBuffer()
232+
val byteBuffer = inputStream.asSource().toGrpcByteBuffer()
232233
val op = arena.alloc<grpc_op> {
233234
op = GRPC_OP_SEND_MESSAGE
234235
data.send_message.send_message = byteBuffer

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,29 @@
77
package kotlinx.rpc.grpc.internal
88

99
import kotlinx.cinterop.*
10-
import kotlinx.io.Buffer
11-
import kotlinx.io.Source
12-
import kotlinx.io.UnsafeIoApi
10+
import kotlinx.io.*
1311
import kotlinx.io.unsafe.UnsafeBufferOperations
1412
import kotlinx.rpc.grpc.StatusCode
1513
import libgrpcpp_c.*
1614
import platform.posix.memcpy
1715

16+
@OptIn(ExperimentalForeignApi::class, InternalIoApi::class, UnsafeIoApi::class)
17+
internal fun Sink.writeFully(buffer: CPointer<ByteVar>, offset: Long, length: Long) {
18+
var consumed = 0L
19+
while (consumed < length) {
20+
UnsafeBufferOperations.writeToTail(this.buffer, 1) { array, start, endExclusive ->
21+
val size = minOf(length - consumed, (endExclusive - start).toLong())
22+
23+
array.usePinned {
24+
memcpy(it.addressOf(start), buffer + offset + consumed, size.convert())
25+
}
26+
27+
consumed += size
28+
size.toInt()
29+
}
30+
}
31+
}
32+
1833
internal suspend fun withArena(block: suspend (Arena) -> Unit) =
1934
Arena().let { arena ->
2035
try {
@@ -24,8 +39,6 @@ internal suspend fun withArena(block: suspend (Arena) -> Unit) =
2439
}
2540
}
2641

27-
internal fun Buffer.asInputStream(): InputStream = object : InputStream(this) {}
28-
2942
internal fun CPointer<grpc_byte_buffer>.toKotlin(destroy: Boolean = true): Buffer = memScoped {
3043
val reader = alloc<grpc_byte_buffer_reader>()
3144
check(grpc_byte_buffer_reader_init(reader.ptr, this@toKotlin) == 1)

0 commit comments

Comments
 (0)