From 04ca746c0882fd9180582629798677f844c90959 Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Tue, 2 Sep 2025 15:44:13 +0200 Subject: [PATCH 01/10] grpc-native: Add server support on native Signed-off-by: Johannes Zottele --- .../kotlinx/rpc/grpc/ManagedChannel.native.kt | 10 +- .../grpc/ServerServiceDefinition.native.kt | 15 ++- .../rpc/grpc/internal/CompletionQueue.kt | 28 +++++- .../rpc/grpc/internal/NativeManagedChannel.kt | 25 +++-- .../kotlinx/rpc/grpc/internal/NativeServer.kt | 91 +++++++++++++++++++ .../internal/ServerMethodDefinition.native.kt | 11 ++- .../grpc/internal/ServiceDescriptor.native.kt | 20 ++-- 7 files changed, 164 insertions(+), 36 deletions(-) create mode 100644 grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt index be43d6afc..766ffbb2b 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt @@ -6,7 +6,11 @@ package kotlinx.rpc.grpc -import kotlinx.rpc.grpc.internal.* +import kotlinx.rpc.grpc.internal.GrpcChannel +import kotlinx.rpc.grpc.internal.GrpcChannelCredentials +import kotlinx.rpc.grpc.internal.GrpcInsecureChannelCredentials +import kotlinx.rpc.grpc.internal.NativeManagedChannel +import kotlinx.rpc.grpc.internal.internalError /** * Same as [ManagedChannel], but is platform-exposed. @@ -25,10 +29,10 @@ public actual abstract class ManagedChannelBuilder> internal class NativeManagedChannelBuilder( private val target: String, ) : ManagedChannelBuilder() { - private var credentials: GrpcCredentials? = null + private var credentials: GrpcChannelCredentials? = null override fun usePlaintext(): NativeManagedChannelBuilder { - credentials = GrpcInsecureCredentials() + credentials = GrpcInsecureChannelCredentials() return this } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt index 2d5da178c..5559469f8 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt @@ -8,14 +8,13 @@ import kotlinx.rpc.grpc.internal.ServerMethodDefinition import kotlinx.rpc.grpc.internal.ServiceDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi -public actual class ServerServiceDefinition { - public actual fun getServiceDescriptor(): ServiceDescriptor { - TODO("Not yet implemented") - } +public actual class ServerServiceDefinition internal constructor( + private val serviceDescriptor: ServiceDescriptor, + private val methods: Collection>, +) { + public actual fun getServiceDescriptor(): ServiceDescriptor = serviceDescriptor - public actual fun getMethods(): Collection> { - TODO("Not yet implemented") - } + public actual fun getMethods(): Collection> = methods } @InternalRpcApi @@ -23,5 +22,5 @@ public actual fun serverServiceDefinition( serviceDescriptor: ServiceDescriptor, methods: Collection>, ): ServerServiceDefinition { - TODO("Not yet implemented") + return ServerServiceDefinition(serviceDescriptor, methods) } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt index 799964abc..8d38a22db 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt @@ -9,8 +9,30 @@ package kotlinx.rpc.grpc.internal import kotlinx.atomicfu.atomic import kotlinx.atomicfu.locks.SynchronizedObject import kotlinx.atomicfu.locks.synchronized -import kotlinx.cinterop.* -import libkgrpc.* +import kotlinx.cinterop.CFunction +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.StableRef +import kotlinx.cinterop.alloc +import kotlinx.cinterop.asStableRef +import kotlinx.cinterop.convert +import kotlinx.cinterop.free +import kotlinx.cinterop.nativeHeap +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.sizeOf +import kotlinx.cinterop.staticCFunction +import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT +import libkgrpc.grpc_call_error +import libkgrpc.grpc_call_start_batch +import libkgrpc.grpc_completion_queue_create_for_callback +import libkgrpc.grpc_completion_queue_destroy +import libkgrpc.grpc_completion_queue_functor +import libkgrpc.grpc_completion_queue_shutdown +import libkgrpc.grpc_op +import libkgrpc.kgrpc_cb_tag +import libkgrpc.kgrpc_iomgr_run_in_background import platform.posix.memset import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner @@ -84,7 +106,7 @@ internal class CompletionQueue { @Suppress("unused") private val shutdownFunctorCleaner = createCleaner(shutdownFunctor) { nativeHeap.free(it) } - + init { // Assert grpc_iomgr_run_in_background() to guarantee that the event manager provides // IO threads and supports the callback API. diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt index 3f03b92f1..d8fbb9e98 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt @@ -11,19 +11,30 @@ import cnames.structs.grpc_channel_credentials import kotlinx.atomicfu.atomic import kotlinx.cinterop.CPointer import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.coroutines.* +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.ManagedChannel import kotlinx.rpc.grpc.ManagedChannelPlatform -import libkgrpc.* +import libkgrpc.GPR_CLOCK_REALTIME +import libkgrpc.GRPC_PROPAGATE_DEFAULTS +import libkgrpc.gpr_inf_future +import libkgrpc.grpc_channel_create +import libkgrpc.grpc_channel_create_call +import libkgrpc.grpc_channel_credentials_release +import libkgrpc.grpc_channel_destroy +import libkgrpc.grpc_insecure_credentials_create import kotlin.coroutines.cancellation.CancellationException import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner import kotlin.time.Duration /** - * Wrapper for [grpc_channel_credentials]. + * Wrapper for [cnames.structs.grpc_channel_credentials]. */ -internal sealed class GrpcCredentials( +internal sealed class GrpcChannelCredentials( internal val raw: CPointer, ) { val rawCleaner = createCleaner(raw) { @@ -34,8 +45,8 @@ internal sealed class GrpcCredentials( /** * Insecure credentials. */ -internal class GrpcInsecureCredentials() : - GrpcCredentials(grpc_insecure_credentials_create() ?: error("Failed to create credentials")) +internal class GrpcInsecureChannelCredentials() : + GrpcChannelCredentials(grpc_insecure_credentials_create() ?: error("Failed to create channel credentials")) /** @@ -47,7 +58,7 @@ internal class GrpcInsecureCredentials() : internal class NativeManagedChannel( target: String, // we must store them, otherwise the credentials are getting released - credentials: GrpcCredentials, + credentials: GrpcChannelCredentials, ) : ManagedChannel, ManagedChannelPlatform() { // a reference to make sure the grpc_init() was called. (it is released after shutdown) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt new file mode 100644 index 000000000..a03970791 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_server +import cnames.structs.grpc_server_credentials +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.rpc.grpc.Server +import libkgrpc.grpc_insecure_server_credentials_create +import libkgrpc.grpc_server_add_http2_port +import libkgrpc.grpc_server_create +import libkgrpc.grpc_server_credentials_release +import libkgrpc.grpc_server_destroy +import libkgrpc.grpc_server_register_completion_queue +import libkgrpc.grpc_server_start +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner +import kotlin.time.Duration + +/** + * Wrapper for [cnames.structs.grpc_server_credentials]. + */ +internal sealed class GrpcServerCredentials( + internal val raw: CPointer, +) { + val rawCleaner = createCleaner(raw) { + grpc_server_credentials_release(it) + } +} + +/** + * Insecure credentials. + */ +internal class GrpcInsecureServerCredentials() : + GrpcServerCredentials(grpc_insecure_server_credentials_create() ?: error("Failed to create server credentials")) + + +internal class NativeServer( + override val port: Int, + @Suppress("Redundant") + private val credentials: GrpcServerCredentials, +) : Server { + + // a reference to make sure the grpc_init() was called. (it is released after shutdown) + @Suppress("unused") + private val rt = GrpcRuntime.acquire() + + private val cq = CompletionQueue() + + val raw: CPointer = grpc_server_create(null, null)!! + + @Suppress("unused") + private val rawCleaner = createCleaner(raw) { + grpc_server_destroy(it) + } + + init { + grpc_server_register_completion_queue(raw, cq.raw, null) + grpc_server_add_http2_port(raw, "0.0.0.0:$port", credentials.raw) + } + + override val isShutdown: Boolean + get() { + TODO() + } + override val isTerminated: Boolean + get() { + TODO() + } + + override fun start(): Server { + grpc_server_start(raw) + } + + override fun shutdown(): Server { + TODO("Not yet implemented") + } + + override fun shutdownNow(): Server { + TODO("Not yet implemented") + } + + override suspend fun awaitTermination(duration: Duration): Server { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt index 3da552b02..b0e625840 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt @@ -7,13 +7,16 @@ package kotlinx.rpc.grpc.internal import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public actual class ServerMethodDefinition { +public actual class ServerMethodDefinition internal constructor( + private val methodDescriptor: MethodDescriptor, + private val serverCallHandler: ServerCallHandler, +) { public actual fun getMethodDescriptor(): MethodDescriptor { - TODO("Not yet implemented") + return methodDescriptor } public actual fun getServerCallHandler(): ServerCallHandler { - TODO("Not yet implemented") + return serverCallHandler } } @@ -21,5 +24,5 @@ public actual fun serverMethodDefinition( descriptor: MethodDescriptor, handler: ServerCallHandler, ): ServerMethodDefinition { - TODO("Not yet implemented") + return ServerMethodDefinition(descriptor, handler) } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt index bf2415a08..ce890ae91 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt @@ -7,18 +7,16 @@ package kotlinx.rpc.grpc.internal import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public actual class ServiceDescriptor { - public actual fun getName(): String { - TODO("Not yet implemented") - } +public actual class ServiceDescriptor internal constructor( + private val name: String, + private val methods: Collection>, + private val schemaDescriptor: Any?, +) { + public actual fun getName(): String = name - public actual fun getMethods(): Collection> { - TODO("Not yet implemented") - } + public actual fun getMethods(): Collection> = methods - public actual fun getSchemaDescriptor(): Any? { - TODO("Not yet implemented") - } + public actual fun getSchemaDescriptor(): Any? = schemaDescriptor } @InternalRpcApi @@ -27,5 +25,5 @@ public actual fun serviceDescriptor( methods: Collection>, schemaDescriptor: Any?, ): ServiceDescriptor { - TODO("Not yet implemented") + return ServiceDescriptor(name, methods, schemaDescriptor) } From a76724c52573f1d04018f026183b24df6a69380b Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Thu, 4 Sep 2025 15:11:37 +0200 Subject: [PATCH 02/10] grpc-native: Working native server implementation Signed-off-by: Johannes Zottele --- cinterop-c/include/kgrpc.h | 66 +++- cinterop-c/src/kgrpc.cpp | 50 ++- cinterop-c/tools/collect_headers.bzl | 6 +- .../kotlinx/rpc/grpc/test/CoreClientTest.kt | 18 +- .../kotlin/kotlinx/rpc/grpc/Server.native.kt | 38 ++- .../rpc/grpc/internal/AbstractNativeCall.kt | 75 +++++ .../rpc/grpc/internal/CompletionQueue.kt | 31 +- .../rpc/grpc/internal/NativeClientCall.kt | 38 ++- .../rpc/grpc/internal/NativeManagedChannel.kt | 3 +- .../kotlinx/rpc/grpc/internal/NativeServer.kt | 168 +++++++++- .../rpc/grpc/internal/NativeServerCall.kt | 308 ++++++++++++++++++ .../grpc/internal/ServerCallbackRequest.kt | 115 +++++++ .../kotlin/kotlinx/rpc/grpc/internal/utils.kt | 57 +++- 13 files changed, 927 insertions(+), 46 deletions(-) create mode 100644 grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt create mode 100644 grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt create mode 100644 grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt diff --git a/cinterop-c/include/kgrpc.h b/cinterop-c/include/kgrpc.h index 33536fe48..f401dda39 100644 --- a/cinterop-c/include/kgrpc.h +++ b/cinterop-c/include/kgrpc.h @@ -24,13 +24,71 @@ typedef struct { } kgrpc_cb_tag; - /* - * Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped. +// This is a duplicate of the RegisteredCallAllocation, which is defined in +// https://github.com/grpc/grpc/blob/893bdadd56dbb75fb156175afdaa2b0d47e1c15b/src/core/server/server.h#L150-L157. +// This is required, as RegisteredCallAllocation is not part of the exposed C API. +typedef struct { + void *tag; + grpc_call **call; + grpc_metadata_array *initial_metadata; + gpr_timespec *deadline; + grpc_byte_buffer **optional_payload; + grpc_completion_queue *cq; +} kgrpc_registered_call_allocation; + +typedef struct { + void* tag; + grpc_call** call; + grpc_metadata_array* initial_metadata; + grpc_call_details* details; + grpc_completion_queue* cq; +} kgrpc_batch_call_allocation; + +typedef kgrpc_registered_call_allocation (*kgrpc_registered_call_allocator)(void* ctx); +typedef kgrpc_batch_call_allocation (*kgrpc_batch_call_allocator)(void* ctx); + +/* +* Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped. +*/ +bool kgrpc_iomgr_run_in_background(); + +/** + * Registers a C-style allocator callback for accepting gRPC calls to a specific method. + * + * Wraps the internal C++ API `Server::SetRegisteredMethodAllocator()` to enable + * callback-driven method dispatch via the Core C API. + * + * When the gRPC Core needs to accept a new call for the specified method, it invokes: + * kgrpc_registered_call_allocation alloc = allocator(); + * to retrieve the accept context, including `tag`, `grpc_call*`, metadata, deadline, + * optional payload, and the completion queue. + * + * This omits the need of handling pre-registering requests using `grpc_server_request_registered_call`. + * + * @param server The gRPC C `grpc_server*` instance. + * @param cq A callback-style `grpc_completion_queue*` (must be registered earlier). + * @param method_tag Opaque identifier from `grpc_server_register_method()` for the RPC method. + * @param allocator_ctx The context for the callback to pass all necessary objects to the static function. + * @param allocator Function providing new accept contexts (`kgrpc_registered_call_allocation`). */ - bool kgrpc_iomgr_run_in_background(); +void kgrpc_server_set_register_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *method_tag, + void *allocator_ctx, + kgrpc_registered_call_allocator allocator +); + +void kgrpc_server_set_batch_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *allocator_ctx, + kgrpc_batch_call_allocator allocator +); + #ifdef __cplusplus - } +} #endif #endif //GRPCPP_C_H diff --git a/cinterop-c/src/kgrpc.cpp b/cinterop-c/src/kgrpc.cpp index e2be28299..a1a979f37 100644 --- a/cinterop-c/src/kgrpc.cpp +++ b/cinterop-c/src/kgrpc.cpp @@ -2,12 +2,56 @@ #include #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/server/server.h" extern "C" { - bool kgrpc_iomgr_run_in_background() { - return grpc_iomgr_run_in_background(); - } +bool kgrpc_iomgr_run_in_background() { + return grpc_iomgr_run_in_background(); +} + +void kgrpc_server_set_register_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *method_tag, + void *allocator_ctx, + kgrpc_registered_call_allocator allocator +) { + grpc_core::Server::FromC(server)->SetRegisteredMethodAllocator( + cq, + method_tag, + [allocator_ctx, allocator] { + auto result = allocator(allocator_ctx); + return grpc_core::Server::RegisteredCallAllocation{ + .tag = result.tag, + .call = result.call, + .initial_metadata = result.initial_metadata, + .deadline = result.deadline, + .optional_payload = result.optional_payload, + .cq = result.cq, + }; + }); +} + +void kgrpc_server_set_batch_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *allocator_ctx, + kgrpc_batch_call_allocator allocator +) { + grpc_core::Server::FromC(server)->SetBatchMethodAllocator( + cq, + [allocator_ctx, allocator] { + auto result = allocator(allocator_ctx); + return grpc_core::Server::BatchCallAllocation{ + .tag = result.tag, + .call = result.call, + .initial_metadata = result.initial_metadata, + .details = result.details, + .cq = result.cq, + }; + }); +} } diff --git a/cinterop-c/tools/collect_headers.bzl b/cinterop-c/tools/collect_headers.bzl index 46c3dbaef..cda0fe067 100644 --- a/cinterop-c/tools/collect_headers.bzl +++ b/cinterop-c/tools/collect_headers.bzl @@ -84,12 +84,8 @@ include_dir = rule( def _cc_headers_only_impl(ctx): dep_cc = ctx.attr.dep[CcInfo].compilation_context - - # keep only source headers; this skips generated headers and their actions. - all_hdrs = dep_cc.headers.to_list() - src_hdrs = [f for f in all_hdrs if getattr(f, "is_source", False)] cc_ctx = cc_common.create_compilation_context( - headers = depset(src_hdrs), + headers = dep_cc.headers, includes = dep_cc.includes, quote_includes = dep_cc.quote_includes, system_includes = dep_cc.system_includes, diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 399b9c5ac..5135cd769 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -8,8 +8,20 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout -import kotlinx.rpc.grpc.* -import kotlinx.rpc.grpc.internal.* +import kotlinx.rpc.grpc.GrpcServer +import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.ManagedChannel +import kotlinx.rpc.grpc.ManagedChannelBuilder +import kotlinx.rpc.grpc.Status +import kotlinx.rpc.grpc.StatusCode +import kotlinx.rpc.grpc.buildChannel +import kotlinx.rpc.grpc.internal.ClientCall +import kotlinx.rpc.grpc.internal.GrpcDefaultCallOptions +import kotlinx.rpc.grpc.internal.MethodDescriptor +import kotlinx.rpc.grpc.internal.MethodType +import kotlinx.rpc.grpc.internal.clientCallListener +import kotlinx.rpc.grpc.internal.methodDescriptor +import kotlinx.rpc.grpc.statusCode import kotlinx.rpc.registerService import kotlin.test.Test import kotlin.test.assertEquals @@ -57,7 +69,7 @@ class GrpcCoreClientTest { } @Test - fun normalUnaryCall_ok() = repeat(1000) { + fun normalUnaryCall_ok() = repeat(10000) { val channel = createChannel() val call = channel.newHelloCall() val req = helloReq() diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt index 8d7ee951d..63e9ac47d 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt @@ -4,6 +4,9 @@ package kotlinx.rpc.grpc +import kotlinx.rpc.grpc.internal.GrpcServerCredentials +import kotlinx.rpc.grpc.internal.NativeServer + /** * Platform-specific gRPC server builder. */ @@ -11,12 +14,43 @@ public actual abstract class ServerBuilder> { public actual abstract fun addService(service: ServerServiceDefinition): T public actual abstract fun fallbackHandlerRegistry(registry: HandlerRegistry?): T + + public abstract fun build(): Server +} + +private class NativeServerBuilder( + val port: Int, +) : ServerBuilder() { + + // TODO: Add actual credentials + private val credentials = GrpcServerCredentials.createInsecure() + private val services = mutableListOf() + + override fun addService(service: ServerServiceDefinition): NativeServerBuilder { + services.add(service) + return this + } + + override fun fallbackHandlerRegistry(registry: HandlerRegistry?): NativeServerBuilder { + TODO("Not yet implemented") + } + + override fun build(): Server { + val server = NativeServer(port, credentials) + + for (service in services) { + server.addService(service) + } + + return server + } + } internal actual fun ServerBuilder(port: Int): ServerBuilder<*> { - error("Native target is not supported in gRPC") + return NativeServerBuilder(port) } internal actual fun Server(builder: ServerBuilder<*>): Server { - error("Native target is not supported in gRPC") + return builder.build() } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt new file mode 100644 index 000000000..9c6bfa662 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt @@ -0,0 +1,75 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_call +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import libkgrpc.grpc_op +import libkgrpc.grpc_status_code + +internal abstract class AbstractNativeCall( + val raw: CPointer, + val cq: CompletionQueue, +) { + + protected abstract val isClosed: Boolean + + protected abstract fun beginOp() + protected abstract fun endOp() + protected abstract fun cancelInternal(statusCode: grpc_status_code, message: String) + + /** + * Submits a batch operation to the [CompletionQueue] and handle the returned [BatchResult]. + * If the batch was successfully submitted, [onSuccess] is called. + * In any case, [cleanup] is called. + */ + internal fun runBatch( + ops: CPointer, + nOps: ULong, + cleanup: () -> Unit = {}, + onSuccess: () -> Unit = {}, + ) { + // we must not try to run a batch after the call is closed. + if (isClosed) return cleanup() + + // pre-book the batch, so onClose cannot be called before the batch finished. + beginOp() + + when (val callResult = cq.runBatch(this@AbstractNativeCall.raw, ops, nOps)) { + is BatchResult.Submitted -> { + callResult.future.onComplete { success -> + try { + if (success) { + onSuccess() + } + } finally { + // ignore failure, as it is reflected in the client status op + cleanup() + endOp() + } + } + } + + BatchResult.CQShutdown -> { + cleanup() + endOp() + cancelInternal(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Channel shutdown") + } + + is BatchResult.SubmitError -> { + cleanup() + endOp() + cancelInternal( + grpc_status_code.GRPC_STATUS_INTERNAL, + "Batch could not be submitted: ${callResult.error}" + ) + } + } + } + +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt index 8d38a22db..6780dac25 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt @@ -6,6 +6,7 @@ package kotlinx.rpc.grpc.internal +import cnames.structs.grpc_call import kotlinx.atomicfu.atomic import kotlinx.atomicfu.locks.SynchronizedObject import kotlinx.atomicfu.locks.synchronized @@ -117,7 +118,7 @@ internal class CompletionQueue { * Submits a batch operation to the queue. * See [BatchResult] for possible outcomes. */ - fun runBatch(call: NativeClientCall<*, *>, ops: CPointer, nOps: ULong): BatchResult { + fun runBatch(call: CPointer, ops: CPointer, nOps: ULong): BatchResult { val completion = CallbackFuture() val tag = newCbTag(completion, OPS_COMPLETE_CB) @@ -138,7 +139,7 @@ internal class CompletionQueue { return BatchResult.CQShutdown } - err = grpc_call_start_batch(call.raw, ops, nOps, tag, null) + err = grpc_call_start_batch(call, ops, nOps, tag, null) } if (err != grpc_call_error.GRPC_CALL_OK) { @@ -216,4 +217,28 @@ private fun newCbTag( private fun deleteCbTag(tag: CPointer) { tag.pointed.user_data!!.asStableRef().dispose() nativeHeap.free(tag) -} \ No newline at end of file +} + +internal interface CallbackTag { + fun run(ok: Boolean) + + fun toCbTag(): CPointer { + return newCbTag(this, staticCFunction { functor, ok -> + val tag = functor!!.reinterpret() + val callbackTag = tag.pointed.user_data!!.asStableRef().get() + deleteCbTag(tag) + callbackTag.run(ok != 0) + }) + } + + companion object { + fun anonymous(run: (ok: Boolean) -> Unit): CPointer { + return object : CallbackTag { + override fun run(ok: Boolean) { + run(ok) + } + }.toCbTag() + } + } +} + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt index d77bd9fbc..69c637b32 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt @@ -8,7 +8,19 @@ package kotlinx.rpc.grpc.internal import cnames.structs.grpc_call import kotlinx.atomicfu.atomic -import kotlinx.cinterop.* +import kotlinx.cinterop.Arena +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert +import kotlinx.cinterop.get +import kotlinx.cinterop.ptr +import kotlinx.cinterop.readValue +import kotlinx.cinterop.toKString +import kotlinx.cinterop.value import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableJob import kotlinx.rpc.grpc.GrpcTrailers @@ -16,7 +28,25 @@ import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode import kotlinx.rpc.protobuf.input.stream.asInputStream import kotlinx.rpc.protobuf.input.stream.asSource -import libkgrpc.* +import libkgrpc.GRPC_OP_RECV_INITIAL_METADATA +import libkgrpc.GRPC_OP_RECV_MESSAGE +import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT +import libkgrpc.GRPC_OP_SEND_CLOSE_FROM_CLIENT +import libkgrpc.GRPC_OP_SEND_INITIAL_METADATA +import libkgrpc.GRPC_OP_SEND_MESSAGE +import libkgrpc.gpr_free +import libkgrpc.grpc_byte_buffer +import libkgrpc.grpc_byte_buffer_destroy +import libkgrpc.grpc_call_cancel_with_status +import libkgrpc.grpc_call_error +import libkgrpc.grpc_call_unref +import libkgrpc.grpc_metadata_array +import libkgrpc.grpc_metadata_array_destroy +import libkgrpc.grpc_metadata_array_init +import libkgrpc.grpc_op +import libkgrpc.grpc_slice +import libkgrpc.grpc_slice_unref +import libkgrpc.grpc_status_code import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner @@ -163,7 +193,7 @@ internal class NativeClientCall( // pre-book the batch, so onClose cannot be called before the batch finished. beginOp() - when (val callResult = cq.runBatch(this@NativeClientCall, ops, nOps)) { + when (val callResult = cq.runBatch(this@NativeClientCall.raw, ops, nOps)) { is BatchResult.Submitted -> { callResult.future.onComplete { success -> try { @@ -220,7 +250,7 @@ internal class NativeClientCall( data.recv_status_on_client.trailing_metadata = null } - when (val callResult = cq.runBatch(this@NativeClientCall, op.ptr, 1u)) { + when (val callResult = cq.runBatch(this@NativeClientCall.raw, op.ptr, 1u)) { is BatchResult.Submitted -> { callResult.future.onComplete { val details = statusDetails.toByteArray().toKString() diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt index d8fbb9e98..801a6d38c 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt @@ -83,7 +83,8 @@ internal class NativeManagedChannel( override val platformApi: ManagedChannelPlatform = this private var isShutdownInternal = atomic(false) - override val isShutdown: Boolean = isShutdownInternal.value + override val isShutdown: Boolean + get() = isShutdownInternal.value private val isTerminatedInternal = CompletableDeferred() override val isTerminated: Boolean get() = isTerminatedInternal.isCompleted diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt index a03970791..18fc2114f 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -8,37 +8,59 @@ package kotlinx.rpc.grpc.internal import cnames.structs.grpc_server import cnames.structs.grpc_server_credentials +import kotlinx.cinterop.COpaquePointer import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CValue import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.StableRef +import kotlinx.cinterop.asStableRef +import kotlinx.cinterop.staticCFunction +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.Server +import kotlinx.rpc.grpc.ServerServiceDefinition import libkgrpc.grpc_insecure_server_credentials_create import libkgrpc.grpc_server_add_http2_port +import libkgrpc.grpc_server_cancel_all_calls import libkgrpc.grpc_server_create import libkgrpc.grpc_server_credentials_release import libkgrpc.grpc_server_destroy import libkgrpc.grpc_server_register_completion_queue +import libkgrpc.grpc_server_register_method +import libkgrpc.grpc_server_register_method_payload_handling +import libkgrpc.grpc_server_shutdown_and_notify import libkgrpc.grpc_server_start +import libkgrpc.kgrpc_batch_call_allocation +import libkgrpc.kgrpc_registered_call_allocation +import libkgrpc.kgrpc_server_set_batch_method_allocator +import libkgrpc.kgrpc_server_set_register_method_allocator import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner import kotlin.time.Duration /** - * Wrapper for [cnames.structs.grpc_server_credentials]. + * Wrapper for [grpc_server_credentials]. */ -internal sealed class GrpcServerCredentials( +internal class GrpcServerCredentials( internal val raw: CPointer, ) { val rawCleaner = createCleaner(raw) { grpc_server_credentials_release(it) } + + companion object { + fun createInsecure(): GrpcServerCredentials = GrpcServerCredentials( + grpc_insecure_server_credentials_create() ?: error("Failed to create server credentials") + ) + } } -/** - * Insecure credentials. - */ -internal class GrpcInsecureServerCredentials() : - GrpcServerCredentials(grpc_insecure_server_credentials_create() ?: error("Failed to create server credentials")) +internal typealias MethodTag = COpaquePointer +internal data class RegisteredMethod( + val methodDescriptor: ServerMethodDefinition<*, *>, + val tag: MethodTag, +) internal class NativeServer( override val port: Int, @@ -59,33 +81,143 @@ internal class NativeServer( grpc_server_destroy(it) } + // holds all stable references to MethodAllocationCtx objects. + // the stable references must eventually be disposed. + private val methodAllocationCtxs = mutableSetOf>() + + @Suppress("unused") + private val methodAllocationCtxCleaner = createCleaner(methodAllocationCtxs) { refs -> + refs.forEach { it.dispose() } + } + init { grpc_server_register_completion_queue(raw, cq.raw, null) - grpc_server_add_http2_port(raw, "0.0.0.0:$port", credentials.raw) + grpc_server_add_http2_port(raw, "localhost:$port", credentials.raw) + addUnknownService() } + private var started = false + private var isShutdownInternal = false override val isShutdown: Boolean - get() { - TODO() - } + get() = isShutdownInternal + + private val isTerminatedInternal = CompletableDeferred() override val isTerminated: Boolean - get() { - TODO() - } + get() = isTerminatedInternal.isCompleted override fun start(): Server { + check(!started) { internalError("Server already started") } + started = true grpc_server_start(raw) + return this + } + + fun addService(service: ServerServiceDefinition) { + check(!started) { internalError("Server already started") } + + service.getMethods().forEach { + val desc = it.getMethodDescriptor() + // to construct a valid HTTP/2 path, we must prepend the name with a slash. + // the user does not do this to align it with the java implementation. + val name = "/" + desc.getFullMethodName() + // TODO: don't hardcode localhost + val tag = grpc_server_register_method( + server = raw, + method = name, + host = "localhost:$port", + payload_handling = grpc_server_register_method_payload_handling.GRPC_SRM_PAYLOAD_NONE, + flags = 0u + ) ?: error("Failed to register method: $name") + + val ctx = StableRef.create( + RegisteredMethodAllocationCtx( + server = this, + method = it, + cq = cq, + ) + ) + methodAllocationCtxs.add(ctx) + + kgrpc_server_set_register_method_allocator( + server = raw, + cq = cq.raw, + method_tag = tag, + allocator_ctx = ctx.asCPointer(), + allocator = staticCFunction(::methodAllocationCallback) + ) + } } + private fun addUnknownService() { + val ctx = StableRef.create( + MethodAllocationCtx( + server = this, + cq = cq, + ) + ) + methodAllocationCtxs.add(ctx) + + kgrpc_server_set_batch_method_allocator( + server = raw, + cq = cq.raw, + allocator_ctx = ctx.asCPointer(), + allocator = staticCFunction(::batchMethodAllocationCallback) + ) + } + + override fun shutdown(): Server { - TODO("Not yet implemented") + if (isShutdownInternal) { + return this + } + isShutdownInternal = true + + grpc_server_shutdown_and_notify(raw, cq.raw, CallbackTag.anonymous { + cq.shutdown().onComplete { + methodAllocationCtxs.forEach { it.dispose() } + isTerminatedInternal.complete(Unit) + } + }) + return this } override fun shutdownNow(): Server { - TODO("Not yet implemented") + shutdown() + grpc_server_cancel_all_calls(raw) + return this } override suspend fun awaitTermination(duration: Duration): Server { - TODO("Not yet implemented") + withTimeoutOrNull(duration) { + isTerminatedInternal.await() + } + return this } -} \ No newline at end of file +} + +@CName("kgrpc_method_allocation_callback") +private fun methodAllocationCallback(ctx: COpaquePointer?): CValue { + val ctx = ctx!!.asStableRef().get() + val request = ServerCallbackRequest(ctx.server, ctx.method, ctx.cq) + return request.toRaw() +} + +@CName("kgrpc_method_allocation_callback") +private fun batchMethodAllocationCallback(ctx: COpaquePointer?): CValue { + val ctx = ctx!!.asStableRef().get() + val request = BatchedCallbackRequest(ctx.server, ctx.cq) + return request.toRaw() +} + +private open class MethodAllocationCtx( + val server: NativeServer, + val cq: CompletionQueue, +) + +private class RegisteredMethodAllocationCtx( + server: NativeServer, + cq: CompletionQueue, + val method: ServerMethodDefinition<*, *>, +) : MethodAllocationCtx(server, cq) + + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt new file mode 100644 index 000000000..ed6a137b8 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -0,0 +1,308 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_call +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.ReentrantLock +import kotlinx.atomicfu.locks.withLock +import kotlinx.cinterop.Arena +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.IntVar +import kotlinx.cinterop.alloc +import kotlinx.cinterop.ptr +import kotlinx.cinterop.readValue +import kotlinx.cinterop.value +import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.Status +import kotlinx.rpc.protobuf.input.stream.asInputStream +import kotlinx.rpc.protobuf.input.stream.asSource +import libkgrpc.GRPC_OP_RECV_CLOSE_ON_SERVER +import libkgrpc.GRPC_OP_RECV_MESSAGE +import libkgrpc.GRPC_OP_SEND_INITIAL_METADATA +import libkgrpc.GRPC_OP_SEND_MESSAGE +import libkgrpc.GRPC_OP_SEND_STATUS_FROM_SERVER +import libkgrpc.grpc_byte_buffer +import libkgrpc.grpc_byte_buffer_destroy +import libkgrpc.grpc_call_cancel_with_status +import libkgrpc.grpc_call_ref +import libkgrpc.grpc_call_unref +import libkgrpc.grpc_op +import libkgrpc.grpc_slice +import libkgrpc.grpc_slice_unref +import libkgrpc.grpc_status_code +import kotlin.concurrent.Volatile +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner + +internal class NativeServerCall( + val raw: CPointer, + val request: ServerCallbackRequest, + val methodDescriptor: MethodDescriptor, +) : ServerCall() { + + private val cq = request.cq + + @Suppress("unused") + private val rawCleaner = createCleaner(raw) { + grpc_call_unref(it) + } + + private var listener = DeferredCallListener() + private var callbackMutex = ReentrantLock() + private var initialized = false + private var cancelled = false + private var finalized = atomic(false) + + // we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192) + private val ready = atomic(true) + + init { + // take ownership of the raw pointer + grpc_call_ref(raw) + initialize() + } + + fun setListener(listener: Listener) { + this.listener.setDelegate(listener) + } + + private fun initialize() { + // finishes if the whole connection is closed. + // this triggers onClose()/onCanceled() callback. + val arena = Arena() + val cancelled = arena.alloc() + val op = arena.alloc { + op = GRPC_OP_RECV_CLOSE_ON_SERVER + data.recv_close_on_server.cancelled = cancelled.ptr + } + val result = cq.runBatch(raw, op.ptr, 1u) + if (result !is BatchResult.Submitted) { + // we couldn't submit the initialization batch, so nothing can be done. + finalize(true) + } else { + initialized = true + result.future.onComplete { + finalize(cancelled.value == 1) + } + } + } + + /** + * Called when the call is closed (both by the client and the server). + */ + private fun finalize(cancelled: Boolean) { + if (finalized.compareAndSet(expect = false, update = true)) { + request.dispose() + if (cancelled) { + this.cancelled = true + callbackMutex.withLock { + listener.onCancel() + } + } else { + callbackMutex.withLock { + listener.onComplete() + } + } + } + } + + private fun cancelCall(status: grpc_status_code, message: String) { + grpc_call_cancel_with_status(raw, status, message, null) + } + + /** + * Sets the [ready] flag to true and calls the listener's onReady callback. + * This is called as soon as the RECV_MESSAGE batch is finished (or failed). + */ + private fun turnReady() { + callbackMutex.withLock { + if (ready.compareAndSet(expect = false, update = true)) { + listener.onReady() + } + } + } + + override fun isReady(): Boolean { + return ready.value + } + + private fun runBatch( + ops: CPointer, + nOps: ULong, + cleanup: () -> Unit = {}, + onSuccess: () -> Unit = {}, + ) { + when (val result = cq.runBatch(raw, ops, nOps)) { + is BatchResult.Submitted -> { + result.future.onComplete { + try { + onSuccess() + } catch (e: Throwable) { + cancelCall(grpc_status_code.GRPC_STATUS_INTERNAL, e.message ?: "Unknown error") + } finally { + cleanup() + } + } + } + + BatchResult.CQShutdown -> { + cleanup() + cancelCall(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Server shutdown") + } + + is BatchResult.SubmitError -> { + cleanup() + cancelCall( + grpc_status_code.GRPC_STATUS_INTERNAL, + "Batch could not be submitted: ${result.error}" + ) + } + } + } + + override fun request(numMessages: Int) { + check(initialized) { internalError("Call not initialized") } + // TODO: Remove the num constraint + require(numMessages == 1) { internalError("numMessages must be 1") } + + val arena = Arena() + val recvPtr = arena.alloc>() + val op = arena.alloc { + op = GRPC_OP_RECV_MESSAGE + data.recv_message.recv_message = recvPtr.ptr + } + + runBatch(op.ptr, 1u, cleanup = { arena.clear() }) { + // if the call was successful, but no message was received, we reached the end-of-stream. + val buf = recvPtr.value + if (buf == null) { + callbackMutex.withLock { + listener.onHalfClose() + } + } else { + val msg = methodDescriptor.getRequestMarshaller() + .parse(buf.toKotlin().asInputStream()) + callbackMutex.withLock { + listener.onMessage(msg) + } + } + } + } + + override fun sendHeaders(headers: GrpcTrailers) { + check(initialized) { internalError("Call not initialized") } + val arena = Arena() + val op = arena.alloc { + op = GRPC_OP_SEND_INITIAL_METADATA + data.send_initial_metadata.count = 0u + data.send_initial_metadata.metadata = null + } + + runBatch(op.ptr, 1u, cleanup = { arena.clear() }) { + // nothing to do here + } + } + + override fun sendMessage(message: Response) { + check(initialized) { internalError("Call not initialized") } + check(isReady()) { internalError("Not yet ready.") } + val arena = Arena() + val inputStream = methodDescriptor.getResponseMarshaller().stream(message) + val byteBuffer = inputStream.asSource().toGrpcByteBuffer() + ready.value = false + + val op = arena.alloc { + op = GRPC_OP_SEND_MESSAGE + data.send_message.send_message = byteBuffer + } + + runBatch(op.ptr, 1u, cleanup = { + arena.clear() + grpc_byte_buffer_destroy(byteBuffer) + }) { + turnReady() + } + } + + override fun close(status: Status, trailers: GrpcTrailers) { + check(initialized) { internalError("Call not initialized") } + val arena = Arena() + + val details = status.getDescription()?.let { + arena.alloc { + it.toGrpcSlice() + } + } + val op = arena.alloc { + op = GRPC_OP_SEND_STATUS_FROM_SERVER + data.send_status_from_server.status = status.statusCode.toRaw() + data.send_status_from_server.status_details = details?.ptr + data.send_status_from_server.trailing_metadata_count = 0u + data.send_status_from_server.trailing_metadata = null + } + + runBatch(op.ptr, 1u, cleanup = { + if (details != null) grpc_slice_unref(details.readValue()) + arena.clear() + }) { + // nothing to do here + } + } + + override fun isCancelled(): Boolean { + return cancelled + } + + override fun getMethodDescriptor(): MethodDescriptor { + return methodDescriptor + } +} + +private class DeferredCallListener : ServerCall.Listener() { + @Volatile + private var delegate: ServerCall.Listener? = null + private val mutex = ReentrantLock() + private val q = ArrayDeque<(ServerCall.Listener) -> Unit>() + + fun setDelegate(d: ServerCall.Listener) { + println("setting delegate...") + mutex.withLock { + if (delegate != null) return + delegate = d + } + // drain the queue + q.forEach { it(d) } + q.clear() + } + + private inline fun deliver(crossinline f: (ServerCall.Listener) -> Unit) { + val d = delegate + if (d != null) { + // fast path (delegate is already set) + f(d); return + } + println("delivering to queue...") + // slow path: re-check under lock + val dd = mutex.withLock { + val cur = delegate + if (cur == null) { + q.addLast { f(it) } + null + } else cur + } + // if the delegate was already set, call it + if (dd != null) f(dd) + } + + override fun onMessage(message: T) = deliver { it.onMessage(message) } + override fun onHalfClose() = deliver { it.onHalfClose() } + override fun onCancel() = deliver { it.onCancel() } + override fun onComplete() = deliver { it.onComplete() } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt new file mode 100644 index 000000000..1f072d545 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt @@ -0,0 +1,115 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_call +import kotlinx.cinterop.Arena +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.alloc +import kotlinx.cinterop.cValue +import kotlinx.cinterop.ptr +import kotlinx.cinterop.value +import kotlinx.rpc.grpc.GrpcTrailers +import libkgrpc.gpr_timespec +import libkgrpc.grpc_call_details +import libkgrpc.grpc_call_details_destroy +import libkgrpc.grpc_metadata_array +import libkgrpc.grpc_metadata_array_destroy +import libkgrpc.kgrpc_batch_call_allocation +import libkgrpc.kgrpc_registered_call_allocation +import kotlin.experimental.ExperimentalNativeApi + +internal class ServerCallbackRequest( + val server: NativeServer, + val method: ServerMethodDefinition, + val cq: CompletionQueue, +) : CallbackTag { + val arena = Arena() + val rawCall = arena.alloc>() + val rawDeadline = arena.alloc() + val rawRequestMetadata = arena.alloc() + + // the run() method disposes the callback request. + // so this is a self-disposing mechanism. + fun dispose() { + grpc_metadata_array_destroy(rawRequestMetadata.ptr) + arena.clear() + } + + fun toRaw(): CValue { + return cValue { + tag = toCbTag() + call = rawCall.ptr + initial_metadata = rawRequestMetadata.ptr + deadline = rawDeadline.ptr + cq = this@ServerCallbackRequest.cq.raw + // we are currently not optimizing the initial client payload + // for unary and server streaming (payload_handling is always GRPC_SRM_PAYLOAD_NONE) + optional_payload = null + } + } + + override fun run(ok: Boolean) { + if (!ok) { + // the call has been shutdown. + // free up the request. + dispose() + return + } + + // TODO: The NativeServerCall must call dispose() ones the call is completed. + // create a NativeServerCall to control the underlying core call. + // ownership of the core call is transferred to the NativeServerCall. + val call = NativeServerCall(rawCall.value!!, this, method.getMethodDescriptor()) + // TODO: Implement trailers. + val trailers = GrpcTrailers() + // start the actual call. + val listener = method.getServerCallHandler().startCall(call, trailers) + call.setListener(listener) + } +} + +internal class BatchedCallbackRequest( + val server: NativeServer, + val cq: CompletionQueue, +) : CallbackTag { + val arena = Arena() + val rawCall = arena.alloc>() + val rawDeadline = arena.alloc() + val rawRequestMetadata = arena.alloc() + val rawDetails = arena.alloc() + + // the run() method disposes the callback request. + // so this is a self-disposing mechanism. + fun dispose() { + grpc_metadata_array_destroy(rawRequestMetadata.ptr) + grpc_call_details_destroy(rawDetails.ptr) + arena.clear() + } + + fun toRaw(): CValue { + return cValue { + tag = toCbTag() + call = rawCall.ptr + initial_metadata = rawRequestMetadata.ptr + details = rawDetails.ptr + cq = this@BatchedCallbackRequest.cq.raw + } + } + + override fun run(ok: Boolean) { + val host = rawDetails.host.toByteArray().decodeToString() + val method = rawDetails.method.toByteArray().decodeToString() + println("Got unknown callback request trigger.") + println("Host: $host") + println("Method: $method") + dispose() + } +} + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt index 69bc3b1ee..76dc3ee58 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt @@ -6,11 +6,41 @@ package kotlinx.rpc.grpc.internal -import kotlinx.cinterop.* -import kotlinx.io.* +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert +import kotlinx.cinterop.memScoped +import kotlinx.cinterop.plus +import kotlinx.cinterop.ptr +import kotlinx.cinterop.readValue +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.sizeOf +import kotlinx.cinterop.useContents +import kotlinx.cinterop.usePinned +import kotlinx.io.Buffer +import kotlinx.io.InternalIoApi +import kotlinx.io.Sink +import kotlinx.io.Source +import kotlinx.io.UnsafeIoApi import kotlinx.io.unsafe.UnsafeBufferOperations import kotlinx.rpc.grpc.StatusCode -import libkgrpc.* +import libkgrpc.grpc_byte_buffer +import libkgrpc.grpc_byte_buffer_reader +import libkgrpc.grpc_byte_buffer_reader_destroy +import libkgrpc.grpc_byte_buffer_reader_init +import libkgrpc.grpc_byte_buffer_reader_next +import libkgrpc.grpc_raw_byte_buffer_create +import libkgrpc.grpc_slice +import libkgrpc.grpc_slice_from_copied_buffer +import libkgrpc.grpc_slice_from_copied_string +import libkgrpc.grpc_slice_malloc +import libkgrpc.grpc_slice_unref +import libkgrpc.grpc_status_code import platform.posix.memcpy internal fun internalError(message: String) { @@ -44,6 +74,7 @@ internal fun grpc_slice.toByteArray(): ByteArray = memScoped { return out } + internal fun CPointer.toKotlin(): Buffer = memScoped { val reader = alloc() check(grpc_byte_buffer_reader_init(reader.ptr, this@toKotlin) == 1) @@ -159,4 +190,24 @@ internal fun grpc_status_code.toKotlin(): StatusCode = when (this) { grpc_status_code.GRPC_STATUS_DATA_LOSS -> StatusCode.DATA_LOSS grpc_status_code.GRPC_STATUS_UNAUTHENTICATED -> StatusCode.UNAUTHENTICATED grpc_status_code.GRPC_STATUS__DO_NOT_USE -> error("Invalid status code: ${this.ordinal}") +} + +internal fun StatusCode.toRaw(): grpc_status_code = when (this) { + StatusCode.OK -> grpc_status_code.GRPC_STATUS_OK + StatusCode.CANCELLED -> grpc_status_code.GRPC_STATUS_CANCELLED + StatusCode.UNKNOWN -> grpc_status_code.GRPC_STATUS_UNKNOWN + StatusCode.INVALID_ARGUMENT -> grpc_status_code.GRPC_STATUS_INVALID_ARGUMENT + StatusCode.DEADLINE_EXCEEDED -> grpc_status_code.GRPC_STATUS_DEADLINE_EXCEEDED + StatusCode.NOT_FOUND -> grpc_status_code.GRPC_STATUS_NOT_FOUND + StatusCode.ALREADY_EXISTS -> grpc_status_code.GRPC_STATUS_ALREADY_EXISTS + StatusCode.PERMISSION_DENIED -> grpc_status_code.GRPC_STATUS_PERMISSION_DENIED + StatusCode.RESOURCE_EXHAUSTED -> grpc_status_code.GRPC_STATUS_RESOURCE_EXHAUSTED + StatusCode.FAILED_PRECONDITION -> grpc_status_code.GRPC_STATUS_FAILED_PRECONDITION + StatusCode.ABORTED -> grpc_status_code.GRPC_STATUS_ABORTED + StatusCode.OUT_OF_RANGE -> grpc_status_code.GRPC_STATUS_OUT_OF_RANGE + StatusCode.UNIMPLEMENTED -> grpc_status_code.GRPC_STATUS_UNIMPLEMENTED + StatusCode.INTERNAL -> grpc_status_code.GRPC_STATUS_INTERNAL + StatusCode.UNAVAILABLE -> grpc_status_code.GRPC_STATUS_UNAVAILABLE + StatusCode.DATA_LOSS -> grpc_status_code.GRPC_STATUS_DATA_LOSS + StatusCode.UNAUTHENTICATED -> grpc_status_code.GRPC_STATUS_UNAUTHENTICATED } \ No newline at end of file From f5831129d410f1e4e8af43a9ea777cb28b25edc5 Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Thu, 4 Sep 2025 18:18:32 +0200 Subject: [PATCH 03/10] grpc-native: Fixing memory leaks Signed-off-by: Johannes Zottele --- grpc/grpc-core/build.gradle.kts | 11 ++++++ .../kotlinx/rpc/grpc/test/CoreClientTest.kt | 38 +++++++++++------- .../kotlinx/rpc/grpc/internal/NativeServer.kt | 39 ++++++++----------- .../rpc/grpc/internal/NativeServerCall.kt | 23 ++++++----- .../grpc/internal/ServerCallbackRequest.kt | 31 ++++++++------- 5 files changed, 81 insertions(+), 61 deletions(-) diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts index c71250119..6acfe6c3b 100644 --- a/grpc/grpc-core/build.gradle.kts +++ b/grpc/grpc-core/build.gradle.kts @@ -100,6 +100,17 @@ kotlin { extraOpts("-libraryPath", "$cLibOutDir") } } + + targets.withType().configureEach { + binaries { + // Ensure test binaries are created for both debug and release + test( + buildTypes = listOf( + org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE + ) + ) + } + } } configureLocalProtocGenDevelopmentDependency() diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 5135cd769..612e7de55 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -5,8 +5,8 @@ package kotlinx.rpc.grpc.test import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout import kotlinx.rpc.grpc.GrpcServer import kotlinx.rpc.grpc.GrpcTrailers @@ -27,7 +27,6 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertFailsWith -import kotlin.time.Duration private const val PORT = 50051 @@ -271,20 +270,31 @@ class GreeterServiceImpl : GreeterService { * Run this on JVM before executing tests. */ @Test - fun runServer() = runTest(timeout = Duration.INFINITE) { - val server = GrpcServer( - port = PORT, - builder = { registerService { GreeterServiceImpl() } } - ) + fun runServer() { + runBlocking { + val server = GrpcServer( + port = PORT, + builder = { registerService { GreeterServiceImpl() } } + ) + + launch { + println("Terminating in 10 seconds") + delay(10000) + server.shutdown() + server.awaitTermination() + } - try { - server.start() - println("Server started") - server.awaitTermination() - } finally { - server.shutdown() - server.awaitTermination() + launch { + server.start() + println("Server started") + server.awaitTermination() + } } + +// runBlocking { +// println("Waiting, so GC is collecting stuff") +// delay(20000) +// } } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt index 18fc2114f..3b91ad925 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -8,6 +8,7 @@ package kotlinx.rpc.grpc.internal import cnames.structs.grpc_server import cnames.structs.grpc_server_credentials +import kotlinx.atomicfu.atomic import kotlinx.cinterop.COpaquePointer import kotlinx.cinterop.CPointer import kotlinx.cinterop.CValue @@ -55,15 +56,9 @@ internal class GrpcServerCredentials( } } -internal typealias MethodTag = COpaquePointer - -internal data class RegisteredMethod( - val methodDescriptor: ServerMethodDefinition<*, *>, - val tag: MethodTag, -) - internal class NativeServer( override val port: Int, + // we must reference them, otherwise the credentials are getting garbage collected @Suppress("Redundant") private val credentials: GrpcServerCredentials, ) : Server { @@ -76,20 +71,10 @@ internal class NativeServer( val raw: CPointer = grpc_server_create(null, null)!! - @Suppress("unused") - private val rawCleaner = createCleaner(raw) { - grpc_server_destroy(it) - } - // holds all stable references to MethodAllocationCtx objects. // the stable references must eventually be disposed. private val methodAllocationCtxs = mutableSetOf>() - @Suppress("unused") - private val methodAllocationCtxCleaner = createCleaner(methodAllocationCtxs) { refs -> - refs.forEach { it.dispose() } - } - init { grpc_server_register_completion_queue(raw, cq.raw, null) grpc_server_add_http2_port(raw, "localhost:$port", credentials.raw) @@ -97,9 +82,9 @@ internal class NativeServer( } private var started = false - private var isShutdownInternal = false + private var isShutdownInternal = atomic(false) override val isShutdown: Boolean - get() = isShutdownInternal + get() = isShutdownInternal.value private val isTerminatedInternal = CompletableDeferred() override val isTerminated: Boolean @@ -112,6 +97,14 @@ internal class NativeServer( return this } + private fun dispose() { + // disposed with completion of shutdown + grpc_server_destroy(raw) + methodAllocationCtxs.forEach { it.dispose() } + // release the grpc runtime, so grpc is shutdown if no other grpc servers are running. + rt.close() + } + fun addService(service: ServerServiceDefinition) { check(!started) { internalError("Server already started") } @@ -137,7 +130,7 @@ internal class NativeServer( ) ) methodAllocationCtxs.add(ctx) - + kgrpc_server_set_register_method_allocator( server = raw, cq = cq.raw, @@ -167,14 +160,14 @@ internal class NativeServer( override fun shutdown(): Server { - if (isShutdownInternal) { + if (!isShutdownInternal.compareAndSet(expect = false, update = true)) { + // shutdown only once return this } - isShutdownInternal = true grpc_server_shutdown_and_notify(raw, cq.raw, CallbackTag.anonymous { cq.shutdown().onComplete { - methodAllocationCtxs.forEach { it.dispose() } + dispose() isTerminatedInternal.complete(Unit) } }) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt index ed6a137b8..823d172c5 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -31,7 +31,6 @@ import libkgrpc.GRPC_OP_SEND_STATUS_FROM_SERVER import libkgrpc.grpc_byte_buffer import libkgrpc.grpc_byte_buffer_destroy import libkgrpc.grpc_call_cancel_with_status -import libkgrpc.grpc_call_ref import libkgrpc.grpc_call_unref import libkgrpc.grpc_op import libkgrpc.grpc_slice @@ -42,13 +41,12 @@ import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner internal class NativeServerCall( + // ownership is transferred to the call val raw: CPointer, - val request: ServerCallbackRequest, + val cq: CompletionQueue, val methodDescriptor: MethodDescriptor, ) : ServerCall() { - private val cq = request.cq - @Suppress("unused") private val rawCleaner = createCleaner(raw) { grpc_call_unref(it) @@ -64,8 +62,6 @@ internal class NativeServerCall( private val ready = atomic(true) init { - // take ownership of the raw pointer - grpc_call_ref(raw) initialize() } @@ -85,11 +81,14 @@ internal class NativeServerCall( val result = cq.runBatch(raw, op.ptr, 1u) if (result !is BatchResult.Submitted) { // we couldn't submit the initialization batch, so nothing can be done. + arena.clear() finalize(true) } else { initialized = true result.future.onComplete { - finalize(cancelled.value == 1) + val cancelled = cancelled.value == 1 + arena.clear() + finalize(cancelled) } } } @@ -99,7 +98,6 @@ internal class NativeServerCall( */ private fun finalize(cancelled: Boolean) { if (finalized.compareAndSet(expect = false, update = true)) { - request.dispose() if (cancelled) { this.cancelled = true callbackMutex.withLock { @@ -179,7 +177,9 @@ internal class NativeServerCall( data.recv_message.recv_message = recvPtr.ptr } - runBatch(op.ptr, 1u, cleanup = { arena.clear() }) { + runBatch(op.ptr, 1u, cleanup = { + arena.clear() + }) { // if the call was successful, but no message was received, we reached the end-of-stream. val buf = recvPtr.value if (buf == null) { @@ -189,6 +189,10 @@ internal class NativeServerCall( } else { val msg = methodDescriptor.getRequestMarshaller() .parse(buf.toKotlin().asInputStream()) + + // destroy the buffer, we don't need it anymore + grpc_byte_buffer_destroy(buf) + callbackMutex.withLock { listener.onMessage(msg) } @@ -272,7 +276,6 @@ private class DeferredCallListener : ServerCall.Listener() { private val q = ArrayDeque<(ServerCall.Listener) -> Unit>() fun setDelegate(d: ServerCall.Listener) { - println("setting delegate...") mutex.withLock { if (delegate != null) return delegate = d diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt index 1f072d545..ad7041c0b 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt @@ -56,22 +56,25 @@ internal class ServerCallbackRequest( } override fun run(ok: Boolean) { - if (!ok) { - // the call has been shutdown. - // free up the request. + try { + if (!ok) { + // the call has been shutdown.\ + return + } + + // create a NativeServerCall to control the underlying core call. + // ownership of the core call is transferred to the NativeServerCall. + val call = NativeServerCall(rawCall.value!!, cq, method.getMethodDescriptor()) + // TODO: Turn metadata into a kotlin GrpcTrailers. + val trailers = GrpcTrailers() + // start the actual call. + val listener = method.getServerCallHandler().startCall(call, trailers) + call.setListener(listener) + } finally { + // at this point, all return values have been transformed into kotlin ones, + // so we can safely clear all resources. dispose() - return } - - // TODO: The NativeServerCall must call dispose() ones the call is completed. - // create a NativeServerCall to control the underlying core call. - // ownership of the core call is transferred to the NativeServerCall. - val call = NativeServerCall(rawCall.value!!, this, method.getMethodDescriptor()) - // TODO: Implement trailers. - val trailers = GrpcTrailers() - // start the actual call. - val listener = method.getServerCallHandler().startCall(call, trailers) - call.setListener(listener) } } From 244c5bf305faa9fa44d049e669121129a26aae1d Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Thu, 4 Sep 2025 19:12:25 +0200 Subject: [PATCH 04/10] grpc-native: Test fix Signed-off-by: Johannes Zottele --- .../kotlinx/rpc/grpc/test/CoreClientTest.kt | 32 ++++++++----------- .../rpc/grpc/internal/NativeServerCall.kt | 1 + 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 612e7de55..0342524dd 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -5,7 +5,6 @@ package kotlinx.rpc.grpc.test import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout import kotlinx.rpc.grpc.GrpcServer @@ -62,8 +61,12 @@ class GrpcCoreClientTest { this.timeout = timeout } - private fun shutdownAndWait(channel: ManagedChannel) { - channel.shutdown() + private fun shutdownAndWait(channel: ManagedChannel, now: Boolean = false) { + if (now) { + channel.shutdownNow() + } else { + channel.shutdown() + } runBlocking { channel.awaitTermination() } } @@ -190,10 +193,10 @@ class GrpcCoreClientTest { @Test fun halfCloseBeforeSendingMessage_errorWithoutCrashing() { val channel = createChannel() - val call = channel.newHelloCall() - val statusDeferred = CompletableDeferred() +// val call = channel.newHelloCall() + val call = channel.newHelloCall(fullName = "helloworld.Greeter/SayHello") val listener = createClientCallListener( - onClose = { status, _ -> statusDeferred.complete(status) } + onClose = { status, _ -> println("Status: ${status.statusCode}, Message: ${status.getDescription()}") } ) assertFailsWith { try { @@ -277,24 +280,15 @@ class GreeterServiceImpl : GreeterService { builder = { registerService { GreeterServiceImpl() } } ) - launch { - println("Terminating in 10 seconds") - delay(10000) - server.shutdown() - server.awaitTermination() - } - - launch { + try { server.start() println("Server started") server.awaitTermination() + } finally { + server.shutdown() + server.awaitTermination() } } - -// runBlocking { -// println("Waiting, so GC is collecting stuff") -// delay(20000) -// } } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt index 823d172c5..72a8dcaa3 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -181,6 +181,7 @@ internal class NativeServerCall( arena.clear() }) { // if the call was successful, but no message was received, we reached the end-of-stream. + // and thus the client half-closed. val buf = recvPtr.value if (buf == null) { callbackMutex.withLock { From 07678ca42f6a9be672879edda4e26b966f8fe891 Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Thu, 4 Sep 2025 19:24:28 +0200 Subject: [PATCH 05/10] grpc-native: Fix halfClose before message received behavior Signed-off-by: Johannes Zottele --- .../kotlinx/rpc/grpc/test/CoreClientTest.kt | 3 +-- .../rpc/grpc/internal/NativeServerCall.kt | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 0342524dd..63a97252c 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -193,8 +193,7 @@ class GrpcCoreClientTest { @Test fun halfCloseBeforeSendingMessage_errorWithoutCrashing() { val channel = createChannel() -// val call = channel.newHelloCall() - val call = channel.newHelloCall(fullName = "helloworld.Greeter/SayHello") + val call = channel.newHelloCall() val listener = createClientCallListener( onClose = { status, _ -> println("Status: ${status.statusCode}, Message: ${status.getDescription()}") } ) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt index 72a8dcaa3..5a58957a6 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -58,6 +58,9 @@ internal class NativeServerCall( private var cancelled = false private var finalized = atomic(false) + // Tracks whether at least one request message has been received on this call. + private var receivedFirstMessage = false + // we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192) private val ready = atomic(true) @@ -184,8 +187,16 @@ internal class NativeServerCall( // and thus the client half-closed. val buf = recvPtr.value if (buf == null) { - callbackMutex.withLock { - listener.onHalfClose() + // end-of-stream observed. for UNARY, absence of any request is a protocol violation. + if (methodDescriptor.type == MethodType.UNARY && !receivedFirstMessage) { + cancelCall( + grpc_status_code.GRPC_STATUS_INTERNAL, + "Unary call half-closed before receiving a request message" + ) + } else { + callbackMutex.withLock { + listener.onHalfClose() + } } } else { val msg = methodDescriptor.getRequestMarshaller() @@ -194,6 +205,9 @@ internal class NativeServerCall( // destroy the buffer, we don't need it anymore grpc_byte_buffer_destroy(buf) + // Mark that we have received at least one request message + receivedFirstMessage = true + callbackMutex.withLock { listener.onMessage(msg) } From 8302181b946aba5c3b78ea9a01b5cb8b89c41857 Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Thu, 4 Sep 2025 20:00:22 +0200 Subject: [PATCH 06/10] grpc-native: Fix free on error Signed-off-by: Johannes Zottele --- .../kotlinx/rpc/grpc/internal/NativeServer.kt | 7 ++++-- .../rpc/grpc/internal/NativeServerCall.kt | 23 +++++++++---------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt index 3b91ad925..33a0e121e 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -56,6 +56,9 @@ internal class GrpcServerCredentials( } } +// TODO: don't hardcode the host +private val HOST = "0.0.0.0" + internal class NativeServer( override val port: Int, // we must reference them, otherwise the credentials are getting garbage collected @@ -77,7 +80,7 @@ internal class NativeServer( init { grpc_server_register_completion_queue(raw, cq.raw, null) - grpc_server_add_http2_port(raw, "localhost:$port", credentials.raw) + grpc_server_add_http2_port(raw, "$HOST:$port", credentials.raw) addUnknownService() } @@ -113,10 +116,10 @@ internal class NativeServer( // to construct a valid HTTP/2 path, we must prepend the name with a slash. // the user does not do this to align it with the java implementation. val name = "/" + desc.getFullMethodName() - // TODO: don't hardcode localhost val tag = grpc_server_register_method( server = raw, method = name, + // TODO: don't hardcode localhost host = "localhost:$port", payload_handling = grpc_server_register_method_payload_handling.GRPC_SRM_PAYLOAD_NONE, flags = 0u diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt index 5a58957a6..129a1b137 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -199,17 +199,17 @@ internal class NativeServerCall( } } } else { - val msg = methodDescriptor.getRequestMarshaller() - .parse(buf.toKotlin().asInputStream()) - - // destroy the buffer, we don't need it anymore - grpc_byte_buffer_destroy(buf) - - // Mark that we have received at least one request message - receivedFirstMessage = true - - callbackMutex.withLock { - listener.onMessage(msg) + try { + val msg = methodDescriptor.getRequestMarshaller() + .parse(buf.toKotlin().asInputStream()) + // Mark that we have received at least one request message + receivedFirstMessage = true + callbackMutex.withLock { + listener.onMessage(msg) + } + } finally { + // destroy the buffer, we don't need it anymore + grpc_byte_buffer_destroy(buf) } } } @@ -306,7 +306,6 @@ private class DeferredCallListener : ServerCall.Listener() { // fast path (delegate is already set) f(d); return } - println("delivering to queue...") // slow path: re-check under lock val dd = mutex.withLock { val cur = delegate From 0d261805705e103b5978a54cdef88ba9e4a07c6e Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Fri, 5 Sep 2025 15:07:35 +0200 Subject: [PATCH 07/10] grpc-native: Code cleanup Signed-off-by: Johannes Zottele --- cinterop-c/MODULE.bazel.lock | 8 +- .../rpc/grpc/ServerServiceDefinition.kt | 4 +- .../rpc/grpc/MutableHandlerRegistry.native.kt | 55 ++++++- .../kotlin/kotlinx/rpc/grpc/Server.native.kt | 31 ++-- .../grpc/ServerServiceDefinition.native.kt | 11 +- .../rpc/grpc/internal/CompletionQueue.kt | 23 +++ .../grpc/internal/MethodDescriptor.native.kt | 18 ++- .../rpc/grpc/internal/NativeManagedChannel.kt | 3 + .../kotlinx/rpc/grpc/internal/NativeServer.kt | 150 ++++++++++-------- .../rpc/grpc/internal/NativeServerCall.kt | 48 +++++- ...erCallbackRequest.kt => serverCallTags.kt} | 120 ++++++++++---- .../kotlin/kotlinx/rpc/grpc/internal/utils.kt | 2 +- 12 files changed, 340 insertions(+), 133 deletions(-) rename grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/{ServerCallbackRequest.kt => serverCallTags.kt} (51%) diff --git a/cinterop-c/MODULE.bazel.lock b/cinterop-c/MODULE.bazel.lock index 43b3be81c..4da8305f4 100644 --- a/cinterop-c/MODULE.bazel.lock +++ b/cinterop-c/MODULE.bazel.lock @@ -245,8 +245,8 @@ "https://bcr.bazel.build/modules/rules_java/7.3.2/MODULE.bazel": "50dece891cfdf1741ea230d001aa9c14398062f2b7c066470accace78e412bc2", "https://bcr.bazel.build/modules/rules_java/7.4.0/MODULE.bazel": "a592852f8a3dd539e82ee6542013bf2cadfc4c6946be8941e189d224500a8934", "https://bcr.bazel.build/modules/rules_java/7.6.1/MODULE.bazel": "2f14b7e8a1aa2f67ae92bc69d1ec0fa8d9f827c4e17ff5e5f02e91caa3b2d0fe", - "https://bcr.bazel.build/modules/rules_java/8.12.0/MODULE.bazel": "8e6590b961f2defdfc2811c089c75716cb2f06c8a4edeb9a8d85eaa64ee2a761", - "https://bcr.bazel.build/modules/rules_java/8.12.0/source.json": "cbd5d55d9d38d4008a7d00bee5b5a5a4b6031fcd4a56515c9accbcd42c7be2ba", + "https://bcr.bazel.build/modules/rules_java/8.14.0/MODULE.bazel": "717717ed40cc69994596a45aec6ea78135ea434b8402fb91b009b9151dd65615", + "https://bcr.bazel.build/modules/rules_java/8.14.0/source.json": "8a88c4ca9e8759da53cddc88123880565c520503321e2566b4e33d0287a3d4bc", "https://bcr.bazel.build/modules/rules_java/8.3.2/MODULE.bazel": "7336d5511ad5af0b8615fdc7477535a2e4e723a357b6713af439fe8cf0195017", "https://bcr.bazel.build/modules/rules_java/8.5.1/MODULE.bazel": "d8a9e38cc5228881f7055a6079f6f7821a073df3744d441978e7a43e20226939", "https://bcr.bazel.build/modules/rules_java/8.6.1/MODULE.bazel": "f4808e2ab5b0197f094cabce9f4b006a27766beb6a9975931da07099560ca9c2", @@ -534,7 +534,7 @@ }, "@@rules_foreign_cc+//foreign_cc:extensions.bzl%tools": { "general": { - "bzlTransitiveDigest": "ginC3lIGOKKivBi0nyv2igKvSiz42Thm8yaX2RwVaHg=", + "bzlTransitiveDigest": "jO6HNyY7/eIylNs2RYABjCfbAgUNb1oiXpl3aY4V/hI=", "usagesDigest": "9LXdVp01HkdYQT8gYPjYLO6VLVJHo9uFfxWaU1ymiRE=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, @@ -848,7 +848,7 @@ }, "@@rules_kotlin+//src/main/starlark/core/repositories:bzlmod_setup.bzl%rules_kotlin_extensions": { "general": { - "bzlTransitiveDigest": "hUTp2w+RUVdL7ma5esCXZJAFnX7vLbVfLd7FwnQI6bU=", + "bzlTransitiveDigest": "OlvsB0HsvxbR8ZN+J9Vf00X/+WVz/Y/5Xrq2LgcVfdo=", "usagesDigest": "QI2z8ZUR+mqtbwsf2fLqYdJAkPOHdOV+tF2yVAUgRzw=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt index 80c6b5846..bb49085cb 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt @@ -16,10 +16,12 @@ import kotlinx.rpc.internal.utils.InternalRpcApi public expect class ServerServiceDefinition { public fun getServiceDescriptor(): ServiceDescriptor public fun getMethods(): Collection> + + public fun getMethod(methodName: String): ServerMethodDefinition<*, *>? } @InternalRpcApi public expect fun serverServiceDefinition( serviceDescriptor: ServiceDescriptor, - methods: Collection> + methods: Collection>, ): ServerServiceDefinition diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt index e2ae9619a..08e0c4bd2 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt @@ -6,17 +6,66 @@ package kotlinx.rpc.grpc +import kotlinx.rpc.grpc.internal.MethodDescriptor +import kotlinx.rpc.grpc.internal.ServerMethodDefinition +import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap + /** * Registry of services and their methods used by servers to dispatching incoming calls. */ -public actual abstract class HandlerRegistry +public actual abstract class HandlerRegistry { + /** + * Returns the [ServerServiceDefinition]s provided by the registry, or an empty list if not + * supported by the implementation. + */ + public abstract fun getServices(): List + + /** + * Lookup a [ServerMethodDefinition] by its fully-qualified name. + * + * @param methodName to lookup [ServerMethodDefinition] for. + * @param authority the authority for the desired method (to do virtual hosting). If `null` + * the first matching method will be returned. + * @return the resolved method or `null` if no method for that name exists. + */ + public abstract fun lookupMethod( + methodName: String, authority: String?, + ): ServerMethodDefinition<*, *>? + + /** + * Lookup a [ServerMethodDefinition] by its fully-qualified name. + * + * @param methodName to lookup [ServerMethodDefinition] for. + * @return the resolved method or `null` if no method for that name exists. + */ + public fun lookupMethod(methodName: String): ServerMethodDefinition<*, *>? { + return lookupMethod(methodName, null) + } + +} internal actual class MutableHandlerRegistry : HandlerRegistry() { + + private val services = RpcInternalConcurrentHashMap() + actual fun addService(service: ServerServiceDefinition): ServerServiceDefinition? { - error("Native target is not supported in gRPC") + return services.put(service.getServiceDescriptor().getName(), service) } actual fun removeService(service: ServerServiceDefinition): Boolean { - error("Native target is not supported in gRPC") + return services.remove(service.getServiceDescriptor().getName()) != null + } + + override fun getServices(): List { + return services.values.toList() + } + + override fun lookupMethod( + methodName: String, + authority: String?, + ): ServerMethodDefinition<*, *>? { + val serviceName = MethodDescriptor.extractFullServiceName(methodName) ?: return null + val service = services[serviceName] ?: return null + return service.getMethod(methodName) } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt index 63e9ac47d..5e8f53cba 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt @@ -4,8 +4,9 @@ package kotlinx.rpc.grpc -import kotlinx.rpc.grpc.internal.GrpcServerCredentials +import kotlinx.rpc.grpc.internal.GrpcInsecureServerCredentials import kotlinx.rpc.grpc.internal.NativeServer +import kotlinx.rpc.grpc.internal.ServerMethodDefinition /** * Platform-specific gRPC server builder. @@ -23,8 +24,9 @@ private class NativeServerBuilder( ) : ServerBuilder() { // TODO: Add actual credentials - private val credentials = GrpcServerCredentials.createInsecure() + private val credentials = GrpcInsecureServerCredentials() private val services = mutableListOf() + private var fallbackRegistry: HandlerRegistry = DefaultFallbackRegistry override fun addService(service: ServerServiceDefinition): NativeServerBuilder { services.add(service) @@ -32,17 +34,12 @@ private class NativeServerBuilder( } override fun fallbackHandlerRegistry(registry: HandlerRegistry?): NativeServerBuilder { - TODO("Not yet implemented") + fallbackRegistry = registry ?: DefaultFallbackRegistry + return this } override fun build(): Server { - val server = NativeServer(port, credentials) - - for (service in services) { - server.addService(service) - } - - return server + return NativeServer(port, credentials, services, fallbackRegistry) } } @@ -54,3 +51,17 @@ internal actual fun ServerBuilder(port: Int): ServerBuilder<*> { internal actual fun Server(builder: ServerBuilder<*>): Server { return builder.build() } + +private object DefaultFallbackRegistry : HandlerRegistry() { + override fun getServices(): List { + return listOf() + } + + override fun lookupMethod( + methodName: String, + authority: String?, + ): ServerMethodDefinition<*, *>? { + return null + } + +} diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt index 5559469f8..31977c8b3 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt @@ -10,11 +10,18 @@ import kotlinx.rpc.internal.utils.InternalRpcApi public actual class ServerServiceDefinition internal constructor( private val serviceDescriptor: ServiceDescriptor, - private val methods: Collection>, + private val methods: Map>, ) { + + internal constructor(serviceDescriptor: ServiceDescriptor, methods: Collection>) : + this(serviceDescriptor, methods.associateBy { it.getMethodDescriptor().getFullMethodName() }) + public actual fun getServiceDescriptor(): ServiceDescriptor = serviceDescriptor - public actual fun getMethods(): Collection> = methods + public actual fun getMethods(): Collection> = methods.values + + public actual fun getMethod(methodName: String): ServerMethodDefinition<*, *>? = methods[methodName] + } @InternalRpcApi diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt index 6780dac25..329c45ae1 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt @@ -219,19 +219,42 @@ private fun deleteCbTag(tag: CPointer) { nativeHeap.free(tag) } +/** + * Represents a callback tag for completion queues constructed with + * [grpc_completion_queue_create_for_callback]. + * + * The [run] method will be invoked onces the completion queue signals the completion of the operation. + * It is guaranteed that the [run] method will be only invoked once. + * + * `this` object is guaranteed to be not garbage collected until the [run] method was executed. + */ internal interface CallbackTag { fun run(ok: Boolean) + /** + * Creates a pointer to a gRPC callback tag that encapsulates the given `run` function. + * It can be passed to callback-based grpc_completion_queue. + */ fun toCbTag(): CPointer { return newCbTag(this, staticCFunction { functor, ok -> val tag = functor!!.reinterpret() val callbackTag = tag.pointed.user_data!!.asStableRef().get() + // free the tag memory and release the stable reference to `this` deleteCbTag(tag) callbackTag.run(ok != 0) }) } companion object { + /** + * Creates a pointer to a gRPC callback tag that encapsulates the given `run` function. + * + * The `run` function will be invoked when the callback is triggered with a boolean + * value that indicates the success or failure of the operation. + * + * @return A pointer to the newly created gRPC callback tag. + * It can be passed to callback-based grpc_completion_queue. + */ fun anonymous(run: (ok: Boolean) -> Unit): CPointer { return object : CallbackTag { override fun run(ok: Boolean) { diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt index ef95e782e..f1a77e0d7 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt @@ -22,12 +22,7 @@ public actual class MethodDescriptor internal constructor( public actual fun getFullMethodName(): String = fullMethodName private val serviceName: String? by lazy { - val index = fullMethodName.lastIndexOf('/') - if (index == -1) { - null - } else { - fullMethodName.substring(0, index) - } + extractFullServiceName(fullMethodName) } public actual fun getServiceName(): String? = serviceName @@ -48,6 +43,17 @@ public actual class MethodDescriptor internal constructor( public actual fun stream(value: T): InputStream public actual fun parse(stream: InputStream): T } + + public companion object { + public fun extractFullServiceName(fullMethodName: String): String? { + val index = fullMethodName.lastIndexOf('/') + return if (index == -1) { + null + } else { + fullMethodName.take(index) + } + } + } } @InternalRpcApi diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt index 801a6d38c..7d465f4d0 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt @@ -26,6 +26,7 @@ import libkgrpc.grpc_channel_create_call import libkgrpc.grpc_channel_credentials_release import libkgrpc.grpc_channel_destroy import libkgrpc.grpc_insecure_credentials_create +import libkgrpc.grpc_slice_unref import kotlin.coroutines.cancellation.CancellationException import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner @@ -151,6 +152,8 @@ internal class NativeManagedChannel( reserved = null ) ?: error("Failed to create call") + grpc_slice_unref(methodNameSlice) + return NativeClientCall( cq, rawCall, methodDescriptor, callJob ) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt index 33a0e121e..3c97de634 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -18,6 +18,7 @@ import kotlinx.cinterop.asStableRef import kotlinx.cinterop.staticCFunction import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.withTimeoutOrNull +import kotlinx.rpc.grpc.HandlerRegistry import kotlinx.rpc.grpc.Server import kotlinx.rpc.grpc.ServerServiceDefinition import libkgrpc.grpc_insecure_server_credentials_create @@ -42,28 +43,25 @@ import kotlin.time.Duration /** * Wrapper for [grpc_server_credentials]. */ -internal class GrpcServerCredentials( +internal sealed class GrpcServerCredentials( internal val raw: CPointer, ) { val rawCleaner = createCleaner(raw) { grpc_server_credentials_release(it) } - - companion object { - fun createInsecure(): GrpcServerCredentials = GrpcServerCredentials( - grpc_insecure_server_credentials_create() ?: error("Failed to create server credentials") - ) - } } -// TODO: don't hardcode the host -private val HOST = "0.0.0.0" +internal class GrpcInsecureServerCredentials : + GrpcServerCredentials(grpc_insecure_server_credentials_create() ?: error("Failed to create server credentials")) + internal class NativeServer( override val port: Int, // we must reference them, otherwise the credentials are getting garbage collected @Suppress("Redundant") private val credentials: GrpcServerCredentials, + services: List, + val fallbackRegistry: HandlerRegistry, ) : Server { // a reference to make sure the grpc_init() was called. (it is released after shutdown) @@ -76,12 +74,13 @@ internal class NativeServer( // holds all stable references to MethodAllocationCtx objects. // the stable references must eventually be disposed. - private val methodAllocationCtxs = mutableSetOf>() + private val callAllocationCtxs = mutableSetOf>() init { grpc_server_register_completion_queue(raw, cq.raw, null) - grpc_server_add_http2_port(raw, "$HOST:$port", credentials.raw) - addUnknownService() + grpc_server_add_http2_port(raw, "0.0.0.0:$port", credentials.raw) + registerServices(services) + setLookupCallAllocatorCallback() } private var started = false @@ -103,15 +102,46 @@ internal class NativeServer( private fun dispose() { // disposed with completion of shutdown grpc_server_destroy(raw) - methodAllocationCtxs.forEach { it.dispose() } + callAllocationCtxs.forEach { it.dispose() } // release the grpc runtime, so grpc is shutdown if no other grpc servers are running. rt.close() } - fun addService(service: ServerServiceDefinition) { + override fun shutdown(): Server { + if (!isShutdownInternal.compareAndSet(expect = false, update = true)) { + // shutdown only once + return this + } + + grpc_server_shutdown_and_notify(raw, cq.raw, CallbackTag.anonymous { + cq.shutdown().onComplete { + dispose() + isTerminatedInternal.complete(Unit) + } + }) + return this + } + + override fun shutdownNow(): Server { + shutdown() + grpc_server_cancel_all_calls(raw) + return this + } + + override suspend fun awaitTermination(duration: Duration): Server { + withTimeoutOrNull(duration) { + isTerminatedInternal.await() + } + return this + } + + /** + * Registers a list of server service definitions with the server. + */ + private fun registerServices(services: List) { check(!started) { internalError("Server already started") } - service.getMethods().forEach { + services.flatMap { it.getMethods() }.forEach { val desc = it.getMethodDescriptor() // to construct a valid HTTP/2 path, we must prepend the name with a slash. // the user does not do this to align it with the java implementation. @@ -119,101 +149,95 @@ internal class NativeServer( val tag = grpc_server_register_method( server = raw, method = name, - // TODO: don't hardcode localhost - host = "localhost:$port", + host = null, + // we currently don't optimize unary calls by reading the message on connection payload_handling = grpc_server_register_method_payload_handling.GRPC_SRM_PAYLOAD_NONE, flags = 0u ) ?: error("Failed to register method: $name") val ctx = StableRef.create( - RegisteredMethodAllocationCtx( + RegisteredCallAllocationCtx( server = this, method = it, cq = cq, ) ) - methodAllocationCtxs.add(ctx) + callAllocationCtxs.add(ctx) + // register the allocation callback for the method. + // it is invoked by the grpc runtime to allocate a new call context for incoming requests. kgrpc_server_set_register_method_allocator( server = raw, cq = cq.raw, method_tag = tag, allocator_ctx = ctx.asCPointer(), - allocator = staticCFunction(::methodAllocationCallback) + allocator = staticCFunction(::registeredCallAllocationCallback) ) } } - private fun addUnknownService() { + /** + * Configures the server with a callback to handle the allocation of method calls for incoming requests, + * which were not registered before the server started (via [registerServices]). + */ + private fun setLookupCallAllocatorCallback() { val ctx = StableRef.create( - MethodAllocationCtx( + CallAllocationCtx( server = this, cq = cq, ) ) - methodAllocationCtxs.add(ctx) + callAllocationCtxs.add(ctx) kgrpc_server_set_batch_method_allocator( server = raw, cq = cq.raw, allocator_ctx = ctx.asCPointer(), - allocator = staticCFunction(::batchMethodAllocationCallback) + allocator = staticCFunction(::lookupCallAllocationCallback) ) } - - override fun shutdown(): Server { - if (!isShutdownInternal.compareAndSet(expect = false, update = true)) { - // shutdown only once - return this - } - - grpc_server_shutdown_and_notify(raw, cq.raw, CallbackTag.anonymous { - cq.shutdown().onComplete { - dispose() - isTerminatedInternal.complete(Unit) - } - }) - return this - } - - override fun shutdownNow(): Server { - shutdown() - grpc_server_cancel_all_calls(raw) - return this - } - - override suspend fun awaitTermination(duration: Duration): Server { - withTimeoutOrNull(duration) { - isTerminatedInternal.await() - } - return this - } } +/** + * Allocates and returns a registered call allocation for a given call context. + */ @CName("kgrpc_method_allocation_callback") -private fun methodAllocationCallback(ctx: COpaquePointer?): CValue { - val ctx = ctx!!.asStableRef().get() - val request = ServerCallbackRequest(ctx.server, ctx.method, ctx.cq) - return request.toRaw() +private fun registeredCallAllocationCallback(ctx: COpaquePointer?): CValue { + val ctx = ctx!!.asStableRef().get() + val request = RegisteredServerCallTag(ctx.cq, ctx.method) + return request.toRawCallAllocation() } +/** + * A static callback that is invoked by the grpc runtime to allocate a new [kgrpc_registered_call_allocation]. + * As the [LookupServerCallTag] is a [CallbackTag] it won't be garbage collected until the callback is executed. + * + * @param ctx A pointer to a [CallAllocationCtx] object + */ @CName("kgrpc_method_allocation_callback") -private fun batchMethodAllocationCallback(ctx: COpaquePointer?): CValue { - val ctx = ctx!!.asStableRef().get() - val request = BatchedCallbackRequest(ctx.server, ctx.cq) - return request.toRaw() +private fun lookupCallAllocationCallback(ctx: COpaquePointer?): CValue { + val ctx = ctx!!.asStableRef().get() + val request = LookupServerCallTag(ctx.cq, ctx.server.fallbackRegistry) + return request.toRawCallAllocation() } -private open class MethodAllocationCtx( +/** + * A context to pass dynamic information to the [lookupCallAllocationCallback] and + * [registeredCallAllocationCallback] callbacks. + */ +private open class CallAllocationCtx( val server: NativeServer, val cq: CompletionQueue, ) -private class RegisteredMethodAllocationCtx( +/** + * A context to pass dynamic information to the [registeredCallAllocationCallback] callback. + */ +private class RegisteredCallAllocationCtx( server: NativeServer, cq: CompletionQueue, val method: ServerMethodDefinition<*, *>, -) : MethodAllocationCtx(server, cq) +) : CallAllocationCtx(server, cq) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt index 129a1b137..dc941af13 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -44,15 +44,23 @@ internal class NativeServerCall( // ownership is transferred to the call val raw: CPointer, val cq: CompletionQueue, - val methodDescriptor: MethodDescriptor, ) : ServerCall() { + constructor( + raw: CPointer, + cq: CompletionQueue, + methodDescriptor: MethodDescriptor, + ) : this(raw, cq) { + setMethodDescriptor(methodDescriptor) + } + @Suppress("unused") private val rawCleaner = createCleaner(raw) { grpc_call_unref(it) } private var listener = DeferredCallListener() + private var methodDescriptor: MethodDescriptor? = null private var callbackMutex = ReentrantLock() private var initialized = false private var cancelled = false @@ -68,6 +76,18 @@ internal class NativeServerCall( initialize() } + /** + * Sets the method descriptor for this call. + * It must be set before invoking [ServerCallHandler.startCall]. + */ + fun setMethodDescriptor(methodDescriptor: MethodDescriptor) { + this.methodDescriptor = methodDescriptor + } + + /** + * Set the listener created from [ServerCallHandler.startCall]. + * This must be set directly after receiving the listener from the `startCall` invocation. + */ fun setListener(listener: Listener) { this.listener.setDelegate(listener) } @@ -114,7 +134,7 @@ internal class NativeServerCall( } } - private fun cancelCall(status: grpc_status_code, message: String) { + fun cancel(status: grpc_status_code, message: String) { grpc_call_cancel_with_status(raw, status, message, null) } @@ -146,7 +166,7 @@ internal class NativeServerCall( try { onSuccess() } catch (e: Throwable) { - cancelCall(grpc_status_code.GRPC_STATUS_INTERNAL, e.message ?: "Unknown error") + cancel(grpc_status_code.GRPC_STATUS_INTERNAL, e.message ?: "Unknown error") } finally { cleanup() } @@ -155,12 +175,12 @@ internal class NativeServerCall( BatchResult.CQShutdown -> { cleanup() - cancelCall(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Server shutdown") + cancel(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Server shutdown") } is BatchResult.SubmitError -> { cleanup() - cancelCall( + cancel( grpc_status_code.GRPC_STATUS_INTERNAL, "Batch could not be submitted: ${result.error}" ) @@ -172,6 +192,7 @@ internal class NativeServerCall( check(initialized) { internalError("Call not initialized") } // TODO: Remove the num constraint require(numMessages == 1) { internalError("numMessages must be 1") } + val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } val arena = Arena() val recvPtr = arena.alloc>() @@ -189,7 +210,7 @@ internal class NativeServerCall( if (buf == null) { // end-of-stream observed. for UNARY, absence of any request is a protocol violation. if (methodDescriptor.type == MethodType.UNARY && !receivedFirstMessage) { - cancelCall( + cancel( grpc_status_code.GRPC_STATUS_INTERNAL, "Unary call half-closed before receiving a request message" ) @@ -208,7 +229,6 @@ internal class NativeServerCall( listener.onMessage(msg) } } finally { - // destroy the buffer, we don't need it anymore grpc_byte_buffer_destroy(buf) } } @@ -218,6 +238,7 @@ internal class NativeServerCall( override fun sendHeaders(headers: GrpcTrailers) { check(initialized) { internalError("Call not initialized") } val arena = Arena() + // TODO: Implement header metadata operation val op = arena.alloc { op = GRPC_OP_SEND_INITIAL_METADATA data.send_initial_metadata.count = 0u @@ -232,6 +253,8 @@ internal class NativeServerCall( override fun sendMessage(message: Response) { check(initialized) { internalError("Call not initialized") } check(isReady()) { internalError("Not yet ready.") } + val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } + val arena = Arena() val inputStream = methodDescriptor.getResponseMarshaller().stream(message) val byteBuffer = inputStream.asSource().toGrpcByteBuffer() @@ -261,7 +284,7 @@ internal class NativeServerCall( } val op = arena.alloc { op = GRPC_OP_SEND_STATUS_FROM_SERVER - data.send_status_from_server.status = status.statusCode.toRaw() + data.send_status_from_server.status = status.statusCode.toRawCallAllocation() data.send_status_from_server.status_details = details?.ptr data.send_status_from_server.trailing_metadata_count = 0u data.send_status_from_server.trailing_metadata = null @@ -280,10 +303,19 @@ internal class NativeServerCall( } override fun getMethodDescriptor(): MethodDescriptor { + val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } return methodDescriptor } } +/** + * A listener implementation that defers execution of its methods until a delegate is set. + * + * This class extends `ServerCall.Listener`, allowing it to serve as a listener for server calls. + * Initially, incoming method calls (e.g., `onMessage`, `onHalfClose`, etc.) are queued until a delegate + * is assigned through the `setDelegate` method. Once the delegate is set, queued methods are delivered + * in order and all future method calls are forwarded directly to the delegate. + */ private class DeferredCallListener : ServerCall.Listener() { @Volatile private var delegate: ServerCall.Listener? = null diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/serverCallTags.kt similarity index 51% rename from grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt rename to grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/serverCallTags.kt index ad7041c0b..58f58d21d 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCallbackRequest.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/serverCallTags.kt @@ -16,45 +16,41 @@ import kotlinx.cinterop.cValue import kotlinx.cinterop.ptr import kotlinx.cinterop.value import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.HandlerRegistry import libkgrpc.gpr_timespec import libkgrpc.grpc_call_details import libkgrpc.grpc_call_details_destroy import libkgrpc.grpc_metadata_array import libkgrpc.grpc_metadata_array_destroy +import libkgrpc.grpc_status_code import libkgrpc.kgrpc_batch_call_allocation import libkgrpc.kgrpc_registered_call_allocation import kotlin.experimental.ExperimentalNativeApi -internal class ServerCallbackRequest( - val server: NativeServer, - val method: ServerMethodDefinition, +/** + * A [CallbackTag] that is passed to a completion queue and invoked when an incoming gRPC call for the + * registered [method] must be handled. + * + * The [toRawCallAllocation] method provides a [kgrpc_registered_call_allocation] used by the grpc runtime + * to pass the call context to this callback. + * + * An object of this type won't be garbage collected until the callback is executed. + */ +internal class RegisteredServerCallTag( val cq: CompletionQueue, + val method: ServerMethodDefinition, ) : CallbackTag { val arena = Arena() val rawCall = arena.alloc>() val rawDeadline = arena.alloc() val rawRequestMetadata = arena.alloc() - // the run() method disposes the callback request. - // so this is a self-disposing mechanism. - fun dispose() { + // the run() method disposes all resources + private fun dispose() { grpc_metadata_array_destroy(rawRequestMetadata.ptr) arena.clear() } - fun toRaw(): CValue { - return cValue { - tag = toCbTag() - call = rawCall.ptr - initial_metadata = rawRequestMetadata.ptr - deadline = rawDeadline.ptr - cq = this@ServerCallbackRequest.cq.raw - // we are currently not optimizing the initial client payload - // for unary and server streaming (payload_handling is always GRPC_SRM_PAYLOAD_NONE) - optional_payload = null - } - } - override fun run(ok: Boolean) { try { if (!ok) { @@ -76,11 +72,34 @@ internal class ServerCallbackRequest( dispose() } } + + fun toRawCallAllocation(): CValue { + return cValue { + tag = toCbTag() + call = rawCall.ptr + initial_metadata = rawRequestMetadata.ptr + deadline = rawDeadline.ptr + cq = this@RegisteredServerCallTag.cq.raw + // we are currently not optimizing the initial client payload + // for unary and server streaming (payload_handling is always GRPC_SRM_PAYLOAD_NONE) + optional_payload = null + } + } } -internal class BatchedCallbackRequest( - val server: NativeServer, +/** + * A [CallbackTag] that is passed to a completion queue and invoked when an incoming gRPC call that was + * not registered must be handled. + * The gRPC runtime will provide information about the method being called in the [rawDetails] field. + * + * The [toRawCallAllocation] method provides a [kgrpc_registered_call_allocation] used by the grpc runtime + * to pass the call context to this callback. + * + * An object of this type won't be garbage collected until the callback is executed. + */ +internal class LookupServerCallTag( val cq: CompletionQueue, + val registry: HandlerRegistry, ) : CallbackTag { val arena = Arena() val rawCall = arena.alloc>() @@ -88,31 +107,62 @@ internal class BatchedCallbackRequest( val rawRequestMetadata = arena.alloc() val rawDetails = arena.alloc() - // the run() method disposes the callback request. - // so this is a self-disposing mechanism. - fun dispose() { + // the run() method disposes all resources + private fun dispose() { grpc_metadata_array_destroy(rawRequestMetadata.ptr) grpc_call_details_destroy(rawDetails.ptr) arena.clear() } - fun toRaw(): CValue { + override fun run(ok: Boolean) { + try { + if (!ok) { + return + } + + // TODO: check authority + // val host = rawDetails.host.toByteArray().decodeToString() + + var method = rawDetails.method.toByteArray().decodeToString() + + // gRPC preserves the '/' character in the method name, + // while the method definition omits it and starts without '/' + method = method.removePrefix("/") + + val definition = registry.lookupMethod(method) + if (definition == null) { + // the method isn't registered; closing with UNIMPLEMENTED + val call = NativeServerCall(rawCall.value!!, cq) + call.cancel(grpc_status_code.GRPC_STATUS_UNIMPLEMENTED, "Method not found: $method") + } else { + @Suppress("UNCHECKED_CAST") + run { + val callHandler = definition.getServerCallHandler() as ServerCallHandler + val call = NativeServerCall( + rawCall.value!!, + cq, + definition.getMethodDescriptor() as MethodDescriptor + ) + // TODO: Turn metadata into a kotlin GrpcTrailers. + val metadata = GrpcTrailers() + val listener = callHandler.startCall(call, metadata) + call.setListener(listener) + } + } + + } finally { + dispose() + } + } + + fun toRawCallAllocation(): CValue { return cValue { tag = toCbTag() call = rawCall.ptr initial_metadata = rawRequestMetadata.ptr details = rawDetails.ptr - cq = this@BatchedCallbackRequest.cq.raw + cq = this@LookupServerCallTag.cq.raw } } - - override fun run(ok: Boolean) { - val host = rawDetails.host.toByteArray().decodeToString() - val method = rawDetails.method.toByteArray().decodeToString() - println("Got unknown callback request trigger.") - println("Host: $host") - println("Method: $method") - dispose() - } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt index 76dc3ee58..83b807652 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt @@ -192,7 +192,7 @@ internal fun grpc_status_code.toKotlin(): StatusCode = when (this) { grpc_status_code.GRPC_STATUS__DO_NOT_USE -> error("Invalid status code: ${this.ordinal}") } -internal fun StatusCode.toRaw(): grpc_status_code = when (this) { +internal fun StatusCode.toRawCallAllocation(): grpc_status_code = when (this) { StatusCode.OK -> grpc_status_code.GRPC_STATUS_OK StatusCode.CANCELLED -> grpc_status_code.GRPC_STATUS_CANCELLED StatusCode.UNKNOWN -> grpc_status_code.GRPC_STATUS_UNKNOWN From df9f30a546879cfb2804cead968e4bc5ba6c5d50 Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Fri, 5 Sep 2025 15:34:14 +0200 Subject: [PATCH 08/10] grpc-native: Code cleanup Signed-off-by: Johannes Zottele --- cinterop-c/include/kgrpc.h | 6 +- .../kotlinx/rpc/grpc/test/CoreClientTest.kt | 6 +- .../rpc/grpc/internal/AbstractNativeCall.kt | 75 ------------------- 3 files changed, 6 insertions(+), 81 deletions(-) delete mode 100644 grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt diff --git a/cinterop-c/include/kgrpc.h b/cinterop-c/include/kgrpc.h index f401dda39..1a7d0d638 100644 --- a/cinterop-c/include/kgrpc.h +++ b/cinterop-c/include/kgrpc.h @@ -63,8 +63,6 @@ bool kgrpc_iomgr_run_in_background(); * to retrieve the accept context, including `tag`, `grpc_call*`, metadata, deadline, * optional payload, and the completion queue. * - * This omits the need of handling pre-registering requests using `grpc_server_request_registered_call`. - * * @param server The gRPC C `grpc_server*` instance. * @param cq A callback-style `grpc_completion_queue*` (must be registered earlier). * @param method_tag Opaque identifier from `grpc_server_register_method()` for the RPC method. @@ -79,6 +77,10 @@ void kgrpc_server_set_register_method_allocator( kgrpc_registered_call_allocator allocator ); +/** + * Like kgrpc_server_set_register_method_allocator but instead of registered methods, + * it sets an allocation callback for unknown method calls. + */ void kgrpc_server_set_batch_method_allocator( grpc_server *server, grpc_completion_queue *cq, diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 63a97252c..0197c4e5e 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -71,7 +71,7 @@ class GrpcCoreClientTest { } @Test - fun normalUnaryCall_ok() = repeat(10000) { + fun normalUnaryCall_ok() = repeat(1000) { val channel = createChannel() val call = channel.newHelloCall() val req = helloReq() @@ -194,9 +194,7 @@ class GrpcCoreClientTest { fun halfCloseBeforeSendingMessage_errorWithoutCrashing() { val channel = createChannel() val call = channel.newHelloCall() - val listener = createClientCallListener( - onClose = { status, _ -> println("Status: ${status.statusCode}, Message: ${status.getDescription()}") } - ) + val listener = createClientCallListener() assertFailsWith { try { call.start(listener, GrpcTrailers()) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt deleted file mode 100644 index 9c6bfa662..000000000 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/AbstractNativeCall.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. - */ - -@file:OptIn(ExperimentalForeignApi::class) - -package kotlinx.rpc.grpc.internal - -import cnames.structs.grpc_call -import kotlinx.cinterop.CPointer -import kotlinx.cinterop.ExperimentalForeignApi -import libkgrpc.grpc_op -import libkgrpc.grpc_status_code - -internal abstract class AbstractNativeCall( - val raw: CPointer, - val cq: CompletionQueue, -) { - - protected abstract val isClosed: Boolean - - protected abstract fun beginOp() - protected abstract fun endOp() - protected abstract fun cancelInternal(statusCode: grpc_status_code, message: String) - - /** - * Submits a batch operation to the [CompletionQueue] and handle the returned [BatchResult]. - * If the batch was successfully submitted, [onSuccess] is called. - * In any case, [cleanup] is called. - */ - internal fun runBatch( - ops: CPointer, - nOps: ULong, - cleanup: () -> Unit = {}, - onSuccess: () -> Unit = {}, - ) { - // we must not try to run a batch after the call is closed. - if (isClosed) return cleanup() - - // pre-book the batch, so onClose cannot be called before the batch finished. - beginOp() - - when (val callResult = cq.runBatch(this@AbstractNativeCall.raw, ops, nOps)) { - is BatchResult.Submitted -> { - callResult.future.onComplete { success -> - try { - if (success) { - onSuccess() - } - } finally { - // ignore failure, as it is reflected in the client status op - cleanup() - endOp() - } - } - } - - BatchResult.CQShutdown -> { - cleanup() - endOp() - cancelInternal(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Channel shutdown") - } - - is BatchResult.SubmitError -> { - cleanup() - endOp() - cancelInternal( - grpc_status_code.GRPC_STATUS_INTERNAL, - "Batch could not be submitted: ${callResult.error}" - ) - } - } - } - -} \ No newline at end of file From 81f26ea2ad86808b6ebd83b536dbaf4c11d50f9d Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Fri, 5 Sep 2025 15:39:54 +0200 Subject: [PATCH 09/10] grpc-native: Code cleanup Signed-off-by: Johannes Zottele --- grpc/grpc-core/build.gradle.kts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts index 6acfe6c3b..88375babc 100644 --- a/grpc/grpc-core/build.gradle.kts +++ b/grpc/grpc-core/build.gradle.kts @@ -101,9 +101,10 @@ kotlin { } } + // configures linkReleaseTest task to build and link the test binary in RELEASE mode. + // this can be useful for performance analysis. targets.withType().configureEach { binaries { - // Ensure test binaries are created for both debug and release test( buildTypes = listOf( org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE From c9c0e1cd17e1617c0313d6fb421a4ad14e5b72b6 Mon Sep 17 00:00:00 2001 From: Johannes Zottele Date: Fri, 5 Sep 2025 20:36:51 +0200 Subject: [PATCH 10/10] grpc-native: Address PR comments Signed-off-by: Johannes Zottele --- cinterop-c/include/kgrpc.h | 1 + .../kotlinx/rpc/grpc/test/CoreClientTest.kt | 28 +++++++++---------- .../kotlinx/rpc/grpc/internal/NativeServer.kt | 3 +- .../rpc/grpc/internal/NativeServerCall.kt | 28 +++++++++---------- 4 files changed, 31 insertions(+), 29 deletions(-) diff --git a/cinterop-c/include/kgrpc.h b/cinterop-c/include/kgrpc.h index 1a7d0d638..8b26aefc4 100644 --- a/cinterop-c/include/kgrpc.h +++ b/cinterop-c/include/kgrpc.h @@ -57,6 +57,7 @@ bool kgrpc_iomgr_run_in_background(); * * Wraps the internal C++ API `Server::SetRegisteredMethodAllocator()` to enable * callback-driven method dispatch via the Core C API. + * If the C++ API is exposed to the C API, this can be removed (https://github.com/grpc/grpc/issues/40632). * * When the gRPC Core needs to accept a new call for the specified method, it invokes: * kgrpc_registered_call_allocation alloc = allocator(); diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 0197c4e5e..4b2e08c4c 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -6,6 +6,7 @@ package kotlinx.rpc.grpc.test import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout import kotlinx.rpc.grpc.GrpcServer import kotlinx.rpc.grpc.GrpcTrailers @@ -270,21 +271,20 @@ class GreeterServiceImpl : GreeterService { * Run this on JVM before executing tests. */ @Test - fun runServer() { - runBlocking { - val server = GrpcServer( - port = PORT, - builder = { registerService { GreeterServiceImpl() } } - ) + fun runServer() = runTest { + val server = GrpcServer( + port = PORT, + builder = { registerService { GreeterServiceImpl() } } + ) + + try { + server.start() + println("Server started") + server.awaitTermination() + } finally { + server.shutdown() + server.awaitTermination() - try { - server.start() - println("Server started") - server.awaitTermination() - } finally { - server.shutdown() - server.awaitTermination() - } } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt index 3c97de634..39ede8291 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -70,7 +70,8 @@ internal class NativeServer( private val cq = CompletionQueue() - val raw: CPointer = grpc_server_create(null, null)!! + val raw: CPointer = grpc_server_create(null, null) + ?: error("Failed to create server") // holds all stable references to MethodAllocationCtx objects. // the stable references must eventually be disposed. diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt index dc941af13..4fa827c46 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -59,12 +59,12 @@ internal class NativeServerCall( grpc_call_unref(it) } - private var listener = DeferredCallListener() + private val listener = DeferredCallListener() private var methodDescriptor: MethodDescriptor? = null - private var callbackMutex = ReentrantLock() + private val callbackMutex = ReentrantLock() private var initialized = false private var cancelled = false - private var finalized = atomic(false) + private val finalized = atomic(false) // Tracks whether at least one request message has been received on this call. private var receivedFirstMessage = false @@ -190,7 +190,7 @@ internal class NativeServerCall( override fun request(numMessages: Int) { check(initialized) { internalError("Call not initialized") } - // TODO: Remove the num constraint + // TODO: Remove the num constraint (KRPC-213) require(numMessages == 1) { internalError("numMessages must be 1") } val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } @@ -320,7 +320,7 @@ private class DeferredCallListener : ServerCall.Listener() { @Volatile private var delegate: ServerCall.Listener? = null private val mutex = ReentrantLock() - private val q = ArrayDeque<(ServerCall.Listener) -> Unit>() + private val queue = ArrayDeque<(ServerCall.Listener) -> Unit>() fun setDelegate(d: ServerCall.Listener) { mutex.withLock { @@ -328,26 +328,26 @@ private class DeferredCallListener : ServerCall.Listener() { delegate = d } // drain the queue - q.forEach { it(d) } - q.clear() + queue.forEach { it(d) } + queue.clear() } - private inline fun deliver(crossinline f: (ServerCall.Listener) -> Unit) { - val d = delegate - if (d != null) { + private inline fun deliver(crossinline invokeListener: (ServerCall.Listener) -> Unit) { + val currentDelegate = delegate + if (currentDelegate != null) { // fast path (delegate is already set) - f(d); return + invokeListener(currentDelegate); return } // slow path: re-check under lock - val dd = mutex.withLock { + val safeCurrentDelegate = mutex.withLock { val cur = delegate if (cur == null) { - q.addLast { f(it) } + queue.addLast { invokeListener(it) } null } else cur } // if the delegate was already set, call it - if (dd != null) f(dd) + if (safeCurrentDelegate != null) invokeListener(safeCurrentDelegate) } override fun onMessage(message: T) = deliver { it.onMessage(message) }