From f3ff70dfd3dff6b09f2707e6c4636c6218eca612 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Thu, 10 Jul 2025 13:16:23 +0200 Subject: [PATCH 1/4] Support streaming: KRPC-152, KRPC-153 --- .../rpc/protobuf/ModelToKotlinGenerator.kt | 102 +++++++++++++----- .../rpc/protobuf/test/StreamingTest.kt | 85 +++++++++++++++ .../src/test/proto/reference.proto | 2 - .../src/test/proto/streaming.proto | 9 ++ 4 files changed, 172 insertions(+), 26 deletions(-) create mode 100644 protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt create mode 100644 protobuf-plugin/src/test/proto/streaming.proto diff --git a/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt b/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt index eec9fad21..1e6d73ee2 100644 --- a/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt +++ b/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt @@ -9,6 +9,7 @@ package kotlinx.rpc.protobuf import kotlinx.rpc.protobuf.CodeGenerator.DeclarationType import kotlinx.rpc.protobuf.model.* import org.slf4j.Logger +import kotlin.getValue private const val RPC_INTERNAL_PACKAGE_SUFFIX = "_rpc_internal" @@ -50,6 +51,7 @@ class ModelToKotlinGenerator( generatePublicDeclaredEntities(this@generatePublicKotlinFile) import("kotlinx.rpc.internal.utils.*") + import("kotlinx.coroutines.flow.*") additionalPublicImports.forEach { import(it) @@ -76,6 +78,7 @@ class ModelToKotlinGenerator( generateInternalDeclaredEntities(this@generateInternalKotlinFile) import("kotlinx.rpc.internal.utils.*") + import("kotlinx.coroutines.flow.*") additionalInternalImports.forEach { import(it) @@ -510,19 +513,22 @@ class ModelToKotlinGenerator( code("@kotlinx.rpc.grpc.annotations.Grpc") clazz(service.name.simpleName, declarationType = DeclarationType.Interface) { service.methods.forEach { method -> - // no streaming for now val inputType by method.inputType val outputType by method.outputType function( name = method.name, - modifiers = "suspend", - args = "message: ${inputType.name.safeFullName()}", - returnType = outputType.name.safeFullName(), + modifiers = if (method.serverStreaming) "" else "suspend", + args = "message: ${inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)}", + returnType = outputType.name.safeFullName().wrapInFlowIf(method.serverStreaming), ) } } } + private fun String.wrapInFlowIf(condition: Boolean): String { + return if (condition) "Flow<$this>" else this + } + private fun CodeGenerator.generateInternalService(service: ServiceDeclaration) { code("@Suppress(\"unused\", \"all\")") clazz( @@ -566,11 +572,23 @@ class ModelToKotlinGenerator( function( name = grpcName, - modifiers = "override suspend", - args = "request: ${inputType.toPlatformMessageType()}", - returnType = outputType.toPlatformMessageType(), + modifiers = "override${if (method.serverStreaming) "" else " suspend"}", + args = "request: ${inputType.toPlatformMessageType().wrapInFlowIf(method.clientStreaming)}", + returnType = outputType.toPlatformMessageType().wrapInFlowIf(method.serverStreaming), ) { - code("return impl.${method.name}(request.toKotlin()).toPlatform()") + val toKotlin = if (method.clientStreaming) { + "map { it.toKotlin() }" + } else { + "toKotlin()" + } + + val toPlatform = if (method.serverStreaming) { + "map { it.toPlatform() }" + } else { + "toPlatform()" + } + + code("return impl.${method.name}(request.${toKotlin}).${toPlatform}") importRootDeclarationIfNeeded(inputType.name, "toPlatform", true) importRootDeclarationIfNeeded(outputType.name, "toKotlin", true) @@ -605,22 +623,14 @@ class ModelToKotlinGenerator( typeParameters = "R", returnType = "R", ) { - code("val message = rpcCall.parameters[0]") - code("@Suppress(\"UNCHECKED_CAST\")") - scope("return when (rpcCall.callableName)") { - service.methods.forEach { method -> - val inputType by method.inputType - val outputType by method.outputType - val grpcName = method.name.replaceFirstChar { it.lowercase() } - val result = "stub.$grpcName((message as ${inputType.name.safeFullName()}).toPlatform())" - code("\"${method.name}\" -> $result.toKotlin() as R") - - importRootDeclarationIfNeeded(inputType.name, "toPlatform", true) - importRootDeclarationIfNeeded(outputType.name, "toKotlin", true) - } + val methods = service.methods.filter { !it.serverStreaming } - code("else -> error(\"Illegal call: \${rpcCall.callableName}\")") + if (methods.isEmpty()) { + code("error(\"Illegal call: \${rpcCall.callableName}\")") + return@function } + + generateCallsImpls(methods) } function( @@ -628,13 +638,57 @@ class ModelToKotlinGenerator( modifiers = "override", args = "rpcCall: kotlinx.rpc.RpcCall", typeParameters = "R", - returnType = "kotlinx.coroutines.flow.Flow", + returnType = "Flow", ) { - code("error(\"Flow calls are not supported\")") + val methods = service.methods.filter { it.serverStreaming } + + if (methods.isEmpty()) { + code("error(\"Illegal streaming call: \${rpcCall.callableName}\")") + return@function + } + + generateCallsImpls(methods) } } } + private fun CodeGenerator.generateCallsImpls( + methods: List, + ) { + code("val message = rpcCall.parameters[0]") + code("@Suppress(\"UNCHECKED_CAST\")") + scope("return when (rpcCall.callableName)") { + methods.forEach { method -> + val inputType by method.inputType + val outputType by method.outputType + val grpcName = method.name.replaceFirstChar { it.lowercase() } + + val toKotlin = if (method.serverStreaming) { + "map { it.toKotlin() }" + } else { + "toKotlin()" + } + + val toPlatform = if (method.clientStreaming) { + "map { it.toPlatform() }" + } else { + "toPlatform()" + } + + val argumentCast = inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming) + val resultCast = "R".wrapInFlowIf(method.serverStreaming) + + val result = "stub.$grpcName((message as $argumentCast).${toPlatform})" + code("\"${method.name}\" -> $result.${toKotlin} as $resultCast") + + importRootDeclarationIfNeeded(inputType.name, "toPlatform", true) + importRootDeclarationIfNeeded(outputType.name, "toKotlin", true) + } + + code("else -> error(\"Illegal call: \${rpcCall.callableName}\")") + } + } + private fun MessageDeclaration.toPlatformMessageType(): String { return "${outerClassName.safeFullName()}.${name.fullNestedName()}" } diff --git a/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt b/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt new file mode 100644 index 000000000..047f74caf --- /dev/null +++ b/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt @@ -0,0 +1,85 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.protobuf.test + +import StreamingTestService +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collectIndexed +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.last +import kotlinx.coroutines.flow.toList +import kotlinx.rpc.RpcServer +import kotlinx.rpc.registerService +import kotlinx.rpc.withService +import kotlin.test.Test +import kotlin.test.assertEquals + +class StreamingTestServiceImpl : StreamingTestService { + override fun Server(message: References): Flow { + return flow { emit(message); emit(message); emit(message) } + } + + override suspend fun Client(message: Flow): References { + return message.last() + } + + override fun Bidi(message: Flow): Flow { + return message + } +} + +class StreamingTest : GrpcServerTest() { + override fun RpcServer.registerServices() { + registerService { StreamingTestServiceImpl() } + } + + @Test + fun testServerStreaming() = runGrpcTest { grpcClient -> + val service = grpcClient.withService() + service.Server(References { + other = Other { + field= 42 + } + }).toList().run { + assertEquals(3, size) + + forEach { + assertEquals(42, it.other.field) + } + } + } + + @Test + fun testClientStreaming() = runGrpcTest { grpcClient -> + val service = grpcClient.withService() + val result = service.Client(flow { + repeat(3) { + emit(References { + other = Other { + field = 42 + it + } + }) + } + }) + + assertEquals(44, result.other.field) + } + + @Test + fun testBidiStreaming() = runGrpcTest { grpcClient -> + val service = grpcClient.withService() + service.Bidi(flow { + repeat(3) { + emit(References { + other = Other { + field = 42 + it + } + }) + } + }).collectIndexed { i, it -> + assertEquals(42 + i, it.other.field) + } + } +} diff --git a/protobuf-plugin/src/test/proto/reference.proto b/protobuf-plugin/src/test/proto/reference.proto index 6f276344c..4a68c5189 100644 --- a/protobuf-plugin/src/test/proto/reference.proto +++ b/protobuf-plugin/src/test/proto/reference.proto @@ -1,7 +1,5 @@ syntax = "proto3"; -import "all_primitives.proto"; - message Other { string arg = 1; } diff --git a/protobuf-plugin/src/test/proto/streaming.proto b/protobuf-plugin/src/test/proto/streaming.proto new file mode 100644 index 000000000..061f22458 --- /dev/null +++ b/protobuf-plugin/src/test/proto/streaming.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +import "reference_package.proto"; + +service StreamingTestService { + rpc Server(kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References); + rpc Client(stream kotlinx.rpc.protobuf.test.References) returns (kotlinx.rpc.protobuf.test.References); + rpc Bidi(stream kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References); +} From e593385b14605c5faf7c341389f2e06118b223ef Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Thu, 10 Jul 2025 13:22:48 +0200 Subject: [PATCH 2/4] Dump platforms --- docs/pages/kotlinx-rpc/topics/platforms.topic | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/pages/kotlinx-rpc/topics/platforms.topic b/docs/pages/kotlinx-rpc/topics/platforms.topic index 1511fbaaa..f846b1dfe 100644 --- a/docs/pages/kotlinx-rpc/topics/platforms.topic +++ b/docs/pages/kotlinx-rpc/topics/platforms.topic @@ -1,4 +1,8 @@ + + @@ -68,6 +72,14 @@
  • apple
  • ios
  • iosArm64
  • iosSimulatorArm64
  • iosX64
  • macos
  • macosArm64
  • macosX64
  • watchos
  • watchosArm32
  • watchosArm64
  • watchosDeviceArm64
  • watchosSimulatorArm64
  • watchosX64
  • tvos
  • tvosArm64
  • tvosSimulatorArm64
  • tvosX64
  • linux
  • linuxArm64
  • linuxX64
  • windows
  • mingwX64
  • + +protobuf-plugin +Jvm Only +- +- +- + + utils jvm @@ -76,6 +88,14 @@
  • apple
  • ios
  • iosArm64
  • iosSimulatorArm64
  • iosX64
  • macos
  • macosArm64
  • macosX64
  • watchos
  • watchosArm32
  • watchosArm64
  • watchosDeviceArm64
  • watchosSimulatorArm64
  • watchosX64
  • tvos
  • tvosArm64
  • tvosSimulatorArm64
  • tvosX64
  • linux
  • linuxArm64
  • linuxX64
  • windows
  • mingwX64
  • + +grpc-core +jvm +
  • browser
  • node
  • +
  • wasmJs
  • browser
  • d8
  • node
  • +
  • apple
  • ios
  • iosArm64
  • iosSimulatorArm64
  • iosX64
  • macos
  • macosArm64
  • macosX64
  • watchos
  • watchosArm32
  • watchosArm64
  • watchosDeviceArm64
  • watchosSimulatorArm64
  • watchosX64
  • tvos
  • tvosArm64
  • tvosSimulatorArm64
  • tvosX64
  • linux
  • linuxArm64
  • linuxX64
  • windows
  • mingwX64
  • + + krpc-client jvm From 3526c0a0c628715d04d660d11101d769c7a74dae Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Thu, 10 Jul 2025 13:35:53 +0200 Subject: [PATCH 3/4] Fix jpms for grpc KRPC-164 --- grpc/grpc-core/build.gradle.kts | 12 ++++++------ jpms-check/build.gradle.kts | 18 +++++++++++++----- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts index 68496ae23..d8bb8c28d 100644 --- a/grpc/grpc-core/build.gradle.kts +++ b/grpc/grpc-core/build.gradle.kts @@ -19,12 +19,12 @@ kotlin { jvmMain { dependencies { - api(libs.grpc.util) - api(libs.grpc.stub) - api(libs.grpc.protobuf) - api(libs.grpc.kotlin.stub) - api(libs.protobuf.java.util) - api(libs.protobuf.kotlin) + implementation(libs.grpc.util) + implementation(libs.grpc.stub) + implementation(libs.grpc.protobuf) + implementation(libs.grpc.kotlin.stub) + implementation(libs.protobuf.java.util) + implementation(libs.protobuf.kotlin) } } } diff --git a/jpms-check/build.gradle.kts b/jpms-check/build.gradle.kts index d02ea18ac..f6e97eb39 100644 --- a/jpms-check/build.gradle.kts +++ b/jpms-check/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ import util.targets.hasJavaModule @@ -11,9 +11,13 @@ plugins { description = "Internal module for checking JPMS compliance" -tasks.register("generateModuleInfo") { +val excludedProjects = listOf( + "protobuf-plugin", +) + +val generateModuleInfo = tasks.register("generateModuleInfo") { val modules = project.rootProject.subprojects - .filter { it.hasJavaModule } + .filter { it.applicableForCheck() } .map { it.javaModuleName() } val moduleInfoPath = project.projectDir.absolutePath + "/src/main/java/module-info.java" @@ -33,7 +37,7 @@ tasks.register("generateModuleInfo") { } tasks.getByName("compileJava") { - dependsOn("generateModuleInfo") + dependsOn(generateModuleInfo) val projectFiles = project.files() doFirst { @@ -58,9 +62,13 @@ dependencies { .joinToString(":", prefix = ":") { segment -> segment.name } it.plugins.withId("maven-publish") { - if (it.hasJavaModule) { + if (it.applicableForCheck()) { api(project(dep)) } } } } + +private fun Project.applicableForCheck(): Boolean { + return hasJavaModule && name !in excludedProjects +} From e8859cfc008136386e5e052423a98713ae39d2c8 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Mon, 14 Jul 2025 12:31:28 +0200 Subject: [PATCH 4/4] KRPC-113 Support gRPC: Ktor Integration --- grpc/grpc-core/build.gradle.kts | 13 ++-- .../kotlin/kotlinx/rpc/grpc/GrpcClient.kt | 19 ++++- .../kotlin/kotlinx/rpc/grpc/GrpcServer.kt | 9 ++- .../kotlin/kotlinx/rpc/grpc/ManagedChannel.kt | 2 +- .../kotlinx/rpc/grpc/ManagedChannel.js.kt | 2 +- .../kotlinx/rpc/grpc/ManagedChannel.jvm.kt | 4 +- .../kotlinx/rpc/grpc/ManagedChannel.native.kt | 2 +- .../kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt | 2 +- grpc/grpc-ktor-server-test/build.gradle.kts | 41 ++++++++++ grpc/grpc-ktor-server-test/gradle.properties | 5 ++ .../rpc/grpc/ktor/server/test/TestServer.kt | 45 +++++++++++ .../src/test/proto/ktor-test-service.proto | 11 +++ .../src/test/resources/logback.xml | 16 ++++ grpc/grpc-ktor-server/build.gradle.kts | 19 +++++ grpc/grpc-ktor-server/gradle.properties | 5 ++ .../kotlinx/rpc/grpc/ktor/server/Server.kt | 76 +++++++++++++++++++ settings.gradle.kts | 3 + versions-root/libs.versions.toml | 1 + 18 files changed, 260 insertions(+), 15 deletions(-) create mode 100644 grpc/grpc-ktor-server-test/build.gradle.kts create mode 100644 grpc/grpc-ktor-server-test/gradle.properties create mode 100644 grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt create mode 100644 grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto create mode 100644 grpc/grpc-ktor-server-test/src/test/resources/logback.xml create mode 100644 grpc/grpc-ktor-server/build.gradle.kts create mode 100644 grpc/grpc-ktor-server/gradle.properties create mode 100644 grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts index d8bb8c28d..fa6dd6983 100644 --- a/grpc/grpc-core/build.gradle.kts +++ b/grpc/grpc-core/build.gradle.kts @@ -14,16 +14,19 @@ kotlin { api(projects.core) api(projects.utils) api(libs.coroutines.core) + + implementation(libs.atomicfu) } } jvmMain { dependencies { - implementation(libs.grpc.util) - implementation(libs.grpc.stub) - implementation(libs.grpc.protobuf) - implementation(libs.grpc.kotlin.stub) - implementation(libs.protobuf.java.util) + api(libs.grpc.api) + api(libs.grpc.util) + api(libs.grpc.stub) + api(libs.grpc.protobuf) + implementation(libs.grpc.kotlin.stub) // causes problems to jpms if api + api(libs.protobuf.java.util) implementation(libs.protobuf.kotlin) } } diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt index 8d10243a2..7ba5c588e 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt @@ -10,6 +10,7 @@ import kotlinx.rpc.RpcClient import kotlinx.rpc.grpc.descriptor.GrpcClientDelegate import kotlinx.rpc.grpc.descriptor.GrpcServiceDescriptor import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap +import kotlin.time.Duration /** * GrpcClient manages gRPC communication by providing implementation for making asynchronous RPC calls. @@ -19,6 +20,20 @@ import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap public class GrpcClient internal constructor(private val channel: ManagedChannel) : RpcClient { private val stubs = RpcInternalConcurrentHashMap() + public fun shutdown() { + stubs.clear() + channel.shutdown() + } + + public fun shutdownNow() { + stubs.clear() + channel.shutdownNow() + } + + public suspend fun awaitTermination(duration: Duration) { + channel.awaitTermination(duration) + } + override suspend fun call(call: RpcCall): T { return call.delegate().call(call) } @@ -39,11 +54,11 @@ public class GrpcClient internal constructor(private val channel: ManagedChannel * Constructor function for the [GrpcClient] class. */ public fun GrpcClient( - name: String, + hostname: String, port: Int, configure: ManagedChannelBuilder<*>.() -> Unit = {}, ): GrpcClient { - val channel = ManagedChannelBuilder(name, port).apply(configure).buildChannel() + val channel = ManagedChannelBuilder(hostname, port).apply(configure).buildChannel() return GrpcClient(channel) } diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt index 4600ea17c..f69865b9b 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt @@ -4,6 +4,7 @@ package kotlinx.rpc.grpc +import kotlinx.atomicfu.atomic import kotlinx.rpc.RpcServer import kotlinx.rpc.descriptor.serviceDescriptorOf import kotlinx.rpc.grpc.annotations.Grpc @@ -65,9 +66,13 @@ public class GrpcServer internal constructor( return grpc.delegate.definitionFor(service) } + private val buildLock = atomic(false) + internal fun build() { - internalServer = Server(serverBuilder) - isBuilt = true + if (buildLock.compareAndSet(expect = false, update = true)) { + internalServer = Server(serverBuilder) + isBuilt = true + } } override val isShutdown: Boolean diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt index 5368b683b..c7ea10b53 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt @@ -68,7 +68,7 @@ public interface ManagedChannel { */ public expect abstract class ManagedChannelBuilder> -internal expect fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> +internal expect fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> internal expect fun ManagedChannelBuilder(target: String): ManagedChannelBuilder<*> internal expect fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel diff --git a/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt b/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt index b7330da46..2218a5078 100644 --- a/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt +++ b/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt @@ -20,7 +20,7 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel { error("JS target is not supported in gRPC") } -internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> { +internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> { error("JS target is not supported in gRPC") } diff --git a/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt b/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt index 602f57c82..559007568 100644 --- a/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt +++ b/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt @@ -24,8 +24,8 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel { return build().toKotlin() } -internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> { - return io.grpc.ManagedChannelBuilder.forAddress(name, port) +internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> { + return io.grpc.ManagedChannelBuilder.forAddress(hostname, port) } internal actual fun ManagedChannelBuilder(target: String): ManagedChannelBuilder<*> { diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt index 69d4d7c93..41c05518d 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt @@ -20,7 +20,7 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel { error("Native target is not supported in gRPC") } -internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> { +internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> { error("Native target is not supported in gRPC") } diff --git a/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt b/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt index 0fbd98458..8e9101a03 100644 --- a/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt +++ b/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt @@ -20,7 +20,7 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel { error("WasmJS target is not supported in gRPC") } -internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> { +internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> { error("WasmJS target is not supported in gRPC") } diff --git a/grpc/grpc-ktor-server-test/build.gradle.kts b/grpc/grpc-ktor-server-test/build.gradle.kts new file mode 100644 index 000000000..c2d5de339 --- /dev/null +++ b/grpc/grpc-ktor-server-test/build.gradle.kts @@ -0,0 +1,41 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +plugins { + alias(libs.plugins.conventions.jvm) + alias(libs.plugins.kotlinx.rpc) + alias(libs.plugins.protobuf) +} + +dependencies { + // for the jar dependency + testImplementation(kotlin("test")) + testImplementation(projects.grpc.grpcCore) + testImplementation(projects.grpc.grpcKtorServer) + + testImplementation(libs.grpc.kotlin.stub) + testImplementation(libs.grpc.netty) + + testImplementation(libs.ktor.server.core) + testImplementation(libs.ktor.server.test.host) + testRuntimeOnly(libs.logback.classic) +} + +rpc { + grpc { + enabled = true + + val globalRootDir: String by extra + + plugin { + locator { + path = "$globalRootDir/protobuf-plugin/build/libs/protobuf-plugin-$version-all.jar" + } + } + + tasksMatching { it.isTest }.all { + dependsOn(project(":protobuf-plugin").tasks.jar) + } + } +} diff --git a/grpc/grpc-ktor-server-test/gradle.properties b/grpc/grpc-ktor-server-test/gradle.properties new file mode 100644 index 000000000..b68c20f8d --- /dev/null +++ b/grpc/grpc-ktor-server-test/gradle.properties @@ -0,0 +1,5 @@ +# +# Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. +# + +kotlinx.rpc.exclude.wasmWasi=true diff --git a/grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt b/grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt new file mode 100644 index 000000000..f299fd649 --- /dev/null +++ b/grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.ktor.server.test + +import io.ktor.server.testing.testApplication +import kotlinx.rpc.grpc.GrpcClient +import kotlin.test.Test +import kotlinx.rpc.grpc.ktor.server.grpc +import kotlinx.rpc.registerService +import kotlinx.rpc.withService +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.minutes + +class KtorTestServiceImpl : KtorTestService { + override suspend fun sayHello(message: Hello): Hello { + return message + } +} + +const val PORT = 8085 + +class TestServer { + @Test + fun testPlainRequests() = testApplication { + application { + grpc(PORT) { + registerService { KtorTestServiceImpl() } + } + } + + startApplication() + + val client = GrpcClient("localhost", PORT) { + usePlaintext() + } + + val response = client.withService().sayHello(Hello { message = "Hello" }) + assertEquals("Hello", response.message, "Wrong response message") + + client.shutdown() + client.awaitTermination(1.minutes) + } +} diff --git a/grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto b/grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto new file mode 100644 index 000000000..2dfd01e0b --- /dev/null +++ b/grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package kotlinx.rpc.grpc.ktor.server.test; + +message Hello { + string message = 1; +} + +service KtorTestService { + rpc sayHello(Hello) returns (Hello); +} diff --git a/grpc/grpc-ktor-server-test/src/test/resources/logback.xml b/grpc/grpc-ktor-server-test/src/test/resources/logback.xml new file mode 100644 index 000000000..85ffa5cfa --- /dev/null +++ b/grpc/grpc-ktor-server-test/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + diff --git a/grpc/grpc-ktor-server/build.gradle.kts b/grpc/grpc-ktor-server/build.gradle.kts new file mode 100644 index 000000000..e71d8343e --- /dev/null +++ b/grpc/grpc-ktor-server/build.gradle.kts @@ -0,0 +1,19 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +plugins { + alias(libs.plugins.conventions.kmp) + alias(libs.plugins.kotlinx.rpc) +} + +kotlin { + sourceSets { + commonMain { + dependencies { + api(projects.grpc.grpcCore) + implementation(libs.ktor.server.core) + } + } + } +} diff --git a/grpc/grpc-ktor-server/gradle.properties b/grpc/grpc-ktor-server/gradle.properties new file mode 100644 index 000000000..b68c20f8d --- /dev/null +++ b/grpc/grpc-ktor-server/gradle.properties @@ -0,0 +1,5 @@ +# +# Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. +# + +kotlinx.rpc.exclude.wasmWasi=true diff --git a/grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt b/grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt new file mode 100644 index 000000000..12a9be845 --- /dev/null +++ b/grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt @@ -0,0 +1,76 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.ktor.server + +import io.ktor.server.application.Application +import io.ktor.server.application.ApplicationStopped +import io.ktor.server.application.ApplicationStopping +import io.ktor.server.application.log +import io.ktor.server.config.getAs +import io.ktor.util.AttributeKey +import kotlinx.rpc.RpcServer +import kotlinx.rpc.grpc.GrpcServer +import kotlinx.rpc.grpc.ServerBuilder + +@Suppress("ConstPropertyName") +public object GrpcConfigKeys { + public const val grpcHostPortPath: String = "ktor.deployment.grpcPort" +} + +/** + * Key used to store and retrieve the [GrpcServer] instance within the application's attributes. + */ +public val GrpcServerKey: AttributeKey = AttributeKey("GrpcServerPluginAttributesKey") + +/** + * Configures and starts a gRPC server within the Ktor application. + * This function integrates with the Ktor lifecycle and manages the lifecycle of the gRPC server + * by subscribing to [ApplicationStopping] and [ApplicationStopped] events. + * It ensures that a gRPC server is properly initialized, started, and shutdown when the application stops. + * + * @param port The port on which the gRPC server will listen for incoming connections. + * Defaults to the value specified in the `ktor.deployment.grpcPort` configuration, or 8001 if not configured. + * @param configure Allows additional configuration of the gRPC server using a platform-specific [ServerBuilder]. + * @param builder A block used to define and register gRPC services for the gRPC server. + * @return The instance of the initialized and running [GrpcServer]. + * @throws IllegalStateException if a gRPC server is already installed or the specified port conflicts with + * an existing HTTP/HTTPS server port. + */ +public fun Application.grpc( + port: Int = environment.config.propertyOrNull(GrpcConfigKeys.grpcHostPortPath)?.getAs() ?: 8001, + configure: ServerBuilder<*>.() -> Unit = {}, + builder: RpcServer.() -> Unit, +): GrpcServer { + if (attributes.contains(GrpcServerKey)) { + error("gRPC Server is already installed, second call to grpc() is not allowed") + } + + var newServer = false + val server = attributes.computeIfAbsent(GrpcServerKey) { + newServer = true + GrpcServer(port, configure, builder) + } + + if (!newServer) { + error("A race detected while installing gRPC Server, second call to grpc() is not allowed") + } + + server.start() + log.debug("Started gRPC server on port $port") + + val stoppingHandle = monitor.subscribe(ApplicationStopping) { + log.debug("Stopping gRPC server") + attributes.getOrNull(GrpcServerKey)?.shutdown() + } + + monitor.subscribe(ApplicationStopped) { + log.debug("gRPC server complete shutdown") + attributes.getOrNull(GrpcServerKey)?.shutdownNow() + + stoppingHandle.dispose() + } + + return server +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 2f84c7fa7..255cff875 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -31,6 +31,9 @@ includePublic(":protobuf-plugin") include(":grpc") includePublic(":grpc:grpc-core") +includePublic(":grpc:grpc-ktor-server") +// temporary module until KMP project structure support in Protobuf plugin +include(":grpc:grpc-ktor-server-test") includePublic(":bom") diff --git a/versions-root/libs.versions.toml b/versions-root/libs.versions.toml index a319b6c3f..c3332abaf 100644 --- a/versions-root/libs.versions.toml +++ b/versions-root/libs.versions.toml @@ -102,6 +102,7 @@ protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "p protobuf-java-util = { module = "com.google.protobuf:protobuf-java-util", version.ref = "protobuf" } protobuf-kotlin = { module = "com.google.protobuf:protobuf-kotlin", version.ref = "protobuf" } protobuf-gradle-plugin = { module = "com.google.protobuf:protobuf-gradle-plugin", version.ref = "protobuf-gradle" } +grpc-api = { module = "io.grpc:grpc-api", version.ref = "grpc" } grpc-stub = { module = "io.grpc:grpc-stub", version.ref = "grpc" } grpc-util = { module = "io.grpc:grpc-util", version.ref = "grpc" } grpc-netty = { module = "io.grpc:grpc-netty", version.ref = "grpc" }