diff --git a/cinterop-c/BUILD.bazel b/cinterop-c/BUILD.bazel index 4e3157af4..4bb16b167 100644 --- a/cinterop-c/BUILD.bazel +++ b/cinterop-c/BUILD.bazel @@ -1,34 +1,21 @@ load("@rules_cc//cc:defs.bzl", "cc_library") -cc_binary( - name = "testdemo", - srcs = ["src/main.cpp"], - deps = [ - ":protowire", - ], -) - cc_static_library( - name = "grpcpp_c_static", + name = "kgrpc_static", deps = [ - ":grpcpp_c", + ":kgrpc", ], ) cc_library( - name = "grpcpp_c", - srcs = ["src/grpcpp_c.cpp"], - hdrs = glob(["include/**/*.h"]), + name = "kgrpc", + srcs = ["src/kgrpc.cpp"], + hdrs = glob(["include/kgrpc.h"]), copts = ["-std=c++20"], includes = ["include"], visibility = ["//visibility:public"], deps = [ - # TODO: Reduce the dependencies and only use required once. KRPC-185 - "@com_github_grpc_grpc//:channelz", - "@com_github_grpc_grpc//:generic_stub", - "@com_github_grpc_grpc//:grpc++", - "@com_github_grpc_grpc//:grpc_credentials_util", - "@com_google_protobuf//:protobuf", + "@com_github_grpc_grpc//:grpc" ], ) @@ -42,7 +29,7 @@ cc_static_library( cc_library( name = "protowire", srcs = ["src/protowire.cpp"], - hdrs = glob(["include/*.h"]), + hdrs = glob(["include/protowire.h"]), copts = ["-std=c++20"], includes = ["include"], visibility = ["//visibility:public"], diff --git a/cinterop-c/MODULE.bazel b/cinterop-c/MODULE.bazel index 136cad3ae..a9d45ecc3 100644 --- a/cinterop-c/MODULE.bazel +++ b/cinterop-c/MODULE.bazel @@ -1,5 +1,5 @@ module( - name = "grpcpp_c", + name = "kgrpc", version = "0.1", ) @@ -16,7 +16,7 @@ bazel_dep( repo_name = "com_google_protobuf", ) -# gRPC C++ library +# gRPC library bazel_dep( name = "grpc", version = "1.73.1", diff --git a/cinterop-c/include/grpcpp_c.h b/cinterop-c/include/grpcpp_c.h deleted file mode 100644 index c24e0f2d6..000000000 --- a/cinterop-c/include/grpcpp_c.h +++ /dev/null @@ -1,66 +0,0 @@ -// -// Created by Johannes Zottele on 11.07.25. -// - -#ifndef GRPCPP_C_H -#define GRPCPP_C_H - -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct grpc_client grpc_client_t; -typedef struct grpc_method grpc_method_t; -typedef struct grpc_context grpc_context_t; - -typedef enum StatusCode { - GRPC_C_STATUS_OK = 0, - GRPC_C_STATUS_CANCELLED = 1, - GRPC_C_STATUS_UNKNOWN = 2, - GRPC_C_STATUS_INVALID_ARGUMENT = 3, - GRPC_C_STATUS_DEADLINE_EXCEEDED = 4, - GRPC_C_STATUS_NOT_FOUND = 5, - GRPC_C_STATUS_ALREADY_EXISTS = 6, - GRPC_C_STATUS_PERMISSION_DENIED = 7, - GRPC_C_STATUS_UNAUTHENTICATED = 16, - GRPC_C_STATUS_RESOURCE_EXHAUSTED = 8, - GRPC_C_STATUS_FAILED_PRECONDITION = 9, - GRPC_C_STATUS_ABORTED = 10, - GRPC_C_STATUS_OUT_OF_RANGE = 11, - GRPC_C_STATUS_UNIMPLEMENTED = 12, - GRPC_C_STATUS_INTERNAL = 13, - GRPC_C_STATUS_UNAVAILABLE = 14, - GRPC_C_STATUS_DATA_LOSS = 15, - GRPC_C_STATUS_DO_NOT_USE = -1 -} grpc_status_code_t; - -grpc_client_t *grpc_client_create_insecure(const char *target); -void grpc_client_delete(const grpc_client_t *client); - -grpc_method_t *grpc_method_create(const char *method_name); -void grpc_method_delete(const grpc_method_t *method); - -const char *grpc_method_name(const grpc_method_t *method); - -grpc_context_t *grpc_context_create(); -void grpc_context_delete(const grpc_context_t *context); - -grpc_status_code_t grpc_client_call_unary_blocking(grpc_client_t *client, const char *method, - grpc_slice req_slice, grpc_slice *resp_slice); - -void grpc_client_call_unary_callback(grpc_client_t *client, grpc_method_t *method, grpc_context_t *context, - grpc_byte_buffer **req_buf, grpc_byte_buffer **resp_buf, void* callback_context, void (*callback)(grpc_status_code_t,void*)); - -uint32_t pb_decode_greeter_sayhello_response(grpc_slice response); - -grpc_status_code_t grpc_byte_buffer_dump_to_single_slice(grpc_byte_buffer *byte_buffer, grpc_slice *slice); - -#ifdef __cplusplus - } -#endif - -#endif //GRPCPP_C_H diff --git a/cinterop-c/include/kgrpc.h b/cinterop-c/include/kgrpc.h new file mode 100644 index 000000000..5c2d72e19 --- /dev/null +++ b/cinterop-c/include/kgrpc.h @@ -0,0 +1,37 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +/* + * Helper functions required for gRPC Core cinterop. + */ + +#ifndef GRPCPP_C_H +#define GRPCPP_C_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Struct that layouts a grpc_completion_queue_functor and user opaque data pointer, + * to implement the callback mechanism in the K/N CompletionQueue. + */ +typedef struct { + grpc_completion_queue_functor functor; + void *user_data; +} kgrpc_cb_tag; + +/* + * Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped. + */ +bool kgrpc_iomgr_run_in_background(); + +#ifdef __cplusplus + } +#endif + +#endif //GRPCPP_C_H diff --git a/cinterop-c/include/protowire.h b/cinterop-c/include/protowire.h index 87435e488..20dff1288 100644 --- a/cinterop-c/include/protowire.h +++ b/cinterop-c/include/protowire.h @@ -1,3 +1,7 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + #ifndef PROTOWIRE_H #define PROTOWIRE_H diff --git a/cinterop-c/src/grpcpp_c.cpp b/cinterop-c/src/grpcpp_c.cpp deleted file mode 100644 index 2dd875e18..000000000 --- a/cinterop-c/src/grpcpp_c.cpp +++ /dev/null @@ -1,201 +0,0 @@ -// -// Created by Johannes Zottele on 11.07.25. -// - -#include - -#include -#include -#include -#include -#include -#include - -namespace pb = google::protobuf; - -struct grpc_client { - std::shared_ptr channel; - std::unique_ptr stub; -}; - -struct grpc_method { - std::string name_str; - std::unique_ptr method; -}; - -struct grpc_context { - std::unique_ptr context; -}; - -extern "C" { - - grpc_client_t *grpc_client_create_insecure(const char *target) { - std::string target_str = target; - auto client = new grpc_client; - client->channel = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()); - client->stub = std::make_unique(client->channel); - return client; - } - - void grpc_client_delete(const grpc_client_t *client) { - delete client; - } - - grpc_method_t *grpc_method_create(const char *method_name) { - auto *method = new grpc_method; - method->name_str = method_name; - method->method = std::make_unique(method->name_str.c_str(), grpc::internal::RpcMethod::NORMAL_RPC); - return method; - } - - void grpc_method_delete(const grpc_method_t *method) { - delete method; - } - - const char *grpc_method_name(const grpc_method_t *method) { - return method->method->name(); - } - - grpc_context_t *grpc_context_create() { - auto *context = new grpc_context; - context->context = std::make_unique(); - return context; - } - - void grpc_context_delete(const grpc_context_t *context) { - delete context; - } - - static grpc_status_code_t status_to_c(grpc::StatusCode status); - - grpc_status_code_t grpc_client_call_unary_blocking(grpc_client_t *client, const char *method, - grpc_slice req_slice, grpc_slice *resp_slice) { - - if (!client || !method) return GRPC_C_STATUS_INVALID_ARGUMENT; - - grpc::Slice cc_req_slice(req_slice, grpc::Slice::ADD_REF); - grpc::ByteBuffer req_bb(&cc_req_slice, 1); - - grpc::ClientContext context; - grpc::ByteBuffer resp_bb; - - const std::string method_path = "/Greeter/SayHello"; - grpc::internal::RpcMethod rpc(method_path.c_str(), - grpc::internal::RpcMethod::NORMAL_RPC); - - grpc::Status st = - grpc::internal::BlockingUnaryCall( - client->channel.get(), rpc, &context, req_bb, &resp_bb); - - - if (!st.ok()) { - // if not ok, no resp_buf is left null - return status_to_c(st.error_code()); - } - - grpc::Slice cc_resp_slice; - resp_bb.DumpToSingleSlice(&cc_resp_slice); - *resp_slice = cc_resp_slice.c_slice(); - - grpc::Slice test_slice(*resp_slice, grpc::Slice::ADD_REF); - pb::io::ArrayInputStream ais(test_slice.begin(), test_slice.size()); - pb::io::CodedInputStream cis(&ais); - - - cis.ReadTag(); - uint32_t id = 0; - if (!cis.ReadVarint32(&id)) { - std::cerr << "Failed to read id field\n"; - } - - return status_to_c(st.error_code()); - } - - void grpc_client_call_unary_callback(grpc_client_t *client, grpc_method_t *method, grpc_context_t *context, - grpc_byte_buffer **req_buf, grpc_byte_buffer **resp_buf, void* callback_context, void (*callback)(grpc_status_code_t,void*)) { - // the grpc::ByteBuffer representation is identical to (* grpc_byte_buffer) so we can safely cast it. - // so a **grpc_byte_buffer can be cast to *grpc::ByteBuffer. - static_assert(sizeof(grpc::ByteBuffer) == sizeof(grpc_byte_buffer*), - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - const auto req_bb = reinterpret_cast(req_buf); - const auto resp_bb = reinterpret_cast(resp_buf); - grpc::internal::CallbackUnaryCall(client->channel.get(), *method->method, context->context.get(), req_bb, resp_bb, [callback, callback_context](grpc::Status st) { - const auto c_st = status_to_c(st.error_code()); - callback(c_st, callback_context); - }); - } - - grpc_status_code_t status_to_c(grpc::StatusCode status) { - switch (status) { - case grpc::OK: - return GRPC_C_STATUS_OK; - case grpc::CANCELLED: - return GRPC_C_STATUS_CANCELLED; - case grpc::UNKNOWN: - return GRPC_C_STATUS_UNKNOWN; - case grpc::INVALID_ARGUMENT: - return GRPC_C_STATUS_INVALID_ARGUMENT; - case grpc::DEADLINE_EXCEEDED: - return GRPC_C_STATUS_DEADLINE_EXCEEDED; - case grpc::NOT_FOUND: - return GRPC_C_STATUS_NOT_FOUND; - case grpc::ALREADY_EXISTS: - return GRPC_C_STATUS_ALREADY_EXISTS; - case grpc::PERMISSION_DENIED: - return GRPC_C_STATUS_PERMISSION_DENIED; - case grpc::UNAUTHENTICATED: - return GRPC_C_STATUS_UNAUTHENTICATED; - case grpc::RESOURCE_EXHAUSTED: - return GRPC_C_STATUS_RESOURCE_EXHAUSTED; - case grpc::FAILED_PRECONDITION: - return GRPC_C_STATUS_FAILED_PRECONDITION; - case grpc::ABORTED: - return GRPC_C_STATUS_ABORTED; - case grpc::UNIMPLEMENTED: - return GRPC_C_STATUS_UNIMPLEMENTED; - case grpc::OUT_OF_RANGE: - return GRPC_C_STATUS_OUT_OF_RANGE; - case grpc::INTERNAL: - return GRPC_C_STATUS_INTERNAL; - case grpc::UNAVAILABLE: - return GRPC_C_STATUS_UNAVAILABLE; - case grpc::DATA_LOSS: - return GRPC_C_STATUS_DATA_LOSS; - case grpc::DO_NOT_USE: - return GRPC_C_STATUS_DO_NOT_USE; - } - } - - - uint32_t pb_decode_greeter_sayhello_response(grpc_slice response) { - grpc::Slice cc_resp_slice(response, grpc::Slice::ADD_REF); - pb::io::ArrayInputStream asi(cc_resp_slice.begin(), cc_resp_slice.size()); - pb::io::CodedInputStream cis(&asi); - - const auto tag = cis.ReadTag(); - if (tag != 8) { - std::cerr << "Failed to read tag. Got: " << tag << std::endl; - } - - uint32_t result; - if (!cis.ReadVarint32(&result)) { - std::cerr << "Failed to read result" << std::endl; - } else { - - } - return result; - } - - - grpc_status_code_t grpc_byte_buffer_dump_to_single_slice(grpc_byte_buffer *byte_buffer, grpc_slice *slice) { - auto bb = reinterpret_cast(&byte_buffer); - grpc::Slice cc_slice; - bb->DumpToSingleSlice(&cc_slice); - *slice = cc_slice.c_slice(); - return GRPC_C_STATUS_OK; - } - -} - - diff --git a/cinterop-c/src/kgrpc.cpp b/cinterop-c/src/kgrpc.cpp new file mode 100644 index 000000000..cd860db75 --- /dev/null +++ b/cinterop-c/src/kgrpc.cpp @@ -0,0 +1,17 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +#include + +#include "src/core/lib/iomgr/iomgr.h" + +extern "C" { + + bool kgrpc_iomgr_run_in_background() { + return grpc_iomgr_run_in_background(); + } + +} + + diff --git a/cinterop-c/src/protowire.cpp b/cinterop-c/src/protowire.cpp index 1a9f35ffa..e48fb768f 100644 --- a/cinterop-c/src/protowire.cpp +++ b/cinterop-c/src/protowire.cpp @@ -1,6 +1,6 @@ -// -// Created by Johannes Zottele on 17.07.25. -// +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ #include "protowire.h" diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts index b6c4a6fc4..828428656 100644 --- a/grpc/grpc-core/build.gradle.kts +++ b/grpc/grpc-core/build.gradle.kts @@ -61,11 +61,18 @@ kotlin { implementation(libs.grpc.netty) } } + + nativeMain { + dependencies { + // required for status.proto + implementation(projects.protobuf.protobufCore) + } + } } - configureCLibCInterop(project, ":grpcpp_c_static") { cinteropCLib -> + configureCLibCInterop(project, ":kgrpc_static") { cinteropCLib -> @Suppress("unused") - val libgrpcpp_c by creating { + val libkgrpc by creating { includeDirs( cinteropCLib.resolve("include"), cinteropCLib.resolve("bazel-cinterop-c/external/grpc+/include"), diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.kt index 7984c0c3f..2c3146bd2 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.kt @@ -8,6 +8,25 @@ import kotlinx.rpc.grpc.GrpcTrailers import kotlinx.rpc.grpc.Status import kotlinx.rpc.internal.utils.InternalRpcApi +/** + * This class represents a client-side call to a server. + * It provides the interface of the gRPC-Java ClientCall class; however, semantics are slightly different + * on JVM and Native platforms. + * + * Callback execution: + * - On JVM it is guaranteed that callbacks aren't executed concurrently. + * - On Native, it is only guaranteed that `onClose` is called after all other callbacks finished. + * + * Sending message readiness: + * - On JVM, it is possible to call [sendMessage] multiple times, without checking [isReady]. + * Internally, it buffers the messages. + * - On Native, you can only call [sendMessage] when [isReady] returns true. There is no buffering; therefore, + * only one message can be sent at a time. + * + * Request message number: + * - On JVM, there is no limit on the number of messages you can [request]. + * - On Native, you can only call [request] with up to `16`. + */ @InternalRpcApi public expect abstract class ClientCall { @InternalRpcApi diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.kt index 189c0782b..fd961723d 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.kt @@ -37,6 +37,12 @@ public enum class MethodType { UNKNOWN, } +/** + * Creates a new [MethodDescriptor] instance. + * + * @param fullMethodName the full name of the method, consisting of the service name followed by a forward slash + * and the method name. It does not include a leading slash. + */ @InternalRpcApi public expect fun methodDescriptor( fullMethodName: String, diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt index 690512d89..1578c9277 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt @@ -4,23 +4,13 @@ package kotlinx.rpc.grpc.internal -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.cancel +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.onFailure -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.single -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext -import kotlinx.rpc.grpc.GrpcTrailers -import kotlinx.rpc.grpc.Status -import kotlinx.rpc.grpc.StatusCode -import kotlinx.rpc.grpc.StatusException -import kotlinx.rpc.grpc.code +import kotlinx.rpc.grpc.* import kotlinx.rpc.internal.utils.InternalRpcApi // heavily inspired by @@ -249,7 +239,7 @@ internal fun Flow.singleOrStatusFlow( internal suspend fun Flow.singleOrStatus( expected: String, - descriptor: Any + descriptor: Any, ): T = singleOrStatusFlow(expected, descriptor).single() internal class Ready(private val isReallyReady: () -> Boolean) { diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/RawClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/RawClientTest.kt new file mode 100644 index 000000000..18743bdac --- /dev/null +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/RawClientTest.kt @@ -0,0 +1,106 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test + +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.test.runTest +import kotlinx.rpc.grpc.ManagedChannelBuilder +import kotlinx.rpc.grpc.buildChannel +import kotlinx.rpc.grpc.internal.* +import kotlin.test.Test +import kotlin.test.assertEquals + +/** + * Tests for JVM and Native clients. + */ +// TODO: Start echo service server automatically +class RawClientTest { + + @Test + fun unaryEchoTest() = runTest( + methodName = "UnaryEcho", + type = MethodType.UNARY, + ) { channel, descriptor -> + val response = unaryRpc(channel, descriptor, EchoRequest { message = "Eccchhooo" }) + assertEquals("Eccchhooo", response.message) + } + + @Test + fun serverStreamingEchoTest() = runTest( + methodName = "ServerStreamingEcho", + type = MethodType.SERVER_STREAMING, + ) { channel, descriptor -> + val response = serverStreamingRpc(channel, descriptor, EchoRequest { message = "Eccchhooo" }) + var i = 0 + response.collect { + println("Received: ${i++}") + assertEquals("Eccchhooo", it.message) + } + } + + @Test + fun clientStreamingEchoTest() = runTest( + methodName = "ClientStreamingEcho", + type = MethodType.CLIENT_STREAMING, + ) { channel, descriptor -> + val response = clientStreamingRpc(channel, descriptor, flow { + repeat(5) { + delay(100) + println("Sending: ${it + 1}") + emit(EchoRequest { message = "Eccchhooo" }) + } + }) + val expected = "Eccchhooo, Eccchhooo, Eccchhooo, Eccchhooo, Eccchhooo" + assertEquals(expected, response.message) + } + + @Test + fun bidirectionalStreamingEchoTest() = runTest( + methodName = "BidirectionalStreamingEcho", + type = MethodType.BIDI_STREAMING, + ) { channel, descriptor -> + val response = bidirectionalStreamingRpc(channel, descriptor, flow { + repeat(5) { + emit(EchoRequest { message = "Eccchhooo" }) + } + }) + + var i = 0 + response.collect { + i++ + assertEquals("Eccchhooo", it.message) + } + assertEquals(5, i) + } + + fun runTest( + methodName: String, + type: MethodType, + block: suspend (GrpcChannel, MethodDescriptor) -> Unit, + ) = runTest { + val channel = ManagedChannelBuilder("localhost:50051") + .usePlaintext() + .buildChannel() + + val methodDescriptor = methodDescriptor( + fullMethodName = "grpc.examples.echo.Echo/$methodName", + requestCodec = EchoRequestInternal.CODEC, + responseCodec = EchoResponseInternal.CODEC, + type = type, + schemaDescriptor = Unit, + idempotent = true, + safe = true, + sampledToLocalTracing = true, + ) + + try { + block(channel.platformApi, methodDescriptor) + } finally { + channel.shutdown() + channel.awaitTermination() + } + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/proto/echo_grpc.proto b/grpc/grpc-core/src/commonTest/proto/echo_grpc.proto new file mode 100644 index 000000000..4c960d90b --- /dev/null +++ b/grpc/grpc-core/src/commonTest/proto/echo_grpc.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package grpc.examples.echo; + +// EchoRequest is the request for echo. +message EchoRequest { + string message = 1; +} + +// EchoResponse is the response for echo. +message EchoResponse { + string message = 1; +} + +// Echo is the echo service. +service Echo { + // UnaryEcho is unary echo. + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} + // ServerStreamingEcho is server side streaming. + rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} + // ClientStreamingEcho is client side streaming. + rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} + // BidirectionalStreamingEcho is bidi streaming. + rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} +} \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/proto/helloworld.proto b/grpc/grpc-core/src/commonTest/proto/helloworld.proto new file mode 100644 index 000000000..f3f815255 --- /dev/null +++ b/grpc/grpc-core/src/commonTest/proto/helloworld.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + + +// The request message containing the user's name. +message HelloRequest { + string name = 1; + optional uint32 timeout = 2; +} + + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/grpc/grpc-core/src/nativeInterop/cinterop/libgrpcpp_c.def b/grpc/grpc-core/src/nativeInterop/cinterop/libgrpcpp_c.def deleted file mode 100644 index 7ed20de3a..000000000 --- a/grpc/grpc-core/src/nativeInterop/cinterop/libgrpcpp_c.def +++ /dev/null @@ -1,6 +0,0 @@ -headers = grpcpp_c.h -headerFilter= grpcpp_c.h grpc/slice.h grpc/byte_buffer.h - -noStringConversion = grpc_slice_from_copied_buffer my_grpc_slice_from_copied_buffer - -staticLibraries = libgrpcpp_c_static.a diff --git a/grpc/grpc-core/src/nativeInterop/cinterop/libkgrpc.def b/grpc/grpc-core/src/nativeInterop/cinterop/libkgrpc.def new file mode 100644 index 000000000..59aa61bcd --- /dev/null +++ b/grpc/grpc-core/src/nativeInterop/cinterop/libkgrpc.def @@ -0,0 +1,11 @@ +headers = kgrpc.h grpc/grpc.h grpc/credentials.h grpc/byte_buffer_reader.h \ + grpc/support/alloc.h grpc/impl/propagation_bits.h + +headerFilter= kgrpc.h grpc/slice.h grpc/byte_buffer.h grpc/grpc.h \ + grpc/impl/grpc_types.h grpc/credentials.h grpc/support/time.h grpc/byte_buffer_reader.h \ + grpc/support/alloc.h grpc/impl/propagation_bits.h + +noStringConversion = grpc_slice_from_copied_buffer my_grpc_slice_from_copied_buffer +strictEnums = grpc_status_code grpc_connectivity_state grpc_call_error + +staticLibraries = libkgrpc_static.a diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt index 7ad772253..be43d6afc 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt @@ -6,7 +6,7 @@ package kotlinx.rpc.grpc -import kotlinx.rpc.grpc.internal.GrpcChannel +import kotlinx.rpc.grpc.internal.* /** * Same as [ManagedChannel], but is platform-exposed. @@ -17,19 +17,41 @@ public actual abstract class ManagedChannelPlatform : GrpcChannel() * Builder class for [ManagedChannel]. */ public actual abstract class ManagedChannelBuilder> { - public actual fun usePlaintext(): T { - TODO("Not yet implemented") + public actual open fun usePlaintext(): T { + error("Builder does not support usePlaintext()") } } +internal class NativeManagedChannelBuilder( + private val target: String, +) : ManagedChannelBuilder() { + private var credentials: GrpcCredentials? = null + + override fun usePlaintext(): NativeManagedChannelBuilder { + credentials = GrpcInsecureCredentials() + return this + } + + fun buildChannel(): NativeManagedChannel { + return NativeManagedChannel( + target, + credentials = credentials ?: error("No credentials set"), + ) + } + +} + internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel { - error("Native target is not supported in gRPC") + check(this is NativeManagedChannelBuilder) { internalError("Wrong builder type, expected NativeManagedChannelBuilder") } + return buildChannel() } internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> { - error("Native target is not supported in gRPC") + return NativeManagedChannelBuilder(target = "$hostname:$port") } internal actual fun ManagedChannelBuilder(target: String): ManagedChannelBuilder<*> { - error("Native target is not supported in gRPC") + return NativeManagedChannelBuilder(target) } + + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Status.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Status.native.kt index bd7103740..b097226c7 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Status.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/Status.native.kt @@ -4,23 +4,21 @@ package kotlinx.rpc.grpc -public actual class Status { - public actual fun getDescription(): String? { - TODO("Not yet implemented") - } +public actual class Status internal constructor( + private val description: String?, + internal val statusCode: StatusCode, + private val cause: Throwable? +) { + public actual fun getDescription(): String? = description - public actual fun getCause(): Throwable? { - TODO("Not yet implemented") - } + public actual fun getCause(): Throwable? = cause } public actual val Status.code: StatusCode - get() = TODO("Not yet implemented") + get() = this.statusCode public actual fun Status( code: StatusCode, description: String?, cause: Throwable?, -): Status { - TODO("Not yet implemented") -} +): Status = Status(description, code, cause) diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/StatusException.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/StatusException.native.kt index 6c8739a96..e319178e4 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/StatusException.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/StatusException.native.kt @@ -5,37 +5,39 @@ package kotlinx.rpc.grpc public actual class StatusException : Exception { - public actual fun getStatus(): Status { - TODO("Not yet implemented") - } + private val status: Status + private val trailers: GrpcTrailers? - public actual fun getTrailers(): GrpcTrailers? { - TODO("Not yet implemented") - } + public actual constructor(status: Status) : this(status, null) - public actual constructor(status: Status) { - TODO("Not yet implemented") + public actual constructor(status: Status, trailers: GrpcTrailers?) : super( + "${status.statusCode}: ${status.getDescription()}", + status.getCause() + ) { + this.status = status + this.trailers = trailers } - public actual constructor(status: Status, trailers: GrpcTrailers?) { - TODO("Not yet implemented") - } + public actual fun getStatus(): Status = status + + public actual fun getTrailers(): GrpcTrailers? = trailers } public actual class StatusRuntimeException : RuntimeException { - public actual fun getStatus(): Status { - TODO("Not yet implemented") - } + private val status: Status + private val trailers: GrpcTrailers? - public actual fun getTrailers(): GrpcTrailers? { - TODO("Not yet implemented") - } + public actual constructor(status: Status) : this(status, null) - public actual constructor(status: Status) { - TODO("Not yet implemented") + public actual constructor(status: Status, trailers: GrpcTrailers?) : super( + "${status.statusCode}: ${status.getDescription()}", + status.getCause() + ) { + this.status = status + this.trailers = trailers } - public actual constructor(status: Status, trailers: GrpcTrailers?) { - TODO("Not yet implemented") - } + public actual fun getStatus(): Status = status + + public actual fun getTrailers(): GrpcTrailers? = trailers } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CallbackFuture.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CallbackFuture.kt new file mode 100644 index 000000000..6dbdb5b74 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CallbackFuture.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.internal + +import kotlinx.atomicfu.atomic + +/** + * Thread safe future for callbacks. + */ +internal class CallbackFuture { + private sealed interface State { + data class Pending(val callbacks: List<(T) -> Unit> = emptyList()) : State + data class Done(val value: T) : State + } + + private val state = atomic>(State.Pending()) + + fun complete(result: T) { + var toInvoke: List<(T) -> Unit> + while (true) { + when (val s = state.value) { + is State.Pending -> if (state.compareAndSet(s, State.Done(result))) { + toInvoke = s.callbacks + break + } + + is State.Done -> error("Already completed") + } + } + for (cb in toInvoke) cb(result) + } + + fun onComplete(callback: (T) -> Unit) { + while (true) { + when (val s = state.value) { + is State.Done -> { + callback(s.value); return + } + + is State.Pending -> { + val next = State.Pending(s.callbacks + callback) // copy-on-write append + if (state.compareAndSet(s, next)) return + } + } + } + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.native.kt index 4f2b24850..f650f9e17 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.native.kt @@ -2,11 +2,15 @@ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + package kotlinx.rpc.grpc.internal +import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.rpc.grpc.GrpcTrailers import kotlinx.rpc.grpc.Status import kotlinx.rpc.internal.utils.InternalRpcApi +import kotlin.experimental.ExperimentalNativeApi @InternalRpcApi public actual abstract class ClientCall { @@ -20,7 +24,8 @@ public actual abstract class ClientCall { public actual abstract fun halfClose() public actual abstract fun sendMessage(message: Request) public actual open fun isReady(): Boolean { - TODO("Not yet implemented") + // Default implementation returns true - subclasses can override if they need flow control + return true } @InternalRpcApi @@ -46,5 +51,21 @@ public actual fun clientCallListener( onClose: (status: Status, trailers: GrpcTrailers) -> Unit, onReady: () -> Unit, ): ClientCall.Listener { - TODO("Not yet implemented") + return object : ClientCall.Listener() { + override fun onHeaders(headers: GrpcTrailers) { + onHeaders(headers) + } + + override fun onMessage(message: Message) { + onMessage(message) + } + + override fun onClose(status: Status, trailers: GrpcTrailers) { + onClose(status, trailers) + } + + override fun onReady() { + onReady() + } + } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt new file mode 100644 index 000000000..157428ccd --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt @@ -0,0 +1,197 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class, ExperimentalStdlibApi::class) + +package kotlinx.rpc.grpc.internal + +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.SynchronizedObject +import kotlinx.atomicfu.locks.synchronized +import kotlinx.cinterop.* +import libkgrpc.* +import platform.posix.memset +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner + +/** + * The result of a batch operation (see [CompletionQueue.runBatch]). + */ +internal sealed interface BatchResult { + /** + * Happens when a batch was submitted and... + * - the queue is closed + * - the queue is in the process of a force shutdown + * - the queue is in the process of a normal shutdown, and the batch is a new `RECV_STATUS_ON_CLIENT` batch. + */ + object CQShutdown : BatchResult + + /** + * Happens when the batch couldn't be submitted for some reason. + */ + data class SubmitError(val error: grpc_call_error) : BatchResult + + /** + * Happens when the batch was successfully submitted. + * The [future] will be completed with `true` if the batch was successful, `false` otherwise. + * In the case of `false`, the status of the `RECV_STATUS_ON_CLIENT` batch will provide the error details. + */ + data class Submitted(val future: CallbackFuture) : BatchResult +} + +/** + * A thread-safe Kotlin wrapper for the native grpc_completion_queue. + * It is based on the "new" callback API; therefore, there are no kotlin-side threads required to poll + * the queue. + * Users can attach to the returned [CallbackFuture] if the batch was successfully submitted (see [BatchResult]). + */ +internal class CompletionQueue { + + internal enum class State { OPEN, SHUTTING_DOWN, CLOSED } + + // if the shutdown() was called with forceShutdown = true, + // it will reject all new batches and wait for all current ones to finish. + private val forceShutdown = atomic(false) + + // internal as it must be accessible from the SHUTDOWN_CB, + // but it shouldn't be used from outside this file. + @Suppress("PropertyName") + internal val _state = atomic(State.OPEN) + + // internal as it must be accessible from the SHUTDOWN_CB, + // but it shouldn't be used from outside this file. + @Suppress("PropertyName") + internal val _shutdownDone = CallbackFuture() + + // used to synchronize the start of a new batch operation. + private val batchStartGuard = SynchronizedObject() + + // a stable reference of this used as user_data in the shutdown callback. + private val thisStableRef = StableRef.create(this) + + // the shutdown functor/tag called when the queue is shut down. + private val shutdownFunctor = nativeHeap.alloc { + functor.functor_run = SHUTDOWN_CB + user_data = thisStableRef.asCPointer() + }.reinterpret() + + + val raw = grpc_completion_queue_create_for_callback(shutdownFunctor.ptr, null) + + @Suppress("unused") + private val thisStableRefCleaner = createCleaner(thisStableRef) { it.dispose() } + + @Suppress("unused") + private val shutdownFunctorCleaner = createCleaner(shutdownFunctor) { nativeHeap.free(it) } + + init { + // Assert grpc_iomgr_run_in_background() to guarantee that the event manager provides + // IO threads and supports the callback API. + require(kgrpc_iomgr_run_in_background()) { "The gRPC iomgr is not running background threads, required for callback based APIs." } + } + + /** + * Submits a batch operation to the queue. + * See [BatchResult] for possible outcomes. + */ + fun runBatch(call: NativeClientCall<*, *>, ops: CPointer, nOps: ULong): BatchResult { + val completion = CallbackFuture() + val tag = newCbTag(completion, OPS_COMPLETE_CB) + + var err = grpc_call_error.GRPC_CALL_ERROR + + synchronized(batchStartGuard) { + if (_state.value == State.SHUTTING_DOWN && ops.pointed.op == GRPC_OP_RECV_STATUS_ON_CLIENT) { + // if the queue is in the process of a SHUTDOWN, + // new call status receive batches will be rejected. + deleteCbTag(tag) + return BatchResult.CQShutdown + } + + if (forceShutdown.value || _state.value == State.CLOSED) { + // if the queue is either closed or in the process of a FORCE shutdown, + // new batches will instantly fail. + deleteCbTag(tag) + return BatchResult.CQShutdown + } + + err = grpc_call_start_batch(call.raw, ops, nOps, tag, null) + } + + if (err != grpc_call_error.GRPC_CALL_OK) { + // if the call was not successful, the callback will not be invoked. + deleteCbTag(tag) + return BatchResult.SubmitError(err) + } + + return BatchResult.Submitted(completion) + } + + /** + * Shuts down the queue. + * The method returns immediately, but the queue will be shut down asynchronously. + * The returned [CallbackFuture] will be completed with `Unit` when the queue is shut down. + * + * @param force if `true`, the queue will reject all new batches with [BatchResult.CQShutdown]. + * Otherwise, the queue allows submitting new batches and shutdown only when there are no more + * ongoing batches. + */ + fun shutdown(force: Boolean = false): CallbackFuture { + if (force) { + forceShutdown.value = true + } + if (!_state.compareAndSet(State.OPEN, State.SHUTTING_DOWN)) { + // the first call to shutdown() makes transition and to SHUTTING_DOWN and + // initiates shut down. all other invocations await the shutdown. + return _shutdownDone + } + + // wait until all batch operations since the state transitions were started. + // this is required to prevent batches from starting after shutdown was initialized. + // however, this lock will be available very fast, so it shouldn't be a problem.' + synchronized(batchStartGuard) { + grpc_completion_queue_shutdown(raw) + } + + return _shutdownDone + } +} + +// kq stands for kompletion_queue lol +@CName("kq_ops_complete_cb") +private fun opsCompleteCb(functor: CPointer?, ok: Int) { + val tag = functor!!.reinterpret() + val cont = tag.pointed.user_data!!.asStableRef>().get() + deleteCbTag(tag) + cont.complete(ok != 0) +} + +@CName("kq_shutdown_cb") +private fun shutdownCb(functor: CPointer?, ok: Int) { + val tag = functor!!.reinterpret() + val cq = tag.pointed.user_data!!.asStableRef().get() + cq._shutdownDone.complete(Unit) + cq._state.value = CompletionQueue.State.CLOSED + grpc_completion_queue_destroy(cq.raw) +} + +private val OPS_COMPLETE_CB = staticCFunction(::opsCompleteCb) +private val SHUTDOWN_CB = staticCFunction(::shutdownCb) + +private fun newCbTag( + userData: Any, + cb: CPointer?, Int) -> Unit>>, +): CPointer { + val tag = nativeHeap.alloc() + memset(tag.ptr, 0, sizeOf().convert()) + tag.functor.functor_run = cb + tag.user_data = StableRef.create(userData).asCPointer() + return tag.ptr +} + +@CName("kgrpc_cb_tag_destroy") +private fun deleteCbTag(tag: CPointer) { + tag.pointed.user_data!!.asStableRef().dispose() + nativeHeap.free(tag) +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcCallOptions.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcCallOptions.native.kt index 28b09cf76..f9eb10461 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcCallOptions.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcCallOptions.native.kt @@ -7,8 +7,9 @@ package kotlinx.rpc.grpc.internal import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public actual class GrpcCallOptions +public actual class GrpcCallOptions { + // TODO: Do something with it +} @InternalRpcApi -public actual val GrpcDefaultCallOptions: GrpcCallOptions - get() = TODO("Not yet implemented") +public actual val GrpcDefaultCallOptions: GrpcCallOptions = GrpcCallOptions() diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.native.kt index 076073f08..dd60f03e1 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/GrpcContext.native.kt @@ -8,16 +8,18 @@ import kotlin.coroutines.CoroutineContext internal actual class GrpcContext +private val currentGrpcContext = GrpcContext() + internal actual val CurrentGrpcContext: GrpcContext - get() = TODO("Not yet implemented") + get() = currentGrpcContext internal actual class GrpcContextElement : CoroutineContext.Element { actual override val key: CoroutineContext.Key - get() = TODO("Not yet implemented") + get() = Key actual companion object Key : CoroutineContext.Key { actual fun current(): GrpcContextElement { - TODO("Not yet implemented") + return GrpcContextElement() } } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt index a4774b52c..ef95e782e 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.native.kt @@ -9,42 +9,40 @@ import kotlinx.rpc.internal.utils.InternalRpcApi import kotlinx.rpc.protobuf.input.stream.InputStream @InternalRpcApi -internal actual val MethodDescriptor<*, *>.type: MethodType - get() = TODO("Not yet implemented") +public actual class MethodDescriptor internal constructor( + private val fullMethodName: String, + private val requestMarshaller: Marshaller, + private val responseMarshaller: Marshaller, + internal val methodType: MethodType, + private val schemaDescriptor: Any?, + private val idempotent: Boolean, + private val safe: Boolean, + private val sampledToLocalTracing: Boolean, +) { + public actual fun getFullMethodName(): String = fullMethodName -@InternalRpcApi -public actual class MethodDescriptor { - public actual fun getFullMethodName(): String { - TODO("Not yet implemented") + private val serviceName: String? by lazy { + val index = fullMethodName.lastIndexOf('/') + if (index == -1) { + null + } else { + fullMethodName.substring(0, index) + } } - public actual fun getServiceName(): String? { - TODO("Not yet implemented") - } + public actual fun getServiceName(): String? = serviceName - public actual fun getRequestMarshaller(): Marshaller { - TODO("Not yet implemented") - } + public actual fun getRequestMarshaller(): Marshaller = requestMarshaller - public actual fun getResponseMarshaller(): Marshaller { - TODO("Not yet implemented") - } + public actual fun getResponseMarshaller(): Marshaller = responseMarshaller - public actual fun getSchemaDescriptor(): Any? { - TODO("Not yet implemented") - } + public actual fun getSchemaDescriptor(): Any? = schemaDescriptor - public actual fun isIdempotent(): Boolean { - TODO("Not yet implemented") - } + public actual fun isIdempotent(): Boolean = idempotent - public actual fun isSafe(): Boolean { - TODO("Not yet implemented") - } + public actual fun isSafe(): Boolean = safe - public actual fun isSampledToLocalTracing(): Boolean { - TODO("Not yet implemented") - } + public actual fun isSampledToLocalTracing(): Boolean = sampledToLocalTracing public actual interface Marshaller { public actual fun stream(value: T): InputStream @@ -52,6 +50,10 @@ public actual class MethodDescriptor { } } +@InternalRpcApi +internal actual val MethodDescriptor<*, *>.type: MethodType + get() = this.methodType + @InternalRpcApi public actual fun methodDescriptor( fullMethodName: String, @@ -63,5 +65,34 @@ public actual fun methodDescriptor( safe: Boolean, sampledToLocalTracing: Boolean, ): MethodDescriptor { - TODO("Not yet implemented") + val requestMarshaller = object : MethodDescriptor.Marshaller { + override fun stream(value: Request): InputStream { + return requestCodec.encode(value) + } + + override fun parse(stream: InputStream): Request { + return requestCodec.decode(stream) + } + } + + val responseMarshaller = object : MethodDescriptor.Marshaller { + override fun stream(value: Response): InputStream { + return responseCodec.encode(value) + } + + override fun parse(stream: InputStream): Response { + return responseCodec.decode(stream) + } + } + + return MethodDescriptor( + fullMethodName = fullMethodName, + requestMarshaller = requestMarshaller, + responseMarshaller = responseMarshaller, + methodType = type, + schemaDescriptor = schemaDescriptor, + idempotent = idempotent, + safe = safe, + sampledToLocalTracing = sampledToLocalTracing, + ) } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt new file mode 100644 index 000000000..0cf37653a --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeClientCall.kt @@ -0,0 +1,383 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_call +import kotlinx.atomicfu.atomic +import kotlinx.cinterop.* +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableJob +import kotlinx.rpc.grpc.GrpcTrailers +import kotlinx.rpc.grpc.Status +import kotlinx.rpc.grpc.StatusCode +import kotlinx.rpc.protobuf.input.stream.asInputStream +import kotlinx.rpc.protobuf.input.stream.asSource +import libkgrpc.* +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner + + +internal class NativeClientCall( + private val cq: CompletionQueue, + internal val raw: CPointer, + private val methodDescriptor: MethodDescriptor, + private val callJob: CompletableJob, +) : ClientCall() { + + @Suppress("unused") + private val rawCleaner = createCleaner(raw) { + grpc_call_unref(it) + } + + init { + // cancel the call if the job is canceled. + callJob.invokeOnCompletion { + when (it) { + is CancellationException -> { + cancelInternal(grpc_status_code.GRPC_STATUS_CANCELLED, "Call got cancelled.") + } + + is Throwable -> { + cancelInternal(grpc_status_code.GRPC_STATUS_INTERNAL, "Call failed: ${it.message}") + } + } + } + } + + private var listener: Listener? = null + private var halfClosed = false + private var cancelled = false + private var closed = atomic(false) + + // tracks how many operations are in flight (not yet completed by the listener). + // if 0 and we got a closeInfo (containing the status), there are no more ongoing operations. + // in this case, we can safely call onClose on the listener. + // we need this mechanism to ensure that onClose is not called while any other callback is still running + // on the listener. + private val inFlight = atomic(0) + + // holds the received status information returned by the RECV_STATUS_ON_CLIENT batch. + // if null, the call is still in progress. otherwise, the call can be closed as soon as inFlight is 0. + private val closeInfo = atomic?>(null) + + // we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192) + private val ready = atomic(true) + + /** + * Increments the [inFlight] counter by one. + * This should be called before starting a batch. + */ + private fun beginOp() { + inFlight.incrementAndGet() + } + + /** + * Decrements the [inFlight] counter by one. + * This should be called after a batch has finished (in case of success AND error) + * AND the corresponding listener callback returned. + * + * If the counter reaches 0, no more listener callbacks are executed, and the call can be closed by + * calling [tryToCloseCall]. + */ + private fun endOp() { + if (inFlight.decrementAndGet() == 0) { + tryToCloseCall() + } + } + + /** + * Tries to close the call by invoking the listener's onClose callback. + * + * - If the call is already closed, this does nothing. + * - If the RECV_STATUS_ON_CLIENT batch is still in progress, this does nothing. + * - If the [inFlight] counter is not 0, this does nothing. + * - Otherwise, the listener's onClose callback is invoked and the call is closed. + */ + private fun tryToCloseCall() { + val info = closeInfo.value ?: return + if (inFlight.value == 0 && closed.compareAndSet(expect = false, update = true)) { + val lst = checkNotNull(listener) { internalError("Not yet started") } + // allows the managed channel to join for the call to finish. + callJob.complete() + lst.onClose(info.first, info.second) + } + } + + /** + * Sets the [closeInfo] and calls [tryToCloseCall]. + * This is called as soon as the RECV_STATUS_ON_CLIENT batch (started with [startRecvStatus]) finished. + */ + private fun markClosePending(status: Status, trailers: GrpcTrailers) { + if (closeInfo.compareAndSet(null, Pair(status, trailers))) { + tryToCloseCall() + } + } + + /** + * Sets the [ready] flag to true and calls the listener's onReady callback. + * This is called as soon as the RECV_MESSAGE batch is finished (or failed). + */ + private fun turnReady() { + if (ready.compareAndSet(expect = false, update = true)) { + listener?.onReady() + } + } + + + override fun start( + responseListener: Listener, + headers: GrpcTrailers, + ) { + check(listener == null) { internalError("Already started") } + check(!cancelled) { internalError("Already cancelled.") } + + listener = responseListener + + // start receiving the status from the completion queue, + // which is bound to the lifetime of the call. + val success = startRecvStatus() + if (!success) return + + // send and receive initial headers to/from the server + sendAndReceiveInitialMetadata() + } + + /** + * Submits a batch operation to the [CompletionQueue] and handle the returned [BatchResult]. + * If the batch was successfully submitted, [onSuccess] is called. + * In any case, [cleanup] is called. + */ + private fun runBatch( + ops: CPointer, + nOps: ULong, + cleanup: () -> Unit = {}, + onSuccess: () -> Unit = {}, + ) { + // we must not try to run a batch after the call is closed. + if (closed.value) return cleanup() + + // pre-book the batch, so onClose cannot be called before the batch finished. + beginOp() + + when (val callResult = cq.runBatch(this@NativeClientCall, ops, nOps)) { + is BatchResult.Submitted -> { + callResult.future.onComplete { success -> + try { + if (success) { + onSuccess() + } + } finally { + // ignore failure, as it is reflected in the client status op + cleanup() + endOp() + } + } + } + + BatchResult.CQShutdown -> { + cleanup() + endOp() + cancelInternal(grpc_status_code.GRPC_STATUS_UNAVAILABLE, "Channel shutdown") + } + + is BatchResult.SubmitError -> { + cleanup() + endOp() + cancelInternal( + grpc_status_code.GRPC_STATUS_INTERNAL, + "Batch could not be submitted: ${callResult.error}" + ) + } + } + } + + /** + * Starts a batch operation to receive the status from the completion queue (RECV_STATUS_ON_CLIENT). + * This operation is bound to the lifetime of the call, so it will finish once all other operations are done. + * If this operation fails, it will call [markClosePending] with the corresponding error, as the entire call + * si considered failed. + * + * @return true if the batch was successfully submitted, false otherwise. + * In this case, the call is considered failed. + */ + @OptIn(ExperimentalStdlibApi::class) + private fun startRecvStatus(): Boolean { + checkNotNull(listener) { internalError("Not yet started") } + val arena = Arena() + val statusCode = arena.alloc() + val statusDetails = arena.alloc() + val errorStr = arena.alloc>() + val op = arena.alloc { + op = GRPC_OP_RECV_STATUS_ON_CLIENT + data.recv_status_on_client.status = statusCode.ptr + data.recv_status_on_client.status_details = statusDetails.ptr + data.recv_status_on_client.error_string = errorStr.ptr + // TODO: trailing metadata + data.recv_status_on_client.trailing_metadata = null + } + + when (val callResult = cq.runBatch(this@NativeClientCall, op.ptr, 1u)) { + is BatchResult.Submitted -> { + callResult.future.onComplete { + val details = statusDetails.toByteArray().toKString() + val status = Status(statusCode.value.toKotlin(), details, null) + val trailers = GrpcTrailers() + + // cleanup + grpc_slice_unref(statusDetails.readValue()) + if (errorStr.value != null) gpr_free(errorStr.value) + arena.clear() + + // set close info and try to close the call. + markClosePending(status, trailers) + } + return true + } + + BatchResult.CQShutdown -> { + arena.clear() + markClosePending(Status(StatusCode.UNAVAILABLE, "Channel shutdown"), GrpcTrailers()) + return false + } + + is BatchResult.SubmitError -> { + arena.clear() + markClosePending( + Status(StatusCode.INTERNAL, "Failed to start call: ${callResult.error}"), + GrpcTrailers() + ) + return false + } + } + } + + private fun sendAndReceiveInitialMetadata() { + // sending and receiving initial metadata + val arena = Arena() + val opsNum = 2uL + val ops = arena.allocArray(opsNum.convert()) + + // send initial meta data to server + // TODO: initial metadata + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA + ops[0].data.send_initial_metadata.count = 0u + + val meta = arena.alloc() + // TODO: make metadata array an object (for lifecycle management) + grpc_metadata_array_init(meta.ptr) + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA + ops[1].data.recv_initial_metadata.recv_initial_metadata = meta.ptr + + runBatch(ops, opsNum, cleanup = { + grpc_metadata_array_destroy(meta.ptr) + arena.clear() + }) { + // TODO: Send headers to listener + } + } + + /** + * Requests [numMessages] messages from the server. + * This must only be called again after [numMessages] were received in the [Listener.onMessage] callback. + */ + override fun request(numMessages: Int) { + check(numMessages > 0) { internalError("numMessages must be > 0") } + // limit numMessages to prevent potential stack overflows + check(numMessages <= 16) { internalError("numMessages must be <= 16") } + val listener = checkNotNull(listener) { internalError("Not yet started") } + check(!cancelled) { internalError("Already cancelled") } + + var remainingMessages = numMessages + + // we need to request only one message at a time, so we use a recursive function that + // requests one message and then calls itself again. + fun post() { + if (remainingMessages-- <= 0) return + + val arena = Arena() + val recvPtr = arena.alloc>() + val op = arena.alloc { + op = GRPC_OP_RECV_MESSAGE + data.recv_message.recv_message = recvPtr.ptr + } + runBatch(op.ptr, 1u, cleanup = { + if (recvPtr.value != null) grpc_byte_buffer_destroy(recvPtr.value) + arena.clear() + }) { + // if the call was successful, but no message was received, we reached the end-of-stream. + val buf = recvPtr.value ?: return@runBatch + val msg = methodDescriptor.getResponseMarshaller() + .parse(buf.toKotlin().asInputStream()) + listener.onMessage(msg) + post() + } + } + + // start requesting messages + post() + } + + override fun cancel(message: String?, cause: Throwable?) { + cancelled = true + val message = if (cause != null) "$message: ${cause.message}" else message + cancelInternal(grpc_status_code.GRPC_STATUS_CANCELLED, message ?: "Call cancelled") + } + + private fun cancelInternal(statusCode: grpc_status_code, message: String) { + val cancelResult = grpc_call_cancel_with_status(raw, statusCode, message, null) + if (cancelResult != grpc_call_error.GRPC_CALL_OK) { + markClosePending(Status(StatusCode.INTERNAL, "Failed to cancel call: $cancelResult"), GrpcTrailers()) + } + } + + override fun halfClose() { + check(!halfClosed) { internalError("Already half closed.") } + check(!cancelled) { internalError("Already cancelled.") } + halfClosed = true + + val arena = Arena() + val op = arena.alloc { + op = GRPC_OP_SEND_CLOSE_FROM_CLIENT + } + + runBatch(op.ptr, 1u, cleanup = { arena.clear() }) { + // nothing to do here + } + } + + override fun isReady(): Boolean = ready.value + + override fun sendMessage(message: Request) { + checkNotNull(listener) { internalError("Not yet started") } + check(!halfClosed) { internalError("Already half closed.") } + check(!cancelled) { internalError("Already cancelled.") } + check(isReady()) { internalError("Not yet ready.") } + + // set ready false, as only one message can be sent at a time. + ready.value = false + + val arena = Arena() + val inputStream = methodDescriptor.getRequestMarshaller().stream(message) + val byteBuffer = inputStream.asSource().toGrpcByteBuffer() + + val op = arena.alloc { + op = GRPC_OP_SEND_MESSAGE + data.send_message.send_message = byteBuffer + } + + runBatch(op.ptr, 1u, cleanup = { + // actual cleanup + grpc_byte_buffer_destroy(byteBuffer) + arena.clear() + }) { + // set ready true, as we can now send another message. + turnReady() + } + } +} + + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeGrpcLibrary.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeGrpcLibrary.kt new file mode 100644 index 000000000..615295bec --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeGrpcLibrary.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalNativeApi::class, ExperimentalForeignApi::class) + +package kotlinx.rpc.grpc.internal + +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.reentrantLock +import kotlinx.atomicfu.locks.withLock +import kotlinx.cinterop.ExperimentalForeignApi +import libkgrpc.grpc_init +import libkgrpc.grpc_shutdown +import kotlin.experimental.ExperimentalNativeApi + +internal object GrpcRuntime { + private val refLock = reentrantLock() + private var refs = 0 + + /** Acquire a runtime reference. Must be closed exactly once. */ + fun acquire(): AutoCloseable { + refLock.withLock { + val prev = refs++ + if (prev == 0) grpc_init() + } + return object : AutoCloseable { + private val done = atomic(false) + override fun close() { + if (!done.compareAndSet(expect = false, update = true)) return + refLock.withLock { + val now = --refs + require(now >= 0) { internalError("release() without matching acquire()") } + if (now == 0) { + grpc_shutdown() + } + } + } + } + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt new file mode 100644 index 000000000..3f03b92f1 --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeManagedChannel.kt @@ -0,0 +1,151 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import cnames.structs.grpc_channel +import cnames.structs.grpc_channel_credentials +import kotlinx.atomicfu.atomic +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.coroutines.* +import kotlinx.rpc.grpc.ManagedChannel +import kotlinx.rpc.grpc.ManagedChannelPlatform +import libkgrpc.* +import kotlin.coroutines.cancellation.CancellationException +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner +import kotlin.time.Duration + +/** + * Wrapper for [grpc_channel_credentials]. + */ +internal sealed class GrpcCredentials( + internal val raw: CPointer, +) { + val rawCleaner = createCleaner(raw) { + grpc_channel_credentials_release(it) + } +} + +/** + * Insecure credentials. + */ +internal class GrpcInsecureCredentials() : + GrpcCredentials(grpc_insecure_credentials_create() ?: error("Failed to create credentials")) + + +/** + * Native implementation of [ManagedChannel]. + * + * @param target The target address to connect to. + * @param credentials The credentials to use for the connection. + */ +internal class NativeManagedChannel( + target: String, + // we must store them, otherwise the credentials are getting released + credentials: GrpcCredentials, +) : ManagedChannel, ManagedChannelPlatform() { + + // a reference to make sure the grpc_init() was called. (it is released after shutdown) + @Suppress("unused") + private val rt = GrpcRuntime.acquire() + + // job bundling all the call jobs created by this channel. + // this allows easy cancellation of ongoing calls. + private val callJobSupervisor = SupervisorJob() + + // the channel's completion queue, handling all request operations + private val cq = CompletionQueue() + + internal val raw: CPointer = grpc_channel_create(target, credentials.raw, null) + ?: error("Failed to create channel") + + @Suppress("unused") + private val rawCleaner = createCleaner(raw) { + grpc_channel_destroy(it) + } + + override val platformApi: ManagedChannelPlatform = this + + private var isShutdownInternal = atomic(false) + override val isShutdown: Boolean = isShutdownInternal.value + private val isTerminatedInternal = CompletableDeferred() + override val isTerminated: Boolean + get() = isTerminatedInternal.isCompleted + + override suspend fun awaitTermination(duration: Duration): Boolean { + withTimeoutOrNull(duration) { + isTerminatedInternal.await() + } ?: return false + return true + } + + override fun shutdown(): ManagedChannel { + shutdownInternal(false) + return this + } + + override fun shutdownNow(): ManagedChannel { + shutdownInternal(true) + return this + } + + private fun shutdownInternal(force: Boolean) { + isShutdownInternal.value = true + if (isTerminatedInternal.isCompleted) { + return + } + if (force) { + // cancel all jobs, such that the shutdown is completing faster (not immediate). + // TODO: replace jobs by custom pendingCallClass. + callJobSupervisor.cancelChildren(CancellationException("Channel is shutting down")) + } + + // wait for the completion queue to shut down. + // the completion queue will be shut down after all requests are completed. + // therefore, we don't have to wait for the callJobs to be completed. + cq.shutdown(force).onComplete { + if (isTerminatedInternal.complete(Unit)) { + // release the grpc runtime, so it might call grpc_shutdown() + rt.close() + } + } + } + + override fun newCall( + methodDescriptor: MethodDescriptor, + callOptions: GrpcCallOptions, + ): ClientCall { + check(!isShutdown) { internalError("Channel is shutdown") } + + val callJob = Job(callJobSupervisor) + + val methodFullName = methodDescriptor.getFullMethodName() + // to construct a valid HTTP/2 path, we must prepend the name with a slash. + // the user does not do this to align it with the java implementation. + val methodNameSlice = "/$methodFullName".toGrpcSlice() + val rawCall = grpc_channel_create_call( + channel = raw, + parent_call = null, + propagation_mask = GRPC_PROPAGATE_DEFAULTS, + completion_queue = cq.raw, + method = methodNameSlice, + host = null, + deadline = gpr_inf_future(GPR_CLOCK_REALTIME), + reserved = null + ) ?: error("Failed to create call") + + return NativeClientCall( + cq, rawCall, methodDescriptor, callJob + ) + } + + override fun authority(): String { + TODO("Not yet implemented") + } + +} diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCall.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCall.native.kt index 29b06aede..bda5a17be 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCall.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ServerCall.native.kt @@ -24,7 +24,8 @@ public actual abstract class ServerCall { public actual abstract fun close(status: Status, trailers: GrpcTrailers) public actual open fun isReady(): Boolean { - TODO("Not yet implemented") + // Default implementation returns true - subclasses can override if they need flow control + return true } public actual abstract fun isCancelled(): Boolean @@ -49,5 +50,25 @@ public actual fun serverCallListener( onComplete: (State) -> Unit, onReady: (State) -> Unit, ): ServerCall.Listener { - TODO("Not yet implemented") + return object : ServerCall.Listener() { + override fun onMessage(message: Message) { + onMessage(state, message) + } + + override fun onHalfClose() { + onHalfClose(state) + } + + override fun onCancel() { + onCancel(state) + } + + override fun onComplete() { + onComplete(state) + } + + override fun onReady() { + onReady(state) + } + } } diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcByteBuffer.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcByteBuffer.kt deleted file mode 100644 index f5df196b2..000000000 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcByteBuffer.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.rpc.grpc.internal.bridge - -import kotlinx.cinterop.* -import libgrpcpp_c.* -import kotlin.experimental.ExperimentalNativeApi -import kotlin.native.ref.createCleaner - -@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) -internal class GrpcByteBuffer internal constructor( - internal val cByteBuffer: CPointer -) { - - constructor(slice: GrpcSlice) : this(memScoped { - grpc_raw_byte_buffer_create(slice.cSlice, 1u) ?: error("Failed to create byte buffer") - }) - - init { - createCleaner(cByteBuffer) { - grpc_byte_buffer_destroy(it) - } - } - - fun intoSlice(): GrpcSlice { - memScoped { - val respSlice = alloc() - grpc_byte_buffer_dump_to_single_slice(cByteBuffer, respSlice.ptr) - return GrpcSlice(respSlice.readValue()) - } - } - -} diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcClient.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcClient.kt deleted file mode 100644 index 642712dfa..000000000 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcClient.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.rpc.grpc.internal.bridge - -import kotlinx.cinterop.* -import kotlinx.coroutines.suspendCancellableCoroutine -import libgrpcpp_c.* -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.experimental.ExperimentalNativeApi -import kotlin.native.ref.createCleaner - -@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) -internal class GrpcClient(target: String) { - private var clientPtr: CPointer = - grpc_client_create_insecure(target) ?: error("Failed to create client") - - init { - createCleaner(clientPtr) { - grpc_client_delete(it) - } - } - - fun callUnaryBlocking(method: String, req: GrpcSlice): GrpcSlice { - memScoped { - val result = alloc() - grpc_client_call_unary_blocking(clientPtr, method, req.cSlice, result.ptr) - return GrpcSlice(result.readValue()) - } - } - - suspend fun callUnary(method: String, req: GrpcByteBuffer): GrpcByteBuffer = - suspendCancellableCoroutine { continuation -> - val context = grpc_context_create() - val method = grpc_method_create(method) - - val reqRawBuf = nativeHeap.alloc>() - reqRawBuf.value = req.cByteBuffer - - val respRawBuf: CPointerVar = nativeHeap.alloc() - - val continueCb = { st: grpc_status_code_t -> - // cleanup allocations owned by this method (this runs always) - grpc_method_delete(method) - grpc_context_delete(context) - nativeHeap.free(reqRawBuf) - - if (st != GRPC_C_STATUS_OK) { - continuation.resumeWithException(RuntimeException("Call failed with code: $st")) - } else { - val result = respRawBuf.value - if (result == null) { - continuation.resumeWithException(RuntimeException("No response received")) - } else { - continuation.resume(GrpcByteBuffer(result)) - } - } - - nativeHeap.free(respRawBuf) - } - val cbCtxStable = StableRef.create(continueCb) - - grpc_client_call_unary_callback( - clientPtr, method, context, reqRawBuf.ptr, respRawBuf.ptr, - cbCtxStable.asCPointer(), staticCFunction { st, ctx -> - val cbCtxStable = ctx!!.asStableRef<(grpc_status_code_t) -> Unit>() - cbCtxStable.get()(st) - cbCtxStable.dispose() - }) - } -} diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcSlice.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcSlice.kt deleted file mode 100644 index 70ab9a515..000000000 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/bridge/GrpcSlice.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.rpc.grpc.internal.bridge - -import kotlinx.cinterop.CValue -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.addressOf -import kotlinx.cinterop.usePinned -import libgrpcpp_c.grpc_slice -import libgrpcpp_c.grpc_slice_from_copied_buffer -import libgrpcpp_c.grpc_slice_unref -import kotlin.experimental.ExperimentalNativeApi -import kotlin.native.ref.createCleaner - -@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) -internal class GrpcSlice internal constructor(internal val cSlice: CValue) { - - constructor(buffer: ByteArray) : this( - buffer.usePinned { pinned -> - grpc_slice_from_copied_buffer(pinned.addressOf(0), buffer.size.toULong()) - } - ) - - init { - createCleaner(cSlice) { - grpc_slice_unref(it) - } - } -} diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt new file mode 100644 index 000000000..69bc3b1ee --- /dev/null +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/utils.kt @@ -0,0 +1,162 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class) + +package kotlinx.rpc.grpc.internal + +import kotlinx.cinterop.* +import kotlinx.io.* +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlinx.rpc.grpc.StatusCode +import libkgrpc.* +import platform.posix.memcpy + +internal fun internalError(message: String) { + error("Unexpected internal error: $message. Please, report the issue here: https://github.com/Kotlin/kotlinx-rpc/issues/new?template=bug_report.md") +} + +@OptIn(ExperimentalForeignApi::class, InternalIoApi::class, UnsafeIoApi::class) +internal fun Sink.writeFully(buffer: CPointer, offset: Long, length: Long) { + var consumed = 0L + while (consumed < length) { + UnsafeBufferOperations.writeToTail(this.buffer, 1) { array, start, endExclusive -> + val size = minOf(length - consumed, (endExclusive - start).toLong()) + + array.usePinned { + memcpy(it.addressOf(start), buffer + offset + consumed, size.convert()) + } + + consumed += size + size.toInt() + } + } +} + +internal fun grpc_slice.toByteArray(): ByteArray = memScoped { + val out = ByteArray(len().toInt()) + if (out.isEmpty()) return out + + out.usePinned { + memcpy(it.addressOf(0), startPtr(), len().convert()) + } + return out +} + +internal fun CPointer.toKotlin(): Buffer = memScoped { + val reader = alloc() + check(grpc_byte_buffer_reader_init(reader.ptr, this@toKotlin) == 1) + { internalError("Failed to initialized byte buffer.") } + + val out = Buffer() + val slice = alloc() + while (grpc_byte_buffer_reader_next(reader.ptr, slice.ptr) != 0) { + val dataPtr = slice.startPtr() + val len = slice.len() + + out.writeFully(dataPtr, 0, len.convert()) + grpc_slice_unref(slice.readValue()) + } + + grpc_byte_buffer_reader_destroy(reader.ptr) + return out +} + +internal fun Source.toGrpcByteBuffer(): CPointer { + if (this is Buffer) return toGrpcByteBuffer() + + val tmp = ByteArray(8192) + val slices = ArrayList>(4) + + while (true) { + val n = readAtMostTo(tmp, 0, tmp.size) + if (n <= 0) break + tmp.usePinned { + slices += grpc_slice_from_copied_buffer(it.addressOf(0), n.toULong()) + } + } + + return slices.toGrpcByteBuffer() +} + +@OptIn(UnsafeIoApi::class) +internal fun Buffer.toGrpcByteBuffer(): CPointer { + val slices = ArrayList>(4) + + while (size > 0L) { + UnsafeBufferOperations.readFromHead(this) { arr, start, end -> + val len = end - start + arr.usePinned { p -> slices += grpc_slice_from_copied_buffer(p.addressOf(start), len.toULong()) } + len + } + } + + return slices.toGrpcByteBuffer() +} + +private fun ArrayList>.toGrpcByteBuffer(): CPointer = memScoped { + val count = if (isEmpty()) 1 else size + val sliceArr = allocArray(count) + val base = sliceArr.reinterpret() + val stride = sizeOf() + + if (isEmpty()) { + val dst = base /* + 0*stride */ + val empty = grpc_slice_malloc(0u) + empty.useContents { memcpy(dst, ptr, stride.convert()) } + } else { + for (i in 0 until count) { + val dst = base + i * stride // <-- important: advance by i*size + this@toGrpcByteBuffer[i].useContents { memcpy(dst, ptr, stride.convert()) } + } + } + + + val buf = grpc_raw_byte_buffer_create(sliceArr, count.toULong())!! + // unref each slice, as the buffer takes ownership + this@toGrpcByteBuffer.forEach { grpc_slice_unref(it) } + + return buf +} + +internal fun grpc_slice.startPtr(): CPointer { + return if (this.refcount != null) { + this.data.refcounted.bytes!!.reinterpret() + } else { + this.data.inlined.bytes.reinterpret() + } +} + +internal fun grpc_slice.len(): ULong { + return if (this.refcount != null) { + this.data.refcounted.length + } else { + this.data.inlined.length.convert() + } +} + +internal fun String.toGrpcSlice(): CValue { + return grpc_slice_from_copied_string(this) +} + +internal fun grpc_status_code.toKotlin(): StatusCode = when (this) { + grpc_status_code.GRPC_STATUS_OK -> StatusCode.OK + grpc_status_code.GRPC_STATUS_CANCELLED -> StatusCode.CANCELLED + grpc_status_code.GRPC_STATUS_UNKNOWN -> StatusCode.UNKNOWN + grpc_status_code.GRPC_STATUS_INVALID_ARGUMENT -> StatusCode.INVALID_ARGUMENT + grpc_status_code.GRPC_STATUS_DEADLINE_EXCEEDED -> StatusCode.DEADLINE_EXCEEDED + grpc_status_code.GRPC_STATUS_NOT_FOUND -> StatusCode.NOT_FOUND + grpc_status_code.GRPC_STATUS_ALREADY_EXISTS -> StatusCode.ALREADY_EXISTS + grpc_status_code.GRPC_STATUS_PERMISSION_DENIED -> StatusCode.PERMISSION_DENIED + grpc_status_code.GRPC_STATUS_RESOURCE_EXHAUSTED -> StatusCode.RESOURCE_EXHAUSTED + grpc_status_code.GRPC_STATUS_FAILED_PRECONDITION -> StatusCode.FAILED_PRECONDITION + grpc_status_code.GRPC_STATUS_ABORTED -> StatusCode.ABORTED + grpc_status_code.GRPC_STATUS_OUT_OF_RANGE -> StatusCode.OUT_OF_RANGE + grpc_status_code.GRPC_STATUS_UNIMPLEMENTED -> StatusCode.UNIMPLEMENTED + grpc_status_code.GRPC_STATUS_INTERNAL -> StatusCode.INTERNAL + grpc_status_code.GRPC_STATUS_UNAVAILABLE -> StatusCode.UNAVAILABLE + grpc_status_code.GRPC_STATUS_DATA_LOSS -> StatusCode.DATA_LOSS + grpc_status_code.GRPC_STATUS_UNAUTHENTICATED -> StatusCode.UNAUTHENTICATED + grpc_status_code.GRPC_STATUS__DO_NOT_USE -> error("Invalid status code: ${this.ordinal}") +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/internal/CoreTest.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/internal/CoreTest.kt new file mode 100644 index 000000000..4db3e63aa --- /dev/null +++ b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/internal/CoreTest.kt @@ -0,0 +1,318 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ +@file:OptIn(ExperimentalForeignApi::class, ExperimentalStdlibApi::class, ExperimentalNativeApi::class) + +package kotlinx.rpc.grpc.internal + +import HelloReply +import HelloReplyInternal +import HelloRequest +import HelloRequestInternal +import grpc.examples.echo.EchoRequest +import grpc.examples.echo.EchoRequestInternal +import grpc.examples.echo.EchoResponseInternal +import grpc.examples.echo.invoke +import invoke +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import kotlinx.rpc.grpc.* +import kotlin.experimental.ExperimentalNativeApi +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class GrpcCoreTest { + + private fun descriptorFor(fullName: String = "helloworld.Greeter/SayHello"): MethodDescriptor = + methodDescriptor( + fullMethodName = fullName, + requestCodec = HelloRequestInternal.CODEC, + responseCodec = HelloReplyInternal.CODEC, + type = MethodType.UNARY, + schemaDescriptor = Unit, + idempotent = true, + safe = true, + sampledToLocalTracing = true, + ) + + private fun ManagedChannel.newHelloCall(fullName: String = "helloworld.Greeter/SayHello"): ClientCall = + platformApi.newCall(descriptorFor(fullName), GrpcCallOptions()) + + private fun createChannel(): ManagedChannel = ManagedChannelBuilder("localhost:50051") + .usePlaintext() + .buildChannel() + + + private fun helloReq(timeout: UInt = 0u): HelloRequest = HelloRequest { + name = "world" + this.timeout = timeout + } + + private fun shutdownAndWait(channel: ManagedChannel) { + channel.shutdown() + runBlocking { channel.awaitTermination() } + } + + @Test + fun normalUnaryCall_ok() = repeat(1000) { + val channel = createChannel() + val call = channel.newHelloCall() + val req = helloReq() + + val statusDeferred = CompletableDeferred() + val replyDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onMessage(message: HelloReply) { + replyDeferred.complete(message) + } + + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + + call.start(listener, GrpcTrailers()) + call.sendMessage(req) + call.halfClose() + call.request(1) + + runBlocking { + withTimeout(10000) { + val status = statusDeferred.await() + val reply = replyDeferred.await() + assertEquals(StatusCode.OK, status.statusCode) + assertEquals("Hello world", reply.message) + } + } + shutdownAndWait(channel) + } + + @Test + fun sendMessage_beforeStart_throws() { + val channel = createChannel() + val call = channel.newHelloCall() + val req = helloReq() + assertFailsWith { call.sendMessage(req) } + shutdownAndWait(channel) + } + + @Test + fun request_beforeStart_throws() { + val channel = createChannel() + val call = channel.newHelloCall() + assertFailsWith { call.request(1) } + shutdownAndWait(channel) + } + + @Test + fun start_twice_throws() { + val channel = createChannel() + val call = channel.newHelloCall() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + call.start(listener, GrpcTrailers()) + assertFailsWith { call.start(listener, GrpcTrailers()) } + // cancel to finish the call quickly + call.cancel("Double start test", null) + runBlocking { withTimeout(5000) { statusDeferred.await() } } + shutdownAndWait(channel) + } + + @Test + fun send_afterHalfClose_throws() { + val channel = createChannel() + val call = channel.newHelloCall() + val req = helloReq() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + call.start(listener, GrpcTrailers()) + call.halfClose() + assertFailsWith { call.sendMessage(req) } + // Ensure call completes + call.cancel("cleanup", null) + runBlocking { withTimeout(5000) { statusDeferred.await() } } + shutdownAndWait(channel) + } + + @Test + fun request_zero_throws() { + val channel = createChannel() + val call = channel.newHelloCall() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + call.start(listener, GrpcTrailers()) + assertFailsWith { call.request(0) } + call.cancel("cleanup", null) + runBlocking { withTimeout(5000) { statusDeferred.await() } } + shutdownAndWait(channel) + } + + @Test + fun cancel_afterStart_resultsInCancelledStatus() { + val channel = createChannel() + val call = channel.newHelloCall() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + call.start(listener, GrpcTrailers()) + call.cancel("user cancel", null) + runBlocking { + withTimeout(10000) { + val status = statusDeferred.await() + assertEquals(StatusCode.CANCELLED, status.statusCode) + } + } + shutdownAndWait(channel) + } + + @Test + fun invalid_method_returnsNonOkStatus() { + val channel = createChannel() + val call = channel.newHelloCall("/helloworld.Greeter/NoSuchMethod") + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + + call.start(listener, GrpcTrailers()) + call.sendMessage(helloReq()) + call.halfClose() + call.request(1) + runBlocking { + withTimeout(10000) { + val status = statusDeferred.await() + assertTrue(status.statusCode != StatusCode.OK) + } + } + shutdownAndWait(channel) + } + + + @Test + fun halfCloseBeforeSendingMessage_errorWithoutCrashing() { + val channel = createChannel() + val call = channel.newHelloCall() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + assertFailsWith { + try { + call.start(listener, GrpcTrailers()) + call.halfClose() + call.sendMessage(helloReq()) + } finally { + shutdownAndWait(channel) + } + } + } + + @Test + fun invokeStartAfterShutdown() { + val channel = createChannel() + val call = channel.newHelloCall() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + + channel.shutdown() + call.start(listener, GrpcTrailers()) + call.sendMessage(helloReq()) + call.halfClose() + call.request(1) + + runBlocking { + withTimeout(10000) { + val status = statusDeferred.await() + assertEquals(StatusCode.UNAVAILABLE, status.statusCode) + } + } + } + + @Test + fun shutdownNowInMiddleOfCall() { + val channel = createChannel() + val call = channel.newHelloCall() + val statusDeferred = CompletableDeferred() + val listener = object : ClientCall.Listener() { + override fun onClose(status: Status, trailers: GrpcTrailers) { + statusDeferred.complete(status) + } + } + + call.start(listener, GrpcTrailers()) + // set timeout on the server to 1000 ms, to simulate a long-running call + call.sendMessage(helloReq(1000u)) + call.halfClose() + call.request(1) + + runBlocking { + delay(100) + channel.shutdownNow() + withTimeout(10000) { + val status = statusDeferred.await() + assertEquals(StatusCode.CANCELLED, status.statusCode) + } + } + } + + @Test + fun unaryCallTest() = runBlocking { + val ch = createChannel() + val desc = descriptorFor() + val req = helloReq() + repeat(1000) { + val res = unaryRpc(ch.platformApi, desc, req) + assertEquals("Hello world", res.message) + } + } + + + private fun echoDescriptor(methodName: String, type: MethodType) = + methodDescriptor( + fullMethodName = "grpc.examples.echo.Echo/$methodName", + requestCodec = EchoRequestInternal.CODEC, + responseCodec = EchoResponseInternal.CODEC, + type = type, + schemaDescriptor = Unit, + idempotent = true, + safe = true, + sampledToLocalTracing = true, + ) + + @Test + fun unaryEchoTest() = runBlocking { + val ch = createChannel() + val desc = echoDescriptor("UnaryEcho", MethodType.UNARY) + val req = EchoRequest { message = "Echoooo" } + unaryRpc(ch.platformApi, desc, req) + return@runBlocking + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/BridgeTest.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/BridgeTest.kt deleted file mode 100644 index e0ef885b2..000000000 --- a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/BridgeTest.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.rpc.grpc.test - -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.coroutines.runBlocking -import kotlinx.rpc.grpc.internal.bridge.GrpcByteBuffer -import kotlinx.rpc.grpc.internal.bridge.GrpcClient -import kotlinx.rpc.grpc.internal.bridge.GrpcSlice -import libgrpcpp_c.pb_decode_greeter_sayhello_response -import kotlin.native.runtime.GC -import kotlin.native.runtime.NativeRuntimeApi -import kotlin.test.Test -import kotlin.test.fail - -@OptIn(ExperimentalForeignApi::class) -class BridgeTest { - - @OptIn(NativeRuntimeApi::class) - @Test - fun testBasicUnaryAsyncCall() = runBlocking { - try { - val client = GrpcClient("localhost:50051") - val request = GrpcSlice(byteArrayOf(8, 4)) - val reqBuf = GrpcByteBuffer(request) - val result = client.callUnary("/Greeter/SayHello", reqBuf) - val response = result.intoSlice() - val value = pb_decode_greeter_sayhello_response(response.cSlice) - println("Response received: $value") - } catch (e: Exception) { - // trigger GC collection, otherwise there will be a leak - GC.collect() - fail("Got an exception: ${e.message}", e) - } - } - -} diff --git a/protobuf/protobuf-core/build.gradle.kts b/protobuf/protobuf-core/build.gradle.kts index 52157cbc7..2c71d5306 100644 --- a/protobuf/protobuf-core/build.gradle.kts +++ b/protobuf/protobuf-core/build.gradle.kts @@ -4,10 +4,10 @@ @file:OptIn(InternalRpcApi::class) -import org.jetbrains.kotlin.gradle.dsl.KotlinVersion import kotlinx.rpc.buf.tasks.BufGenerateTask import kotlinx.rpc.internal.InternalRpcApi import kotlinx.rpc.internal.configureLocalProtocGenDevelopmentDependency +import org.jetbrains.kotlin.gradle.dsl.KotlinVersion import util.configureCLibCInterop plugins { @@ -55,7 +55,7 @@ kotlin { configureCLibCInterop(project, ":protowire_static") { cinteropCLib -> @Suppress("unused") - val libprotowire by creating { + val libprotowire by creating { includeDirs( cinteropCLib.resolve("include") ) diff --git a/protoc-gen/common/src/main/kotlin/kotlinx/rpc/protoc/gen/core/codeRequestToModel.kt b/protoc-gen/common/src/main/kotlin/kotlinx/rpc/protoc/gen/core/codeRequestToModel.kt index adbcaca1c..5aea4bcaa 100644 --- a/protoc-gen/common/src/main/kotlin/kotlinx/rpc/protoc/gen/core/codeRequestToModel.kt +++ b/protoc-gen/common/src/main/kotlin/kotlinx/rpc/protoc/gen/core/codeRequestToModel.kt @@ -7,16 +7,7 @@ package kotlinx.rpc.protoc.gen.core import com.google.protobuf.DescriptorProtos import com.google.protobuf.Descriptors import com.google.protobuf.compiler.PluginProtos.CodeGeneratorRequest -import kotlinx.rpc.protoc.gen.core.model.EnumDeclaration -import kotlinx.rpc.protoc.gen.core.model.FieldDeclaration -import kotlinx.rpc.protoc.gen.core.model.FieldType -import kotlinx.rpc.protoc.gen.core.model.FileDeclaration -import kotlinx.rpc.protoc.gen.core.model.FqName -import kotlinx.rpc.protoc.gen.core.model.MessageDeclaration -import kotlinx.rpc.protoc.gen.core.model.MethodDeclaration -import kotlinx.rpc.protoc.gen.core.model.Model -import kotlinx.rpc.protoc.gen.core.model.OneOfDeclaration -import kotlinx.rpc.protoc.gen.core.model.ServiceDeclaration +import kotlinx.rpc.protoc.gen.core.model.* private val nameCache = mutableMapOf() private val modelCache = mutableMapOf() @@ -51,7 +42,7 @@ fun CodeGeneratorRequest.toModel(): Model { */ private fun DescriptorProtos.FileDescriptorProto.toDescriptor( protoFileMap: Map, - cache: MutableMap + cache: MutableMap, ): Descriptors.FileDescriptor { if (cache.containsKey(name)) return cache[name]!! diff --git a/protoc-gen/grpc/src/main/kotlin/kotlinx/rpc/protoc/gen/grpc/ModelToGrpcKotlinCommonGenerator.kt b/protoc-gen/grpc/src/main/kotlin/kotlinx/rpc/protoc/gen/grpc/ModelToGrpcKotlinCommonGenerator.kt index e7e51a13e..d2a62b904 100644 --- a/protoc-gen/grpc/src/main/kotlin/kotlinx/rpc/protoc/gen/grpc/ModelToGrpcKotlinCommonGenerator.kt +++ b/protoc-gen/grpc/src/main/kotlin/kotlinx/rpc/protoc/gen/grpc/ModelToGrpcKotlinCommonGenerator.kt @@ -20,14 +20,11 @@ class ModelToGrpcKotlinCommonGenerator( override val FileDeclaration.hasInternalGeneratedContent: Boolean get() = false override fun CodeGenerator.generatePublicDeclaredEntities(fileDeclaration: FileDeclaration) { + additionalPublicImports.add("kotlinx.coroutines.flow.Flow") fileDeclaration.serviceDeclarations.forEach { generatePublicService(it) } } - override fun CodeGenerator.generateInternalDeclaredEntities(fileDeclaration: FileDeclaration) { } - - init { - additionalPublicImports.add("kotlinx.coroutines.flow.Flow") - } + override fun CodeGenerator.generateInternalDeclaredEntities(fileDeclaration: FileDeclaration) {} @Suppress("detekt.LongMethod") private fun CodeGenerator.generatePublicService(service: ServiceDeclaration) {