diff --git a/docs/pages/kotlinx-rpc/topics/platforms.topic b/docs/pages/kotlinx-rpc/topics/platforms.topic
index 1511fbaaa..f846b1dfe 100644
--- a/docs/pages/kotlinx-rpc/topics/platforms.topic
+++ b/docs/pages/kotlinx-rpc/topics/platforms.topic
@@ -1,4 +1,8 @@
+
+
@@ -68,6 +72,14 @@
appleiosiosArm64iosSimulatorArm64iosX64 macosmacosArm64macosX64 watchoswatchosArm32watchosArm64watchosDeviceArm64watchosSimulatorArm64watchosX64 tvostvosArm64tvosSimulatorArm64tvosX64 linuxlinuxArm64linuxX64 windowsmingwX64 |
+
+protobuf-plugin |
+Jvm Only |
+- |
+- |
+- |
+
+
utils |
jvm |
@@ -76,6 +88,14 @@
appleiosiosArm64iosSimulatorArm64iosX64 macosmacosArm64macosX64 watchoswatchosArm32watchosArm64watchosDeviceArm64watchosSimulatorArm64watchosX64 tvostvosArm64tvosSimulatorArm64tvosX64 linuxlinuxArm64linuxX64 windowsmingwX64 |
+
+grpc-core |
+jvm |
+browsernode |
+wasmJsbrowserd8node |
+appleiosiosArm64iosSimulatorArm64iosX64 macosmacosArm64macosX64 watchoswatchosArm32watchosArm64watchosDeviceArm64watchosSimulatorArm64watchosX64 tvostvosArm64tvosSimulatorArm64tvosX64 linuxlinuxArm64linuxX64 windowsmingwX64 |
+
+
krpc-client |
jvm |
diff --git a/grpc/grpc-core/build.gradle.kts b/grpc/grpc-core/build.gradle.kts
index 68496ae23..fa6dd6983 100644
--- a/grpc/grpc-core/build.gradle.kts
+++ b/grpc/grpc-core/build.gradle.kts
@@ -14,17 +14,20 @@ kotlin {
api(projects.core)
api(projects.utils)
api(libs.coroutines.core)
+
+ implementation(libs.atomicfu)
}
}
jvmMain {
dependencies {
+ api(libs.grpc.api)
api(libs.grpc.util)
api(libs.grpc.stub)
api(libs.grpc.protobuf)
- api(libs.grpc.kotlin.stub)
+ implementation(libs.grpc.kotlin.stub) // causes problems to jpms if api
api(libs.protobuf.java.util)
- api(libs.protobuf.kotlin)
+ implementation(libs.protobuf.kotlin)
}
}
}
diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt
index 8d10243a2..7ba5c588e 100644
--- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt
+++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcClient.kt
@@ -10,6 +10,7 @@ import kotlinx.rpc.RpcClient
import kotlinx.rpc.grpc.descriptor.GrpcClientDelegate
import kotlinx.rpc.grpc.descriptor.GrpcServiceDescriptor
import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap
+import kotlin.time.Duration
/**
* GrpcClient manages gRPC communication by providing implementation for making asynchronous RPC calls.
@@ -19,6 +20,20 @@ import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap
public class GrpcClient internal constructor(private val channel: ManagedChannel) : RpcClient {
private val stubs = RpcInternalConcurrentHashMap()
+ public fun shutdown() {
+ stubs.clear()
+ channel.shutdown()
+ }
+
+ public fun shutdownNow() {
+ stubs.clear()
+ channel.shutdownNow()
+ }
+
+ public suspend fun awaitTermination(duration: Duration) {
+ channel.awaitTermination(duration)
+ }
+
override suspend fun call(call: RpcCall): T {
return call.delegate().call(call)
}
@@ -39,11 +54,11 @@ public class GrpcClient internal constructor(private val channel: ManagedChannel
* Constructor function for the [GrpcClient] class.
*/
public fun GrpcClient(
- name: String,
+ hostname: String,
port: Int,
configure: ManagedChannelBuilder<*>.() -> Unit = {},
): GrpcClient {
- val channel = ManagedChannelBuilder(name, port).apply(configure).buildChannel()
+ val channel = ManagedChannelBuilder(hostname, port).apply(configure).buildChannel()
return GrpcClient(channel)
}
diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt
index 4600ea17c..f69865b9b 100644
--- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt
+++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcServer.kt
@@ -4,6 +4,7 @@
package kotlinx.rpc.grpc
+import kotlinx.atomicfu.atomic
import kotlinx.rpc.RpcServer
import kotlinx.rpc.descriptor.serviceDescriptorOf
import kotlinx.rpc.grpc.annotations.Grpc
@@ -65,9 +66,13 @@ public class GrpcServer internal constructor(
return grpc.delegate.definitionFor(service)
}
+ private val buildLock = atomic(false)
+
internal fun build() {
- internalServer = Server(serverBuilder)
- isBuilt = true
+ if (buildLock.compareAndSet(expect = false, update = true)) {
+ internalServer = Server(serverBuilder)
+ isBuilt = true
+ }
}
override val isShutdown: Boolean
diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt
index 5368b683b..c7ea10b53 100644
--- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt
+++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt
@@ -68,7 +68,7 @@ public interface ManagedChannel {
*/
public expect abstract class ManagedChannelBuilder>
-internal expect fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*>
+internal expect fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*>
internal expect fun ManagedChannelBuilder(target: String): ManagedChannelBuilder<*>
internal expect fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel
diff --git a/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt b/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt
index b7330da46..2218a5078 100644
--- a/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt
+++ b/grpc/grpc-core/src/jsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.js.kt
@@ -20,7 +20,7 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel {
error("JS target is not supported in gRPC")
}
-internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> {
+internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> {
error("JS target is not supported in gRPC")
}
diff --git a/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt b/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt
index 602f57c82..559007568 100644
--- a/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt
+++ b/grpc/grpc-core/src/jvmMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.jvm.kt
@@ -24,8 +24,8 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel {
return build().toKotlin()
}
-internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> {
- return io.grpc.ManagedChannelBuilder.forAddress(name, port)
+internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> {
+ return io.grpc.ManagedChannelBuilder.forAddress(hostname, port)
}
internal actual fun ManagedChannelBuilder(target: String): ManagedChannelBuilder<*> {
diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt
index 69d4d7c93..41c05518d 100644
--- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt
+++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt
@@ -20,7 +20,7 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel {
error("Native target is not supported in gRPC")
}
-internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> {
+internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> {
error("Native target is not supported in gRPC")
}
diff --git a/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt b/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt
index 0fbd98458..8e9101a03 100644
--- a/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt
+++ b/grpc/grpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.wasmJs.kt
@@ -20,7 +20,7 @@ internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel {
error("WasmJS target is not supported in gRPC")
}
-internal actual fun ManagedChannelBuilder(name: String, port: Int): ManagedChannelBuilder<*> {
+internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> {
error("WasmJS target is not supported in gRPC")
}
diff --git a/grpc/grpc-ktor-server-test/build.gradle.kts b/grpc/grpc-ktor-server-test/build.gradle.kts
new file mode 100644
index 000000000..c2d5de339
--- /dev/null
+++ b/grpc/grpc-ktor-server-test/build.gradle.kts
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+plugins {
+ alias(libs.plugins.conventions.jvm)
+ alias(libs.plugins.kotlinx.rpc)
+ alias(libs.plugins.protobuf)
+}
+
+dependencies {
+ // for the jar dependency
+ testImplementation(kotlin("test"))
+ testImplementation(projects.grpc.grpcCore)
+ testImplementation(projects.grpc.grpcKtorServer)
+
+ testImplementation(libs.grpc.kotlin.stub)
+ testImplementation(libs.grpc.netty)
+
+ testImplementation(libs.ktor.server.core)
+ testImplementation(libs.ktor.server.test.host)
+ testRuntimeOnly(libs.logback.classic)
+}
+
+rpc {
+ grpc {
+ enabled = true
+
+ val globalRootDir: String by extra
+
+ plugin {
+ locator {
+ path = "$globalRootDir/protobuf-plugin/build/libs/protobuf-plugin-$version-all.jar"
+ }
+ }
+
+ tasksMatching { it.isTest }.all {
+ dependsOn(project(":protobuf-plugin").tasks.jar)
+ }
+ }
+}
diff --git a/grpc/grpc-ktor-server-test/gradle.properties b/grpc/grpc-ktor-server-test/gradle.properties
new file mode 100644
index 000000000..b68c20f8d
--- /dev/null
+++ b/grpc/grpc-ktor-server-test/gradle.properties
@@ -0,0 +1,5 @@
+#
+# Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+#
+
+kotlinx.rpc.exclude.wasmWasi=true
diff --git a/grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt b/grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt
new file mode 100644
index 000000000..f299fd649
--- /dev/null
+++ b/grpc/grpc-ktor-server-test/src/test/kotlin/kotlinx/rpc/grpc/ktor/server/test/TestServer.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.rpc.grpc.ktor.server.test
+
+import io.ktor.server.testing.testApplication
+import kotlinx.rpc.grpc.GrpcClient
+import kotlin.test.Test
+import kotlinx.rpc.grpc.ktor.server.grpc
+import kotlinx.rpc.registerService
+import kotlinx.rpc.withService
+import kotlin.test.assertEquals
+import kotlin.time.Duration.Companion.minutes
+
+class KtorTestServiceImpl : KtorTestService {
+ override suspend fun sayHello(message: Hello): Hello {
+ return message
+ }
+}
+
+const val PORT = 8085
+
+class TestServer {
+ @Test
+ fun testPlainRequests() = testApplication {
+ application {
+ grpc(PORT) {
+ registerService { KtorTestServiceImpl() }
+ }
+ }
+
+ startApplication()
+
+ val client = GrpcClient("localhost", PORT) {
+ usePlaintext()
+ }
+
+ val response = client.withService().sayHello(Hello { message = "Hello" })
+ assertEquals("Hello", response.message, "Wrong response message")
+
+ client.shutdown()
+ client.awaitTermination(1.minutes)
+ }
+}
diff --git a/grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto b/grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto
new file mode 100644
index 000000000..2dfd01e0b
--- /dev/null
+++ b/grpc/grpc-ktor-server-test/src/test/proto/ktor-test-service.proto
@@ -0,0 +1,11 @@
+syntax = "proto3";
+
+package kotlinx.rpc.grpc.ktor.server.test;
+
+message Hello {
+ string message = 1;
+}
+
+service KtorTestService {
+ rpc sayHello(Hello) returns (Hello);
+}
diff --git a/grpc/grpc-ktor-server-test/src/test/resources/logback.xml b/grpc/grpc-ktor-server-test/src/test/resources/logback.xml
new file mode 100644
index 000000000..85ffa5cfa
--- /dev/null
+++ b/grpc/grpc-ktor-server-test/src/test/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
diff --git a/grpc/grpc-ktor-server/build.gradle.kts b/grpc/grpc-ktor-server/build.gradle.kts
new file mode 100644
index 000000000..e71d8343e
--- /dev/null
+++ b/grpc/grpc-ktor-server/build.gradle.kts
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+plugins {
+ alias(libs.plugins.conventions.kmp)
+ alias(libs.plugins.kotlinx.rpc)
+}
+
+kotlin {
+ sourceSets {
+ commonMain {
+ dependencies {
+ api(projects.grpc.grpcCore)
+ implementation(libs.ktor.server.core)
+ }
+ }
+ }
+}
diff --git a/grpc/grpc-ktor-server/gradle.properties b/grpc/grpc-ktor-server/gradle.properties
new file mode 100644
index 000000000..b68c20f8d
--- /dev/null
+++ b/grpc/grpc-ktor-server/gradle.properties
@@ -0,0 +1,5 @@
+#
+# Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+#
+
+kotlinx.rpc.exclude.wasmWasi=true
diff --git a/grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt b/grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt
new file mode 100644
index 000000000..12a9be845
--- /dev/null
+++ b/grpc/grpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/grpc/ktor/server/Server.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.rpc.grpc.ktor.server
+
+import io.ktor.server.application.Application
+import io.ktor.server.application.ApplicationStopped
+import io.ktor.server.application.ApplicationStopping
+import io.ktor.server.application.log
+import io.ktor.server.config.getAs
+import io.ktor.util.AttributeKey
+import kotlinx.rpc.RpcServer
+import kotlinx.rpc.grpc.GrpcServer
+import kotlinx.rpc.grpc.ServerBuilder
+
+@Suppress("ConstPropertyName")
+public object GrpcConfigKeys {
+ public const val grpcHostPortPath: String = "ktor.deployment.grpcPort"
+}
+
+/**
+ * Key used to store and retrieve the [GrpcServer] instance within the application's attributes.
+ */
+public val GrpcServerKey: AttributeKey = AttributeKey("GrpcServerPluginAttributesKey")
+
+/**
+ * Configures and starts a gRPC server within the Ktor application.
+ * This function integrates with the Ktor lifecycle and manages the lifecycle of the gRPC server
+ * by subscribing to [ApplicationStopping] and [ApplicationStopped] events.
+ * It ensures that a gRPC server is properly initialized, started, and shutdown when the application stops.
+ *
+ * @param port The port on which the gRPC server will listen for incoming connections.
+ * Defaults to the value specified in the `ktor.deployment.grpcPort` configuration, or 8001 if not configured.
+ * @param configure Allows additional configuration of the gRPC server using a platform-specific [ServerBuilder].
+ * @param builder A block used to define and register gRPC services for the gRPC server.
+ * @return The instance of the initialized and running [GrpcServer].
+ * @throws IllegalStateException if a gRPC server is already installed or the specified port conflicts with
+ * an existing HTTP/HTTPS server port.
+ */
+public fun Application.grpc(
+ port: Int = environment.config.propertyOrNull(GrpcConfigKeys.grpcHostPortPath)?.getAs() ?: 8001,
+ configure: ServerBuilder<*>.() -> Unit = {},
+ builder: RpcServer.() -> Unit,
+): GrpcServer {
+ if (attributes.contains(GrpcServerKey)) {
+ error("gRPC Server is already installed, second call to grpc() is not allowed")
+ }
+
+ var newServer = false
+ val server = attributes.computeIfAbsent(GrpcServerKey) {
+ newServer = true
+ GrpcServer(port, configure, builder)
+ }
+
+ if (!newServer) {
+ error("A race detected while installing gRPC Server, second call to grpc() is not allowed")
+ }
+
+ server.start()
+ log.debug("Started gRPC server on port $port")
+
+ val stoppingHandle = monitor.subscribe(ApplicationStopping) {
+ log.debug("Stopping gRPC server")
+ attributes.getOrNull(GrpcServerKey)?.shutdown()
+ }
+
+ monitor.subscribe(ApplicationStopped) {
+ log.debug("gRPC server complete shutdown")
+ attributes.getOrNull(GrpcServerKey)?.shutdownNow()
+
+ stoppingHandle.dispose()
+ }
+
+ return server
+}
diff --git a/jpms-check/build.gradle.kts b/jpms-check/build.gradle.kts
index d02ea18ac..f6e97eb39 100644
--- a/jpms-check/build.gradle.kts
+++ b/jpms-check/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
import util.targets.hasJavaModule
@@ -11,9 +11,13 @@ plugins {
description = "Internal module for checking JPMS compliance"
-tasks.register("generateModuleInfo") {
+val excludedProjects = listOf(
+ "protobuf-plugin",
+)
+
+val generateModuleInfo = tasks.register("generateModuleInfo") {
val modules = project.rootProject.subprojects
- .filter { it.hasJavaModule }
+ .filter { it.applicableForCheck() }
.map { it.javaModuleName() }
val moduleInfoPath = project.projectDir.absolutePath + "/src/main/java/module-info.java"
@@ -33,7 +37,7 @@ tasks.register("generateModuleInfo") {
}
tasks.getByName("compileJava") {
- dependsOn("generateModuleInfo")
+ dependsOn(generateModuleInfo)
val projectFiles = project.files()
doFirst {
@@ -58,9 +62,13 @@ dependencies {
.joinToString(":", prefix = ":") { segment -> segment.name }
it.plugins.withId("maven-publish") {
- if (it.hasJavaModule) {
+ if (it.applicableForCheck()) {
api(project(dep))
}
}
}
}
+
+private fun Project.applicableForCheck(): Boolean {
+ return hasJavaModule && name !in excludedProjects
+}
diff --git a/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt b/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt
index eec9fad21..1e6d73ee2 100644
--- a/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt
+++ b/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt
@@ -9,6 +9,7 @@ package kotlinx.rpc.protobuf
import kotlinx.rpc.protobuf.CodeGenerator.DeclarationType
import kotlinx.rpc.protobuf.model.*
import org.slf4j.Logger
+import kotlin.getValue
private const val RPC_INTERNAL_PACKAGE_SUFFIX = "_rpc_internal"
@@ -50,6 +51,7 @@ class ModelToKotlinGenerator(
generatePublicDeclaredEntities(this@generatePublicKotlinFile)
import("kotlinx.rpc.internal.utils.*")
+ import("kotlinx.coroutines.flow.*")
additionalPublicImports.forEach {
import(it)
@@ -76,6 +78,7 @@ class ModelToKotlinGenerator(
generateInternalDeclaredEntities(this@generateInternalKotlinFile)
import("kotlinx.rpc.internal.utils.*")
+ import("kotlinx.coroutines.flow.*")
additionalInternalImports.forEach {
import(it)
@@ -510,19 +513,22 @@ class ModelToKotlinGenerator(
code("@kotlinx.rpc.grpc.annotations.Grpc")
clazz(service.name.simpleName, declarationType = DeclarationType.Interface) {
service.methods.forEach { method ->
- // no streaming for now
val inputType by method.inputType
val outputType by method.outputType
function(
name = method.name,
- modifiers = "suspend",
- args = "message: ${inputType.name.safeFullName()}",
- returnType = outputType.name.safeFullName(),
+ modifiers = if (method.serverStreaming) "" else "suspend",
+ args = "message: ${inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)}",
+ returnType = outputType.name.safeFullName().wrapInFlowIf(method.serverStreaming),
)
}
}
}
+ private fun String.wrapInFlowIf(condition: Boolean): String {
+ return if (condition) "Flow<$this>" else this
+ }
+
private fun CodeGenerator.generateInternalService(service: ServiceDeclaration) {
code("@Suppress(\"unused\", \"all\")")
clazz(
@@ -566,11 +572,23 @@ class ModelToKotlinGenerator(
function(
name = grpcName,
- modifiers = "override suspend",
- args = "request: ${inputType.toPlatformMessageType()}",
- returnType = outputType.toPlatformMessageType(),
+ modifiers = "override${if (method.serverStreaming) "" else " suspend"}",
+ args = "request: ${inputType.toPlatformMessageType().wrapInFlowIf(method.clientStreaming)}",
+ returnType = outputType.toPlatformMessageType().wrapInFlowIf(method.serverStreaming),
) {
- code("return impl.${method.name}(request.toKotlin()).toPlatform()")
+ val toKotlin = if (method.clientStreaming) {
+ "map { it.toKotlin() }"
+ } else {
+ "toKotlin()"
+ }
+
+ val toPlatform = if (method.serverStreaming) {
+ "map { it.toPlatform() }"
+ } else {
+ "toPlatform()"
+ }
+
+ code("return impl.${method.name}(request.${toKotlin}).${toPlatform}")
importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
@@ -605,22 +623,14 @@ class ModelToKotlinGenerator(
typeParameters = "R",
returnType = "R",
) {
- code("val message = rpcCall.parameters[0]")
- code("@Suppress(\"UNCHECKED_CAST\")")
- scope("return when (rpcCall.callableName)") {
- service.methods.forEach { method ->
- val inputType by method.inputType
- val outputType by method.outputType
- val grpcName = method.name.replaceFirstChar { it.lowercase() }
- val result = "stub.$grpcName((message as ${inputType.name.safeFullName()}).toPlatform())"
- code("\"${method.name}\" -> $result.toKotlin() as R")
-
- importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
- importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
- }
+ val methods = service.methods.filter { !it.serverStreaming }
- code("else -> error(\"Illegal call: \${rpcCall.callableName}\")")
+ if (methods.isEmpty()) {
+ code("error(\"Illegal call: \${rpcCall.callableName}\")")
+ return@function
}
+
+ generateCallsImpls(methods)
}
function(
@@ -628,13 +638,57 @@ class ModelToKotlinGenerator(
modifiers = "override",
args = "rpcCall: kotlinx.rpc.RpcCall",
typeParameters = "R",
- returnType = "kotlinx.coroutines.flow.Flow",
+ returnType = "Flow",
) {
- code("error(\"Flow calls are not supported\")")
+ val methods = service.methods.filter { it.serverStreaming }
+
+ if (methods.isEmpty()) {
+ code("error(\"Illegal streaming call: \${rpcCall.callableName}\")")
+ return@function
+ }
+
+ generateCallsImpls(methods)
}
}
}
+ private fun CodeGenerator.generateCallsImpls(
+ methods: List,
+ ) {
+ code("val message = rpcCall.parameters[0]")
+ code("@Suppress(\"UNCHECKED_CAST\")")
+ scope("return when (rpcCall.callableName)") {
+ methods.forEach { method ->
+ val inputType by method.inputType
+ val outputType by method.outputType
+ val grpcName = method.name.replaceFirstChar { it.lowercase() }
+
+ val toKotlin = if (method.serverStreaming) {
+ "map { it.toKotlin() }"
+ } else {
+ "toKotlin()"
+ }
+
+ val toPlatform = if (method.clientStreaming) {
+ "map { it.toPlatform() }"
+ } else {
+ "toPlatform()"
+ }
+
+ val argumentCast = inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)
+ val resultCast = "R".wrapInFlowIf(method.serverStreaming)
+
+ val result = "stub.$grpcName((message as $argumentCast).${toPlatform})"
+ code("\"${method.name}\" -> $result.${toKotlin} as $resultCast")
+
+ importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
+ importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
+ }
+
+ code("else -> error(\"Illegal call: \${rpcCall.callableName}\")")
+ }
+ }
+
private fun MessageDeclaration.toPlatformMessageType(): String {
return "${outerClassName.safeFullName()}.${name.fullNestedName()}"
}
diff --git a/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt b/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt
new file mode 100644
index 000000000..047f74caf
--- /dev/null
+++ b/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.rpc.protobuf.test
+
+import StreamingTestService
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collectIndexed
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.last
+import kotlinx.coroutines.flow.toList
+import kotlinx.rpc.RpcServer
+import kotlinx.rpc.registerService
+import kotlinx.rpc.withService
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+class StreamingTestServiceImpl : StreamingTestService {
+ override fun Server(message: References): Flow {
+ return flow { emit(message); emit(message); emit(message) }
+ }
+
+ override suspend fun Client(message: Flow): References {
+ return message.last()
+ }
+
+ override fun Bidi(message: Flow): Flow {
+ return message
+ }
+}
+
+class StreamingTest : GrpcServerTest() {
+ override fun RpcServer.registerServices() {
+ registerService { StreamingTestServiceImpl() }
+ }
+
+ @Test
+ fun testServerStreaming() = runGrpcTest { grpcClient ->
+ val service = grpcClient.withService()
+ service.Server(References {
+ other = Other {
+ field= 42
+ }
+ }).toList().run {
+ assertEquals(3, size)
+
+ forEach {
+ assertEquals(42, it.other.field)
+ }
+ }
+ }
+
+ @Test
+ fun testClientStreaming() = runGrpcTest { grpcClient ->
+ val service = grpcClient.withService()
+ val result = service.Client(flow {
+ repeat(3) {
+ emit(References {
+ other = Other {
+ field = 42 + it
+ }
+ })
+ }
+ })
+
+ assertEquals(44, result.other.field)
+ }
+
+ @Test
+ fun testBidiStreaming() = runGrpcTest { grpcClient ->
+ val service = grpcClient.withService()
+ service.Bidi(flow {
+ repeat(3) {
+ emit(References {
+ other = Other {
+ field = 42 + it
+ }
+ })
+ }
+ }).collectIndexed { i, it ->
+ assertEquals(42 + i, it.other.field)
+ }
+ }
+}
diff --git a/protobuf-plugin/src/test/proto/reference.proto b/protobuf-plugin/src/test/proto/reference.proto
index 6f276344c..4a68c5189 100644
--- a/protobuf-plugin/src/test/proto/reference.proto
+++ b/protobuf-plugin/src/test/proto/reference.proto
@@ -1,7 +1,5 @@
syntax = "proto3";
-import "all_primitives.proto";
-
message Other {
string arg = 1;
}
diff --git a/protobuf-plugin/src/test/proto/streaming.proto b/protobuf-plugin/src/test/proto/streaming.proto
new file mode 100644
index 000000000..061f22458
--- /dev/null
+++ b/protobuf-plugin/src/test/proto/streaming.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+
+import "reference_package.proto";
+
+service StreamingTestService {
+ rpc Server(kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References);
+ rpc Client(stream kotlinx.rpc.protobuf.test.References) returns (kotlinx.rpc.protobuf.test.References);
+ rpc Bidi(stream kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References);
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 2f84c7fa7..255cff875 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -31,6 +31,9 @@ includePublic(":protobuf-plugin")
include(":grpc")
includePublic(":grpc:grpc-core")
+includePublic(":grpc:grpc-ktor-server")
+// temporary module until KMP project structure support in Protobuf plugin
+include(":grpc:grpc-ktor-server-test")
includePublic(":bom")
diff --git a/versions-root/libs.versions.toml b/versions-root/libs.versions.toml
index a319b6c3f..c3332abaf 100644
--- a/versions-root/libs.versions.toml
+++ b/versions-root/libs.versions.toml
@@ -102,6 +102,7 @@ protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "p
protobuf-java-util = { module = "com.google.protobuf:protobuf-java-util", version.ref = "protobuf" }
protobuf-kotlin = { module = "com.google.protobuf:protobuf-kotlin", version.ref = "protobuf" }
protobuf-gradle-plugin = { module = "com.google.protobuf:protobuf-gradle-plugin", version.ref = "protobuf-gradle" }
+grpc-api = { module = "io.grpc:grpc-api", version.ref = "grpc" }
grpc-stub = { module = "io.grpc:grpc-stub", version.ref = "grpc" }
grpc-util = { module = "io.grpc:grpc-util", version.ref = "grpc" }
grpc-netty = { module = "io.grpc:grpc-netty", version.ref = "grpc" }