diff --git a/cinterop-c/MODULE.bazel.lock b/cinterop-c/MODULE.bazel.lock index 43b3be81c..4da8305f4 100644 --- a/cinterop-c/MODULE.bazel.lock +++ b/cinterop-c/MODULE.bazel.lock @@ -245,8 +245,8 @@ "https://bcr.bazel.build/modules/rules_java/7.3.2/MODULE.bazel": "50dece891cfdf1741ea230d001aa9c14398062f2b7c066470accace78e412bc2", "https://bcr.bazel.build/modules/rules_java/7.4.0/MODULE.bazel": "a592852f8a3dd539e82ee6542013bf2cadfc4c6946be8941e189d224500a8934", "https://bcr.bazel.build/modules/rules_java/7.6.1/MODULE.bazel": "2f14b7e8a1aa2f67ae92bc69d1ec0fa8d9f827c4e17ff5e5f02e91caa3b2d0fe", - "https://bcr.bazel.build/modules/rules_java/8.12.0/MODULE.bazel": "8e6590b961f2defdfc2811c089c75716cb2f06c8a4edeb9a8d85eaa64ee2a761", - "https://bcr.bazel.build/modules/rules_java/8.12.0/source.json": "cbd5d55d9d38d4008a7d00bee5b5a5a4b6031fcd4a56515c9accbcd42c7be2ba", + "https://bcr.bazel.build/modules/rules_java/8.14.0/MODULE.bazel": "717717ed40cc69994596a45aec6ea78135ea434b8402fb91b009b9151dd65615", + "https://bcr.bazel.build/modules/rules_java/8.14.0/source.json": "8a88c4ca9e8759da53cddc88123880565c520503321e2566b4e33d0287a3d4bc", "https://bcr.bazel.build/modules/rules_java/8.3.2/MODULE.bazel": "7336d5511ad5af0b8615fdc7477535a2e4e723a357b6713af439fe8cf0195017", "https://bcr.bazel.build/modules/rules_java/8.5.1/MODULE.bazel": "d8a9e38cc5228881f7055a6079f6f7821a073df3744d441978e7a43e20226939", "https://bcr.bazel.build/modules/rules_java/8.6.1/MODULE.bazel": "f4808e2ab5b0197f094cabce9f4b006a27766beb6a9975931da07099560ca9c2", @@ -534,7 +534,7 @@ }, "@@rules_foreign_cc+//foreign_cc:extensions.bzl%tools": { "general": { - "bzlTransitiveDigest": "ginC3lIGOKKivBi0nyv2igKvSiz42Thm8yaX2RwVaHg=", + "bzlTransitiveDigest": "jO6HNyY7/eIylNs2RYABjCfbAgUNb1oiXpl3aY4V/hI=", "usagesDigest": "9LXdVp01HkdYQT8gYPjYLO6VLVJHo9uFfxWaU1ymiRE=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, @@ -848,7 +848,7 @@ }, "@@rules_kotlin+//src/main/starlark/core/repositories:bzlmod_setup.bzl%rules_kotlin_extensions": { "general": { - "bzlTransitiveDigest": "hUTp2w+RUVdL7ma5esCXZJAFnX7vLbVfLd7FwnQI6bU=", + "bzlTransitiveDigest": "OlvsB0HsvxbR8ZN+J9Vf00X/+WVz/Y/5Xrq2LgcVfdo=", "usagesDigest": "QI2z8ZUR+mqtbwsf2fLqYdJAkPOHdOV+tF2yVAUgRzw=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, diff --git a/cinterop-c/include/kgrpc.h b/cinterop-c/include/kgrpc.h index 33536fe48..8b26aefc4 100644 --- a/cinterop-c/include/kgrpc.h +++ b/cinterop-c/include/kgrpc.h @@ -24,13 +24,74 @@ typedef struct { } kgrpc_cb_tag; - /* - * Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped. +// This is a duplicate of the RegisteredCallAllocation, which is defined in +// https://github.com/grpc/grpc/blob/893bdadd56dbb75fb156175afdaa2b0d47e1c15b/src/core/server/server.h#L150-L157. +// This is required, as RegisteredCallAllocation is not part of the exposed C API. +typedef struct { + void *tag; + grpc_call **call; + grpc_metadata_array *initial_metadata; + gpr_timespec *deadline; + grpc_byte_buffer **optional_payload; + grpc_completion_queue *cq; +} kgrpc_registered_call_allocation; + +typedef struct { + void* tag; + grpc_call** call; + grpc_metadata_array* initial_metadata; + grpc_call_details* details; + grpc_completion_queue* cq; +} kgrpc_batch_call_allocation; + +typedef kgrpc_registered_call_allocation (*kgrpc_registered_call_allocator)(void* ctx); +typedef kgrpc_batch_call_allocation (*kgrpc_batch_call_allocator)(void* ctx); + +/* +* Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped. +*/ +bool kgrpc_iomgr_run_in_background(); + +/** + * Registers a C-style allocator callback for accepting gRPC calls to a specific method. + * + * Wraps the internal C++ API `Server::SetRegisteredMethodAllocator()` to enable + * callback-driven method dispatch via the Core C API. + * If the C++ API is exposed to the C API, this can be removed (https://github.com/grpc/grpc/issues/40632). + * + * When the gRPC Core needs to accept a new call for the specified method, it invokes: + * kgrpc_registered_call_allocation alloc = allocator(); + * to retrieve the accept context, including `tag`, `grpc_call*`, metadata, deadline, + * optional payload, and the completion queue. + * + * @param server The gRPC C `grpc_server*` instance. + * @param cq A callback-style `grpc_completion_queue*` (must be registered earlier). + * @param method_tag Opaque identifier from `grpc_server_register_method()` for the RPC method. + * @param allocator_ctx The context for the callback to pass all necessary objects to the static function. + * @param allocator Function providing new accept contexts (`kgrpc_registered_call_allocation`). */ - bool kgrpc_iomgr_run_in_background(); +void kgrpc_server_set_register_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *method_tag, + void *allocator_ctx, + kgrpc_registered_call_allocator allocator +); + +/** + * Like kgrpc_server_set_register_method_allocator but instead of registered methods, + * it sets an allocation callback for unknown method calls. + */ +void kgrpc_server_set_batch_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *allocator_ctx, + kgrpc_batch_call_allocator allocator +); + #ifdef __cplusplus - } +} #endif #endif //GRPCPP_C_H diff --git a/cinterop-c/src/kgrpc.cpp b/cinterop-c/src/kgrpc.cpp index e2be28299..a1a979f37 100644 --- a/cinterop-c/src/kgrpc.cpp +++ b/cinterop-c/src/kgrpc.cpp @@ -2,12 +2,56 @@ #include #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/server/server.h" extern "C" { - bool kgrpc_iomgr_run_in_background() { - return grpc_iomgr_run_in_background(); - } +bool kgrpc_iomgr_run_in_background() { + return grpc_iomgr_run_in_background(); +} + +void kgrpc_server_set_register_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *method_tag, + void *allocator_ctx, + kgrpc_registered_call_allocator allocator +) { + grpc_core::Server::FromC(server)->SetRegisteredMethodAllocator( + cq, + method_tag, + [allocator_ctx, allocator] { + auto result = allocator(allocator_ctx); + return grpc_core::Server::RegisteredCallAllocation{ + .tag = result.tag, + .call = result.call, + .initial_metadata = result.initial_metadata, + .deadline = result.deadline, + .optional_payload = result.optional_payload, + .cq = result.cq, + }; + }); +} + +void kgrpc_server_set_batch_method_allocator( + grpc_server *server, + grpc_completion_queue *cq, + void *allocator_ctx, + kgrpc_batch_call_allocator allocator +) { + grpc_core::Server::FromC(server)->SetBatchMethodAllocator( + cq, + [allocator_ctx, allocator] { + auto result = allocator(allocator_ctx); + return grpc_core::Server::BatchCallAllocation{ + .tag = result.tag, + .call = result.call, + .initial_metadata = result.initial_metadata, + .details = result.details, + .cq = result.cq, + }; + }); +} } diff --git a/cinterop-c/tools/collect_headers.bzl b/cinterop-c/tools/collect_headers.bzl index 46c3dbaef..cda0fe067 100644 --- a/cinterop-c/tools/collect_headers.bzl +++ b/cinterop-c/tools/collect_headers.bzl @@ -84,12 +84,8 @@ include_dir = rule( def _cc_headers_only_impl(ctx): dep_cc = ctx.attr.dep[CcInfo].compilation_context - - # keep only source headers; this skips generated headers and their actions. - all_hdrs = dep_cc.headers.to_list() - src_hdrs = [f for f in all_hdrs if getattr(f, "is_source", False)] cc_ctx = cc_common.create_compilation_context( - headers = depset(src_hdrs), + headers = dep_cc.headers, includes = dep_cc.includes, quote_includes = dep_cc.quote_includes, system_includes = dep_cc.system_includes, diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts index c71250119..88375babc 100644 --- a/grpc/grpc-core/build.gradle.kts +++ b/grpc/grpc-core/build.gradle.kts @@ -100,6 +100,18 @@ kotlin { extraOpts("-libraryPath", "$cLibOutDir") } } + + // configures linkReleaseTest task to build and link the test binary in RELEASE mode. + // this can be useful for performance analysis. + targets.withType().configureEach { + binaries { + test( + buildTypes = listOf( + org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE + ) + ) + } + } } configureLocalProtocGenDevelopmentDependency() diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt index 80c6b5846..bb49085cb 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt @@ -16,10 +16,12 @@ import kotlinx.rpc.internal.utils.InternalRpcApi public expect class ServerServiceDefinition { public fun getServiceDescriptor(): ServiceDescriptor public fun getMethods(): Collection> + + public fun getMethod(methodName: String): ServerMethodDefinition<*, *>? } @InternalRpcApi public expect fun serverServiceDefinition( serviceDescriptor: ServiceDescriptor, - methods: Collection> + methods: Collection>, ): ServerServiceDefinition diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 399b9c5ac..4b2e08c4c 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -8,14 +8,25 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout -import kotlinx.rpc.grpc.* -import kotlinx.rpc.grpc.internal.* +import kotlinx.rpc.grpc.GrpcServer +import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.ManagedChannel +import kotlinx.rpc.grpc.ManagedChannelBuilder +import kotlinx.rpc.grpc.Status +import kotlinx.rpc.grpc.StatusCode +import kotlinx.rpc.grpc.buildChannel +import kotlinx.rpc.grpc.internal.ClientCall +import kotlinx.rpc.grpc.internal.GrpcDefaultCallOptions +import kotlinx.rpc.grpc.internal.MethodDescriptor +import kotlinx.rpc.grpc.internal.MethodType +import kotlinx.rpc.grpc.internal.clientCallListener +import kotlinx.rpc.grpc.internal.methodDescriptor +import kotlinx.rpc.grpc.statusCode import kotlinx.rpc.registerService import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertFailsWith -import kotlin.time.Duration private const val PORT = 50051 @@ -51,8 +62,12 @@ class GrpcCoreClientTest { this.timeout = timeout } - private fun shutdownAndWait(channel: ManagedChannel) { - channel.shutdown() + private fun shutdownAndWait(channel: ManagedChannel, now: Boolean = false) { + if (now) { + channel.shutdownNow() + } else { + channel.shutdown() + } runBlocking { channel.awaitTermination() } } @@ -180,10 +195,7 @@ class GrpcCoreClientTest { fun halfCloseBeforeSendingMessage_errorWithoutCrashing() { val channel = createChannel() val call = channel.newHelloCall() - val statusDeferred = CompletableDeferred() - val listener = createClientCallListener( - onClose = { status, _ -> statusDeferred.complete(status) } - ) + val listener = createClientCallListener() assertFailsWith { try { call.start(listener, GrpcTrailers()) @@ -259,7 +271,7 @@ class GreeterServiceImpl : GreeterService { * Run this on JVM before executing tests. */ @Test - fun runServer() = runTest(timeout = Duration.INFINITE) { + fun runServer() = runTest { val server = GrpcServer( port = PORT, builder = { registerService { GreeterServiceImpl() } } @@ -272,6 +284,7 @@ class GreeterServiceImpl : GreeterService { } finally { server.shutdown() server.awaitTermination() + } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt index be43d6afc..766ffbb2b 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt @@ -6,7 +6,11 @@ package kotlinx.rpc.grpc -import kotlinx.rpc.grpc.internal.* +import kotlinx.rpc.grpc.internal.GrpcChannel +import kotlinx.rpc.grpc.internal.GrpcChannelCredentials +import kotlinx.rpc.grpc.internal.GrpcInsecureChannelCredentials +import kotlinx.rpc.grpc.internal.NativeManagedChannel +import kotlinx.rpc.grpc.internal.internalError /** * Same as [ManagedChannel], but is platform-exposed. @@ -25,10 +29,10 @@ public actual abstract class ManagedChannelBuilder> internal class NativeManagedChannelBuilder( private val target: String, ) : ManagedChannelBuilder() { - private var credentials: GrpcCredentials? = null + private var credentials: GrpcChannelCredentials? = null override fun usePlaintext(): NativeManagedChannelBuilder { - credentials = GrpcInsecureCredentials() + credentials = GrpcInsecureChannelCredentials() return this } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt index e2ae9619a..08e0c4bd2 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt @@ -6,17 +6,66 @@ package kotlinx.rpc.grpc +import kotlinx.rpc.grpc.internal.MethodDescriptor +import kotlinx.rpc.grpc.internal.ServerMethodDefinition +import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap + /** * Registry of services and their methods used by servers to dispatching incoming calls. */ -public actual abstract class HandlerRegistry +public actual abstract class HandlerRegistry { + /** + * Returns the [ServerServiceDefinition]s provided by the registry, or an empty list if not + * supported by the implementation. + */ + public abstract fun getServices(): List + + /** + * Lookup a [ServerMethodDefinition] by its fully-qualified name. + * + * @param methodName to lookup [ServerMethodDefinition] for. + * @param authority the authority for the desired method (to do virtual hosting). If `null` + * the first matching method will be returned. + * @return the resolved method or `null` if no method for that name exists. + */ + public abstract fun lookupMethod( + methodName: String, authority: String?, + ): ServerMethodDefinition<*, *>? + + /** + * Lookup a [ServerMethodDefinition] by its fully-qualified name. + * + * @param methodName to lookup [ServerMethodDefinition] for. + * @return the resolved method or `null` if no method for that name exists. + */ + public fun lookupMethod(methodName: String): ServerMethodDefinition<*, *>? { + return lookupMethod(methodName, null) + } + +} internal actual class MutableHandlerRegistry : HandlerRegistry() { + + private val services = RpcInternalConcurrentHashMap() + actual fun addService(service: ServerServiceDefinition): ServerServiceDefinition? { - error("Native target is not supported in gRPC") + return services.put(service.getServiceDescriptor().getName(), service) } actual fun removeService(service: ServerServiceDefinition): Boolean { - error("Native target is not supported in gRPC") + return services.remove(service.getServiceDescriptor().getName()) != null + } + + override fun getServices(): List { + return services.values.toList() + } + + override fun lookupMethod( + methodName: String, + authority: String?, + ): ServerMethodDefinition<*, *>? { + val serviceName = MethodDescriptor.extractFullServiceName(methodName) ?: return null + val service = services[serviceName] ?: return null + return service.getMethod(methodName) } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt index 8d7ee951d..5e8f53cba 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Server.native.kt @@ -4,6 +4,10 @@ package kotlinx.rpc.grpc +import kotlinx.rpc.grpc.internal.GrpcInsecureServerCredentials +import kotlinx.rpc.grpc.internal.NativeServer +import kotlinx.rpc.grpc.internal.ServerMethodDefinition + /** * Platform-specific gRPC server builder. */ @@ -11,12 +15,53 @@ public actual abstract class ServerBuilder> { public actual abstract fun addService(service: ServerServiceDefinition): T public actual abstract fun fallbackHandlerRegistry(registry: HandlerRegistry?): T + + public abstract fun build(): Server +} + +private class NativeServerBuilder( + val port: Int, +) : ServerBuilder() { + + // TODO: Add actual credentials + private val credentials = GrpcInsecureServerCredentials() + private val services = mutableListOf() + private var fallbackRegistry: HandlerRegistry = DefaultFallbackRegistry + + override fun addService(service: ServerServiceDefinition): NativeServerBuilder { + services.add(service) + return this + } + + override fun fallbackHandlerRegistry(registry: HandlerRegistry?): NativeServerBuilder { + fallbackRegistry = registry ?: DefaultFallbackRegistry + return this + } + + override fun build(): Server { + return NativeServer(port, credentials, services, fallbackRegistry) + } + } internal actual fun ServerBuilder(port: Int): ServerBuilder<*> { - error("Native target is not supported in gRPC") + return NativeServerBuilder(port) } internal actual fun Server(builder: ServerBuilder<*>): Server { - error("Native target is not supported in gRPC") + return builder.build() +} + +private object DefaultFallbackRegistry : HandlerRegistry() { + override fun getServices(): List { + return listOf() + } + + override fun lookupMethod( + methodName: String, + authority: String?, + ): ServerMethodDefinition<*, *>? { + return null + } + } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt index 2d5da178c..31977c8b3 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.native.kt @@ -8,14 +8,20 @@ import kotlinx.rpc.grpc.internal.ServerMethodDefinition import kotlinx.rpc.grpc.internal.ServiceDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi -public actual class ServerServiceDefinition { - public actual fun getServiceDescriptor(): ServiceDescriptor { - TODO("Not yet implemented") - } - - public actual fun getMethods(): Collection> { - TODO("Not yet implemented") - } +public actual class ServerServiceDefinition internal constructor( + private val serviceDescriptor: ServiceDescriptor, + private val methods: Map>, +) { + + internal constructor(serviceDescriptor: ServiceDescriptor, methods: Collection>) : + this(serviceDescriptor, methods.associateBy { it.getMethodDescriptor().getFullMethodName() }) + + public actual fun getServiceDescriptor(): ServiceDescriptor = serviceDescriptor + + public actual fun getMethods(): Collection> = methods.values + + public actual fun getMethod(methodName: String): ServerMethodDefinition<*, *>? = methods[methodName] + } @InternalRpcApi @@ -23,5 +29,5 @@ public actual fun serverServiceDefinition( serviceDescriptor: ServiceDescriptor, methods: Collection>, ): ServerServiceDefinition { - TODO("Not yet implemented") + return ServerServiceDefinition(serviceDescriptor, methods) } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt index 799964abc..329c45ae1 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt @@ -6,11 +6,34 @@ package kotlinx.rpc.grpc.internal +import cnames.structs.grpc_call import kotlinx.atomicfu.atomic import kotlinx.atomicfu.locks.SynchronizedObject import kotlinx.atomicfu.locks.synchronized -import kotlinx.cinterop.* -import libkgrpc.* +import kotlinx.cinterop.CFunction +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.StableRef +import kotlinx.cinterop.alloc +import kotlinx.cinterop.asStableRef +import kotlinx.cinterop.convert +import kotlinx.cinterop.free +import kotlinx.cinterop.nativeHeap +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.sizeOf +import kotlinx.cinterop.staticCFunction +import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT +import libkgrpc.grpc_call_error +import libkgrpc.grpc_call_start_batch +import libkgrpc.grpc_completion_queue_create_for_callback +import libkgrpc.grpc_completion_queue_destroy +import libkgrpc.grpc_completion_queue_functor +import libkgrpc.grpc_completion_queue_shutdown +import libkgrpc.grpc_op +import libkgrpc.kgrpc_cb_tag +import libkgrpc.kgrpc_iomgr_run_in_background import platform.posix.memset import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner @@ -84,7 +107,7 @@ internal class CompletionQueue { @Suppress("unused") private val shutdownFunctorCleaner = createCleaner(shutdownFunctor) { nativeHeap.free(it) } - + init { // Assert grpc_iomgr_run_in_background() to guarantee that the event manager provides // IO threads and supports the callback API. @@ -95,7 +118,7 @@ internal class CompletionQueue { * Submits a batch operation to the queue. * See [BatchResult] for possible outcomes. */ - fun runBatch(call: NativeClientCall<*, *>, ops: CPointer, nOps: ULong): BatchResult { + fun runBatch(call: CPointer, ops: CPointer, nOps: ULong): BatchResult { val completion = CallbackFuture() val tag = newCbTag(completion, OPS_COMPLETE_CB) @@ -116,7 +139,7 @@ internal class CompletionQueue { return BatchResult.CQShutdown } - err = grpc_call_start_batch(call.raw, ops, nOps, tag, null) + err = grpc_call_start_batch(call, ops, nOps, tag, null) } if (err != grpc_call_error.GRPC_CALL_OK) { @@ -194,4 +217,51 @@ private fun newCbTag( private fun deleteCbTag(tag: CPointer) { tag.pointed.user_data!!.asStableRef().dispose() nativeHeap.free(tag) -} \ No newline at end of file +} + +/** + * Represents a callback tag for completion queues constructed with + * [grpc_completion_queue_create_for_callback]. + * + * The [run] method will be invoked onces the completion queue signals the completion of the operation. + * It is guaranteed that the [run] method will be only invoked once. + * + * `this` object is guaranteed to be not garbage collected until the [run] method was executed. + */ +internal interface CallbackTag { + fun run(ok: Boolean) + + /** + * Creates a pointer to a gRPC callback tag that encapsulates the given `run` function. + * It can be passed to callback-based grpc_completion_queue. + */ + fun toCbTag(): CPointer { + return newCbTag(this, staticCFunction { functor, ok -> + val tag = functor!!.reinterpret() + val callbackTag = tag.pointed.user_data!!.asStableRef().get() + // free the tag memory and release the stable reference to `this` + deleteCbTag(tag) + callbackTag.run(ok != 0) + }) + } + + companion object { + /** + * Creates a pointer to a gRPC callback tag that encapsulates the given `run` function. + * + * The `run` function will be invoked when the callback is triggered with a boolean + * value that indicates the success or failure of the operation. + * + * @return A pointer to the newly created gRPC callback tag. + * It can be passed to callback-based grpc_completion_queue. + */ + fun anonymous(run: (ok: Boolean) -> Unit): CPointer { + return object : CallbackTag { + override fun run(ok: Boolean) { + run(ok) + } + }.toCbTag() + } + } +} + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt index ef95e782e..f1a77e0d7 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt @@ -22,12 +22,7 @@ public actual class MethodDescriptor internal constructor( public actual fun getFullMethodName(): String = fullMethodName private val serviceName: String? by lazy { - val index = fullMethodName.lastIndexOf('/') - if (index == -1) { - null - } else { - fullMethodName.substring(0, index) - } + extractFullServiceName(fullMethodName) } public actual fun getServiceName(): String? = serviceName @@ -48,6 +43,17 @@ public actual class MethodDescriptor internal constructor( public actual fun stream(value: T): InputStream public actual fun parse(stream: InputStream): T } + + public companion object { + public fun extractFullServiceName(fullMethodName: String): String? { + val index = fullMethodName.lastIndexOf('/') + return if (index == -1) { + null + } else { + fullMethodName.take(index) + } + } + } } @InternalRpcApi diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt index d77bd9fbc..69c637b32 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt @@ -8,7 +8,19 @@ package kotlinx.rpc.grpc.internal import cnames.structs.grpc_call import kotlinx.atomicfu.atomic -import kotlinx.cinterop.* +import kotlinx.cinterop.Arena +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert +import kotlinx.cinterop.get +import kotlinx.cinterop.ptr +import kotlinx.cinterop.readValue +import kotlinx.cinterop.toKString +import kotlinx.cinterop.value import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableJob import kotlinx.rpc.grpc.GrpcTrailers @@ -16,7 +28,25 @@ import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode import kotlinx.rpc.protobuf.input.stream.asInputStream import kotlinx.rpc.protobuf.input.stream.asSource -import libkgrpc.* +import libkgrpc.GRPC_OP_RECV_INITIAL_METADATA +import libkgrpc.GRPC_OP_RECV_MESSAGE +import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT +import libkgrpc.GRPC_OP_SEND_CLOSE_FROM_CLIENT +import libkgrpc.GRPC_OP_SEND_INITIAL_METADATA +import libkgrpc.GRPC_OP_SEND_MESSAGE +import libkgrpc.gpr_free +import libkgrpc.grpc_byte_buffer +import libkgrpc.grpc_byte_buffer_destroy +import libkgrpc.grpc_call_cancel_with_status +import libkgrpc.grpc_call_error +import libkgrpc.grpc_call_unref +import libkgrpc.grpc_metadata_array +import libkgrpc.grpc_metadata_array_destroy +import libkgrpc.grpc_metadata_array_init +import libkgrpc.grpc_op +import libkgrpc.grpc_slice +import libkgrpc.grpc_slice_unref +import libkgrpc.grpc_status_code import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner @@ -163,7 +193,7 @@ internal class NativeClientCall( // pre-book the batch, so onClose cannot be called before the batch finished. beginOp() - when (val callResult = cq.runBatch(this@NativeClientCall, ops, nOps)) { + when (val callResult = cq.runBatch(this@NativeClientCall.raw, ops, nOps)) { is BatchResult.Submitted -> { callResult.future.onComplete { success -> try { @@ -220,7 +250,7 @@ internal class NativeClientCall( data.recv_status_on_client.trailing_metadata = null } - when (val callResult = cq.runBatch(this@NativeClientCall, op.ptr, 1u)) { + when (val callResult = cq.runBatch(this@NativeClientCall.raw, op.ptr, 1u)) { is BatchResult.Submitted -> { callResult.future.onComplete { val details = statusDetails.toByteArray().toKString() diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt index 3f03b92f1..7d465f4d0 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt @@ -11,19 +11,31 @@ import cnames.structs.grpc_channel_credentials import kotlinx.atomicfu.atomic import kotlinx.cinterop.CPointer import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.coroutines.* +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.ManagedChannel import kotlinx.rpc.grpc.ManagedChannelPlatform -import libkgrpc.* +import libkgrpc.GPR_CLOCK_REALTIME +import libkgrpc.GRPC_PROPAGATE_DEFAULTS +import libkgrpc.gpr_inf_future +import libkgrpc.grpc_channel_create +import libkgrpc.grpc_channel_create_call +import libkgrpc.grpc_channel_credentials_release +import libkgrpc.grpc_channel_destroy +import libkgrpc.grpc_insecure_credentials_create +import libkgrpc.grpc_slice_unref import kotlin.coroutines.cancellation.CancellationException import kotlin.experimental.ExperimentalNativeApi import kotlin.native.ref.createCleaner import kotlin.time.Duration /** - * Wrapper for [grpc_channel_credentials]. + * Wrapper for [cnames.structs.grpc_channel_credentials]. */ -internal sealed class GrpcCredentials( +internal sealed class GrpcChannelCredentials( internal val raw: CPointer, ) { val rawCleaner = createCleaner(raw) { @@ -34,8 +46,8 @@ internal sealed class GrpcCredentials( /** * Insecure credentials. */ -internal class GrpcInsecureCredentials() : - GrpcCredentials(grpc_insecure_credentials_create() ?: error("Failed to create credentials")) +internal class GrpcInsecureChannelCredentials() : + GrpcChannelCredentials(grpc_insecure_credentials_create() ?: error("Failed to create channel credentials")) /** @@ -47,7 +59,7 @@ internal class GrpcInsecureCredentials() : internal class NativeManagedChannel( target: String, // we must store them, otherwise the credentials are getting released - credentials: GrpcCredentials, + credentials: GrpcChannelCredentials, ) : ManagedChannel, ManagedChannelPlatform() { // a reference to make sure the grpc_init() was called. (it is released after shutdown) @@ -72,7 +84,8 @@ internal class NativeManagedChannel( override val platformApi: ManagedChannelPlatform = this private var isShutdownInternal = atomic(false) - override val isShutdown: Boolean = isShutdownInternal.value + override val isShutdown: Boolean + get() = isShutdownInternal.value private val isTerminatedInternal = CompletableDeferred() override val isTerminated: Boolean get() = isTerminatedInternal.isCompleted @@ -139,6 +152,8 @@ internal class NativeManagedChannel( reserved = null ) ?: error("Failed to create call") + grpc_slice_unref(methodNameSlice) + return NativeClientCall( cq, rawCall, methodDescriptor, callJob ) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt new file mode 100644 index 000000000..39ede8291 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt @@ -0,0 +1,244 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_server +import cnames.structs.grpc_server_credentials +import kotlinx.atomicfu.atomic +import kotlinx.cinterop.COpaquePointer +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.StableRef +import kotlinx.cinterop.asStableRef +import kotlinx.cinterop.staticCFunction +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.withTimeoutOrNull +import kotlinx.rpc.grpc.HandlerRegistry +import kotlinx.rpc.grpc.Server +import kotlinx.rpc.grpc.ServerServiceDefinition +import libkgrpc.grpc_insecure_server_credentials_create +import libkgrpc.grpc_server_add_http2_port +import libkgrpc.grpc_server_cancel_all_calls +import libkgrpc.grpc_server_create +import libkgrpc.grpc_server_credentials_release +import libkgrpc.grpc_server_destroy +import libkgrpc.grpc_server_register_completion_queue +import libkgrpc.grpc_server_register_method +import libkgrpc.grpc_server_register_method_payload_handling +import libkgrpc.grpc_server_shutdown_and_notify +import libkgrpc.grpc_server_start +import libkgrpc.kgrpc_batch_call_allocation +import libkgrpc.kgrpc_registered_call_allocation +import libkgrpc.kgrpc_server_set_batch_method_allocator +import libkgrpc.kgrpc_server_set_register_method_allocator +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner +import kotlin.time.Duration + +/** + * Wrapper for [grpc_server_credentials]. + */ +internal sealed class GrpcServerCredentials( + internal val raw: CPointer, +) { + val rawCleaner = createCleaner(raw) { + grpc_server_credentials_release(it) + } +} + +internal class GrpcInsecureServerCredentials : + GrpcServerCredentials(grpc_insecure_server_credentials_create() ?: error("Failed to create server credentials")) + + +internal class NativeServer( + override val port: Int, + // we must reference them, otherwise the credentials are getting garbage collected + @Suppress("Redundant") + private val credentials: GrpcServerCredentials, + services: List, + val fallbackRegistry: HandlerRegistry, +) : Server { + + // a reference to make sure the grpc_init() was called. (it is released after shutdown) + @Suppress("unused") + private val rt = GrpcRuntime.acquire() + + private val cq = CompletionQueue() + + val raw: CPointer = grpc_server_create(null, null) + ?: error("Failed to create server") + + // holds all stable references to MethodAllocationCtx objects. + // the stable references must eventually be disposed. + private val callAllocationCtxs = mutableSetOf>() + + init { + grpc_server_register_completion_queue(raw, cq.raw, null) + grpc_server_add_http2_port(raw, "0.0.0.0:$port", credentials.raw) + registerServices(services) + setLookupCallAllocatorCallback() + } + + private var started = false + private var isShutdownInternal = atomic(false) + override val isShutdown: Boolean + get() = isShutdownInternal.value + + private val isTerminatedInternal = CompletableDeferred() + override val isTerminated: Boolean + get() = isTerminatedInternal.isCompleted + + override fun start(): Server { + check(!started) { internalError("Server already started") } + started = true + grpc_server_start(raw) + return this + } + + private fun dispose() { + // disposed with completion of shutdown + grpc_server_destroy(raw) + callAllocationCtxs.forEach { it.dispose() } + // release the grpc runtime, so grpc is shutdown if no other grpc servers are running. + rt.close() + } + + override fun shutdown(): Server { + if (!isShutdownInternal.compareAndSet(expect = false, update = true)) { + // shutdown only once + return this + } + + grpc_server_shutdown_and_notify(raw, cq.raw, CallbackTag.anonymous { + cq.shutdown().onComplete { + dispose() + isTerminatedInternal.complete(Unit) + } + }) + return this + } + + override fun shutdownNow(): Server { + shutdown() + grpc_server_cancel_all_calls(raw) + return this + } + + override suspend fun awaitTermination(duration: Duration): Server { + withTimeoutOrNull(duration) { + isTerminatedInternal.await() + } + return this + } + + /** + * Registers a list of server service definitions with the server. + */ + private fun registerServices(services: List) { + check(!started) { internalError("Server already started") } + + services.flatMap { it.getMethods() }.forEach { + val desc = it.getMethodDescriptor() + // to construct a valid HTTP/2 path, we must prepend the name with a slash. + // the user does not do this to align it with the java implementation. + val name = "/" + desc.getFullMethodName() + val tag = grpc_server_register_method( + server = raw, + method = name, + host = null, + // we currently don't optimize unary calls by reading the message on connection + payload_handling = grpc_server_register_method_payload_handling.GRPC_SRM_PAYLOAD_NONE, + flags = 0u + ) ?: error("Failed to register method: $name") + + val ctx = StableRef.create( + RegisteredCallAllocationCtx( + server = this, + method = it, + cq = cq, + ) + ) + callAllocationCtxs.add(ctx) + + // register the allocation callback for the method. + // it is invoked by the grpc runtime to allocate a new call context for incoming requests. + kgrpc_server_set_register_method_allocator( + server = raw, + cq = cq.raw, + method_tag = tag, + allocator_ctx = ctx.asCPointer(), + allocator = staticCFunction(::registeredCallAllocationCallback) + ) + } + } + + /** + * Configures the server with a callback to handle the allocation of method calls for incoming requests, + * which were not registered before the server started (via [registerServices]). + */ + private fun setLookupCallAllocatorCallback() { + val ctx = StableRef.create( + CallAllocationCtx( + server = this, + cq = cq, + ) + ) + callAllocationCtxs.add(ctx) + + kgrpc_server_set_batch_method_allocator( + server = raw, + cq = cq.raw, + allocator_ctx = ctx.asCPointer(), + allocator = staticCFunction(::lookupCallAllocationCallback) + ) + } + +} + +/** + * Allocates and returns a registered call allocation for a given call context. + */ +@CName("kgrpc_method_allocation_callback") +private fun registeredCallAllocationCallback(ctx: COpaquePointer?): CValue { + val ctx = ctx!!.asStableRef().get() + val request = RegisteredServerCallTag(ctx.cq, ctx.method) + return request.toRawCallAllocation() +} + +/** + * A static callback that is invoked by the grpc runtime to allocate a new [kgrpc_registered_call_allocation]. + * As the [LookupServerCallTag] is a [CallbackTag] it won't be garbage collected until the callback is executed. + * + * @param ctx A pointer to a [CallAllocationCtx] object + */ +@CName("kgrpc_method_allocation_callback") +private fun lookupCallAllocationCallback(ctx: COpaquePointer?): CValue { + val ctx = ctx!!.asStableRef().get() + val request = LookupServerCallTag(ctx.cq, ctx.server.fallbackRegistry) + return request.toRawCallAllocation() +} + +/** + * A context to pass dynamic information to the [lookupCallAllocationCallback] and + * [registeredCallAllocationCallback] callbacks. + */ +private open class CallAllocationCtx( + val server: NativeServer, + val cq: CompletionQueue, +) + +/** + * A context to pass dynamic information to the [registeredCallAllocationCallback] callback. + */ +private class RegisteredCallAllocationCtx( + server: NativeServer, + cq: CompletionQueue, + val method: ServerMethodDefinition<*, *>, +) : CallAllocationCtx(server, cq) + + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt new file mode 100644 index 000000000..4fa827c46 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt @@ -0,0 +1,357 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_call +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.ReentrantLock +import kotlinx.atomicfu.locks.withLock +import kotlinx.cinterop.Arena +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.IntVar +import kotlinx.cinterop.alloc +import kotlinx.cinterop.ptr +import kotlinx.cinterop.readValue +import kotlinx.cinterop.value +import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.Status +import kotlinx.rpc.protobuf.input.stream.asInputStream +import kotlinx.rpc.protobuf.input.stream.asSource +import libkgrpc.GRPC_OP_RECV_CLOSE_ON_SERVER +import libkgrpc.GRPC_OP_RECV_MESSAGE +import libkgrpc.GRPC_OP_SEND_INITIAL_METADATA +import libkgrpc.GRPC_OP_SEND_MESSAGE +import libkgrpc.GRPC_OP_SEND_STATUS_FROM_SERVER +import libkgrpc.grpc_byte_buffer +import libkgrpc.grpc_byte_buffer_destroy +import libkgrpc.grpc_call_cancel_with_status +import libkgrpc.grpc_call_unref +import libkgrpc.grpc_op +import libkgrpc.grpc_slice +import libkgrpc.grpc_slice_unref +import libkgrpc.grpc_status_code +import kotlin.concurrent.Volatile +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner + +internal class NativeServerCall( + // ownership is transferred to the call + val raw: CPointer, + val cq: CompletionQueue, +) : ServerCall() { + + constructor( + raw: CPointer, + cq: CompletionQueue, + methodDescriptor: MethodDescriptor, + ) : this(raw, cq) { + setMethodDescriptor(methodDescriptor) + } + + @Suppress("unused") + private val rawCleaner = createCleaner(raw) { + grpc_call_unref(it) + } + + private val listener = DeferredCallListener() + private var methodDescriptor: MethodDescriptor? = null + private val callbackMutex = ReentrantLock() + private var initialized = false + private var cancelled = false + private val finalized = atomic(false) + + // Tracks whether at least one request message has been received on this call. + private var receivedFirstMessage = false + + // we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192) + private val ready = atomic(true) + + init { + initialize() + } + + /** + * Sets the method descriptor for this call. + * It must be set before invoking [ServerCallHandler.startCall]. + */ + fun setMethodDescriptor(methodDescriptor: MethodDescriptor) { + this.methodDescriptor = methodDescriptor + } + + /** + * Set the listener created from [ServerCallHandler.startCall]. + * This must be set directly after receiving the listener from the `startCall` invocation. + */ + fun setListener(listener: Listener) { + this.listener.setDelegate(listener) + } + + private fun initialize() { + // finishes if the whole connection is closed. + // this triggers onClose()/onCanceled() callback. + val arena = Arena() + val cancelled = arena.alloc() + val op = arena.alloc { + op = GRPC_OP_RECV_CLOSE_ON_SERVER + data.recv_close_on_server.cancelled = cancelled.ptr + } + val result = cq.runBatch(raw, op.ptr, 1u) + if (result !is BatchResult.Submitted) { + // we couldn't submit the initialization batch, so nothing can be done. + arena.clear() + finalize(true) + } else { + initialized = true + result.future.onComplete { + val cancelled = cancelled.value == 1 + arena.clear() + finalize(cancelled) + } + } + } + + /** + * Called when the call is closed (both by the client and the server). + */ + private fun finalize(cancelled: Boolean) { + if (finalized.compareAndSet(expect = false, update = true)) { + if (cancelled) { + this.cancelled = true + callbackMutex.withLock { + listener.onCancel() + } + } else { + callbackMutex.withLock { + listener.onComplete() + } + } + } + } + + fun cancel(status: grpc_status_code, message: String) { + grpc_call_cancel_with_status(raw, status, message, null) + } + + /** + * Sets the [ready] flag to true and calls the listener's onReady callback. + * This is called as soon as the RECV_MESSAGE batch is finished (or failed). + */ + private fun turnReady() { + callbackMutex.withLock { + if (ready.compareAndSet(expect = false, update = true)) { + listener.onReady() + } + } + } + + override fun isReady(): Boolean { + return ready.value + } + + private fun runBatch( + ops: CPointer, + nOps: ULong, + cleanup: () -> Unit = {}, + onSuccess: () -> Unit = {}, + ) { + when (val result = cq.runBatch(raw, ops, nOps)) { + is BatchResult.Submitted -> { + result.future.onComplete { + try { + onSuccess() + } catch (e: Throwable) { + cancel(grpc_status_code.GRPC_STATUS_INTERNAL, e.message ?: "Unknown error") + } finally { + cleanup() + } + } + } + + BatchResult.CQShutdown -> { + cleanup() + cancel(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Server shutdown") + } + + is BatchResult.SubmitError -> { + cleanup() + cancel( + grpc_status_code.GRPC_STATUS_INTERNAL, + "Batch could not be submitted: ${result.error}" + ) + } + } + } + + override fun request(numMessages: Int) { + check(initialized) { internalError("Call not initialized") } + // TODO: Remove the num constraint (KRPC-213) + require(numMessages == 1) { internalError("numMessages must be 1") } + val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } + + val arena = Arena() + val recvPtr = arena.alloc>() + val op = arena.alloc { + op = GRPC_OP_RECV_MESSAGE + data.recv_message.recv_message = recvPtr.ptr + } + + runBatch(op.ptr, 1u, cleanup = { + arena.clear() + }) { + // if the call was successful, but no message was received, we reached the end-of-stream. + // and thus the client half-closed. + val buf = recvPtr.value + if (buf == null) { + // end-of-stream observed. for UNARY, absence of any request is a protocol violation. + if (methodDescriptor.type == MethodType.UNARY && !receivedFirstMessage) { + cancel( + grpc_status_code.GRPC_STATUS_INTERNAL, + "Unary call half-closed before receiving a request message" + ) + } else { + callbackMutex.withLock { + listener.onHalfClose() + } + } + } else { + try { + val msg = methodDescriptor.getRequestMarshaller() + .parse(buf.toKotlin().asInputStream()) + // Mark that we have received at least one request message + receivedFirstMessage = true + callbackMutex.withLock { + listener.onMessage(msg) + } + } finally { + grpc_byte_buffer_destroy(buf) + } + } + } + } + + override fun sendHeaders(headers: GrpcTrailers) { + check(initialized) { internalError("Call not initialized") } + val arena = Arena() + // TODO: Implement header metadata operation + val op = arena.alloc { + op = GRPC_OP_SEND_INITIAL_METADATA + data.send_initial_metadata.count = 0u + data.send_initial_metadata.metadata = null + } + + runBatch(op.ptr, 1u, cleanup = { arena.clear() }) { + // nothing to do here + } + } + + override fun sendMessage(message: Response) { + check(initialized) { internalError("Call not initialized") } + check(isReady()) { internalError("Not yet ready.") } + val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } + + val arena = Arena() + val inputStream = methodDescriptor.getResponseMarshaller().stream(message) + val byteBuffer = inputStream.asSource().toGrpcByteBuffer() + ready.value = false + + val op = arena.alloc { + op = GRPC_OP_SEND_MESSAGE + data.send_message.send_message = byteBuffer + } + + runBatch(op.ptr, 1u, cleanup = { + arena.clear() + grpc_byte_buffer_destroy(byteBuffer) + }) { + turnReady() + } + } + + override fun close(status: Status, trailers: GrpcTrailers) { + check(initialized) { internalError("Call not initialized") } + val arena = Arena() + + val details = status.getDescription()?.let { + arena.alloc { + it.toGrpcSlice() + } + } + val op = arena.alloc { + op = GRPC_OP_SEND_STATUS_FROM_SERVER + data.send_status_from_server.status = status.statusCode.toRawCallAllocation() + data.send_status_from_server.status_details = details?.ptr + data.send_status_from_server.trailing_metadata_count = 0u + data.send_status_from_server.trailing_metadata = null + } + + runBatch(op.ptr, 1u, cleanup = { + if (details != null) grpc_slice_unref(details.readValue()) + arena.clear() + }) { + // nothing to do here + } + } + + override fun isCancelled(): Boolean { + return cancelled + } + + override fun getMethodDescriptor(): MethodDescriptor { + val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") } + return methodDescriptor + } +} + +/** + * A listener implementation that defers execution of its methods until a delegate is set. + * + * This class extends `ServerCall.Listener`, allowing it to serve as a listener for server calls. + * Initially, incoming method calls (e.g., `onMessage`, `onHalfClose`, etc.) are queued until a delegate + * is assigned through the `setDelegate` method. Once the delegate is set, queued methods are delivered + * in order and all future method calls are forwarded directly to the delegate. + */ +private class DeferredCallListener : ServerCall.Listener() { + @Volatile + private var delegate: ServerCall.Listener? = null + private val mutex = ReentrantLock() + private val queue = ArrayDeque<(ServerCall.Listener) -> Unit>() + + fun setDelegate(d: ServerCall.Listener) { + mutex.withLock { + if (delegate != null) return + delegate = d + } + // drain the queue + queue.forEach { it(d) } + queue.clear() + } + + private inline fun deliver(crossinline invokeListener: (ServerCall.Listener) -> Unit) { + val currentDelegate = delegate + if (currentDelegate != null) { + // fast path (delegate is already set) + invokeListener(currentDelegate); return + } + // slow path: re-check under lock + val safeCurrentDelegate = mutex.withLock { + val cur = delegate + if (cur == null) { + queue.addLast { invokeListener(it) } + null + } else cur + } + // if the delegate was already set, call it + if (safeCurrentDelegate != null) invokeListener(safeCurrentDelegate) + } + + override fun onMessage(message: T) = deliver { it.onMessage(message) } + override fun onHalfClose() = deliver { it.onHalfClose() } + override fun onCancel() = deliver { it.onCancel() } + override fun onComplete() = deliver { it.onComplete() } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt index 3da552b02..b0e625840 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerMethodDefinition.native.kt @@ -7,13 +7,16 @@ package kotlinx.rpc.grpc.internal import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public actual class ServerMethodDefinition { +public actual class ServerMethodDefinition internal constructor( + private val methodDescriptor: MethodDescriptor, + private val serverCallHandler: ServerCallHandler, +) { public actual fun getMethodDescriptor(): MethodDescriptor { - TODO("Not yet implemented") + return methodDescriptor } public actual fun getServerCallHandler(): ServerCallHandler { - TODO("Not yet implemented") + return serverCallHandler } } @@ -21,5 +24,5 @@ public actual fun serverMethodDefinition( descriptor: MethodDescriptor, handler: ServerCallHandler, ): ServerMethodDefinition { - TODO("Not yet implemented") + return ServerMethodDefinition(descriptor, handler) } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt index bf2415a08..ce890ae91 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServiceDescriptor.native.kt @@ -7,18 +7,16 @@ package kotlinx.rpc.grpc.internal import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public actual class ServiceDescriptor { - public actual fun getName(): String { - TODO("Not yet implemented") - } +public actual class ServiceDescriptor internal constructor( + private val name: String, + private val methods: Collection>, + private val schemaDescriptor: Any?, +) { + public actual fun getName(): String = name - public actual fun getMethods(): Collection> { - TODO("Not yet implemented") - } + public actual fun getMethods(): Collection> = methods - public actual fun getSchemaDescriptor(): Any? { - TODO("Not yet implemented") - } + public actual fun getSchemaDescriptor(): Any? = schemaDescriptor } @InternalRpcApi @@ -27,5 +25,5 @@ public actual fun serviceDescriptor( methods: Collection>, schemaDescriptor: Any?, ): ServiceDescriptor { - TODO("Not yet implemented") + return ServiceDescriptor(name, methods, schemaDescriptor) } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/serverCallTags.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/serverCallTags.kt new file mode 100644 index 000000000..58f58d21d --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/serverCallTags.kt @@ -0,0 +1,168 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_call +import kotlinx.cinterop.Arena +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.alloc +import kotlinx.cinterop.cValue +import kotlinx.cinterop.ptr +import kotlinx.cinterop.value +import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.HandlerRegistry +import libkgrpc.gpr_timespec +import libkgrpc.grpc_call_details +import libkgrpc.grpc_call_details_destroy +import libkgrpc.grpc_metadata_array +import libkgrpc.grpc_metadata_array_destroy +import libkgrpc.grpc_status_code +import libkgrpc.kgrpc_batch_call_allocation +import libkgrpc.kgrpc_registered_call_allocation +import kotlin.experimental.ExperimentalNativeApi + +/** + * A [CallbackTag] that is passed to a completion queue and invoked when an incoming gRPC call for the + * registered [method] must be handled. + * + * The [toRawCallAllocation] method provides a [kgrpc_registered_call_allocation] used by the grpc runtime + * to pass the call context to this callback. + * + * An object of this type won't be garbage collected until the callback is executed. + */ +internal class RegisteredServerCallTag( + val cq: CompletionQueue, + val method: ServerMethodDefinition, +) : CallbackTag { + val arena = Arena() + val rawCall = arena.alloc>() + val rawDeadline = arena.alloc() + val rawRequestMetadata = arena.alloc() + + // the run() method disposes all resources + private fun dispose() { + grpc_metadata_array_destroy(rawRequestMetadata.ptr) + arena.clear() + } + + override fun run(ok: Boolean) { + try { + if (!ok) { + // the call has been shutdown.\ + return + } + + // create a NativeServerCall to control the underlying core call. + // ownership of the core call is transferred to the NativeServerCall. + val call = NativeServerCall(rawCall.value!!, cq, method.getMethodDescriptor()) + // TODO: Turn metadata into a kotlin GrpcTrailers. + val trailers = GrpcTrailers() + // start the actual call. + val listener = method.getServerCallHandler().startCall(call, trailers) + call.setListener(listener) + } finally { + // at this point, all return values have been transformed into kotlin ones, + // so we can safely clear all resources. + dispose() + } + } + + fun toRawCallAllocation(): CValue { + return cValue { + tag = toCbTag() + call = rawCall.ptr + initial_metadata = rawRequestMetadata.ptr + deadline = rawDeadline.ptr + cq = this@RegisteredServerCallTag.cq.raw + // we are currently not optimizing the initial client payload + // for unary and server streaming (payload_handling is always GRPC_SRM_PAYLOAD_NONE) + optional_payload = null + } + } +} + +/** + * A [CallbackTag] that is passed to a completion queue and invoked when an incoming gRPC call that was + * not registered must be handled. + * The gRPC runtime will provide information about the method being called in the [rawDetails] field. + * + * The [toRawCallAllocation] method provides a [kgrpc_registered_call_allocation] used by the grpc runtime + * to pass the call context to this callback. + * + * An object of this type won't be garbage collected until the callback is executed. + */ +internal class LookupServerCallTag( + val cq: CompletionQueue, + val registry: HandlerRegistry, +) : CallbackTag { + val arena = Arena() + val rawCall = arena.alloc>() + val rawDeadline = arena.alloc() + val rawRequestMetadata = arena.alloc() + val rawDetails = arena.alloc() + + // the run() method disposes all resources + private fun dispose() { + grpc_metadata_array_destroy(rawRequestMetadata.ptr) + grpc_call_details_destroy(rawDetails.ptr) + arena.clear() + } + + override fun run(ok: Boolean) { + try { + if (!ok) { + return + } + + // TODO: check authority + // val host = rawDetails.host.toByteArray().decodeToString() + + var method = rawDetails.method.toByteArray().decodeToString() + + // gRPC preserves the '/' character in the method name, + // while the method definition omits it and starts without '/' + method = method.removePrefix("/") + + val definition = registry.lookupMethod(method) + if (definition == null) { + // the method isn't registered; closing with UNIMPLEMENTED + val call = NativeServerCall(rawCall.value!!, cq) + call.cancel(grpc_status_code.GRPC_STATUS_UNIMPLEMENTED, "Method not found: $method") + } else { + @Suppress("UNCHECKED_CAST") + run { + val callHandler = definition.getServerCallHandler() as ServerCallHandler + val call = NativeServerCall( + rawCall.value!!, + cq, + definition.getMethodDescriptor() as MethodDescriptor + ) + // TODO: Turn metadata into a kotlin GrpcTrailers. + val metadata = GrpcTrailers() + val listener = callHandler.startCall(call, metadata) + call.setListener(listener) + } + } + + } finally { + dispose() + } + } + + fun toRawCallAllocation(): CValue { + return cValue { + tag = toCbTag() + call = rawCall.ptr + initial_metadata = rawRequestMetadata.ptr + details = rawDetails.ptr + cq = this@LookupServerCallTag.cq.raw + } + } +} + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt index 69bc3b1ee..83b807652 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt @@ -6,11 +6,41 @@ package kotlinx.rpc.grpc.internal -import kotlinx.cinterop.* -import kotlinx.io.* +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert +import kotlinx.cinterop.memScoped +import kotlinx.cinterop.plus +import kotlinx.cinterop.ptr +import kotlinx.cinterop.readValue +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.sizeOf +import kotlinx.cinterop.useContents +import kotlinx.cinterop.usePinned +import kotlinx.io.Buffer +import kotlinx.io.InternalIoApi +import kotlinx.io.Sink +import kotlinx.io.Source +import kotlinx.io.UnsafeIoApi import kotlinx.io.unsafe.UnsafeBufferOperations import kotlinx.rpc.grpc.StatusCode -import libkgrpc.* +import libkgrpc.grpc_byte_buffer +import libkgrpc.grpc_byte_buffer_reader +import libkgrpc.grpc_byte_buffer_reader_destroy +import libkgrpc.grpc_byte_buffer_reader_init +import libkgrpc.grpc_byte_buffer_reader_next +import libkgrpc.grpc_raw_byte_buffer_create +import libkgrpc.grpc_slice +import libkgrpc.grpc_slice_from_copied_buffer +import libkgrpc.grpc_slice_from_copied_string +import libkgrpc.grpc_slice_malloc +import libkgrpc.grpc_slice_unref +import libkgrpc.grpc_status_code import platform.posix.memcpy internal fun internalError(message: String) { @@ -44,6 +74,7 @@ internal fun grpc_slice.toByteArray(): ByteArray = memScoped { return out } + internal fun CPointer.toKotlin(): Buffer = memScoped { val reader = alloc() check(grpc_byte_buffer_reader_init(reader.ptr, this@toKotlin) == 1) @@ -159,4 +190,24 @@ internal fun grpc_status_code.toKotlin(): StatusCode = when (this) { grpc_status_code.GRPC_STATUS_DATA_LOSS -> StatusCode.DATA_LOSS grpc_status_code.GRPC_STATUS_UNAUTHENTICATED -> StatusCode.UNAUTHENTICATED grpc_status_code.GRPC_STATUS__DO_NOT_USE -> error("Invalid status code: ${this.ordinal}") +} + +internal fun StatusCode.toRawCallAllocation(): grpc_status_code = when (this) { + StatusCode.OK -> grpc_status_code.GRPC_STATUS_OK + StatusCode.CANCELLED -> grpc_status_code.GRPC_STATUS_CANCELLED + StatusCode.UNKNOWN -> grpc_status_code.GRPC_STATUS_UNKNOWN + StatusCode.INVALID_ARGUMENT -> grpc_status_code.GRPC_STATUS_INVALID_ARGUMENT + StatusCode.DEADLINE_EXCEEDED -> grpc_status_code.GRPC_STATUS_DEADLINE_EXCEEDED + StatusCode.NOT_FOUND -> grpc_status_code.GRPC_STATUS_NOT_FOUND + StatusCode.ALREADY_EXISTS -> grpc_status_code.GRPC_STATUS_ALREADY_EXISTS + StatusCode.PERMISSION_DENIED -> grpc_status_code.GRPC_STATUS_PERMISSION_DENIED + StatusCode.RESOURCE_EXHAUSTED -> grpc_status_code.GRPC_STATUS_RESOURCE_EXHAUSTED + StatusCode.FAILED_PRECONDITION -> grpc_status_code.GRPC_STATUS_FAILED_PRECONDITION + StatusCode.ABORTED -> grpc_status_code.GRPC_STATUS_ABORTED + StatusCode.OUT_OF_RANGE -> grpc_status_code.GRPC_STATUS_OUT_OF_RANGE + StatusCode.UNIMPLEMENTED -> grpc_status_code.GRPC_STATUS_UNIMPLEMENTED + StatusCode.INTERNAL -> grpc_status_code.GRPC_STATUS_INTERNAL + StatusCode.UNAVAILABLE -> grpc_status_code.GRPC_STATUS_UNAVAILABLE + StatusCode.DATA_LOSS -> grpc_status_code.GRPC_STATUS_DATA_LOSS + StatusCode.UNAUTHENTICATED -> grpc_status_code.GRPC_STATUS_UNAUTHENTICATED } \ No newline at end of file