diff --git a/docs/pages/kotlinx-rpc/topics/platforms.topic b/docs/pages/kotlinx-rpc/topics/platforms.topic
index 1511fbaaa..f846b1dfe 100644
--- a/docs/pages/kotlinx-rpc/topics/platforms.topic
+++ b/docs/pages/kotlinx-rpc/topics/platforms.topic
@@ -1,4 +1,8 @@
+
+
@@ -68,6 +72,14 @@
appleiosiosArm64iosSimulatorArm64iosX64 macosmacosArm64macosX64 watchoswatchosArm32watchosArm64watchosDeviceArm64watchosSimulatorArm64watchosX64 tvostvosArm64tvosSimulatorArm64tvosX64 linuxlinuxArm64linuxX64 windowsmingwX64 |
+
+protobuf-plugin |
+Jvm Only |
+- |
+- |
+- |
+
+
utils |
jvm |
@@ -76,6 +88,14 @@
appleiosiosArm64iosSimulatorArm64iosX64 macosmacosArm64macosX64 watchoswatchosArm32watchosArm64watchosDeviceArm64watchosSimulatorArm64watchosX64 tvostvosArm64tvosSimulatorArm64tvosX64 linuxlinuxArm64linuxX64 windowsmingwX64 |
+
+grpc-core |
+jvm |
+browsernode |
+wasmJsbrowserd8node |
+appleiosiosArm64iosSimulatorArm64iosX64 macosmacosArm64macosX64 watchoswatchosArm32watchosArm64watchosDeviceArm64watchosSimulatorArm64watchosX64 tvostvosArm64tvosSimulatorArm64tvosX64 linuxlinuxArm64linuxX64 windowsmingwX64 |
+
+
krpc-client |
jvm |
diff --git a/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt b/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt
index eec9fad21..1e6d73ee2 100644
--- a/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt
+++ b/protobuf-plugin/src/main/kotlin/kotlinx/rpc/protobuf/ModelToKotlinGenerator.kt
@@ -9,6 +9,7 @@ package kotlinx.rpc.protobuf
import kotlinx.rpc.protobuf.CodeGenerator.DeclarationType
import kotlinx.rpc.protobuf.model.*
import org.slf4j.Logger
+import kotlin.getValue
private const val RPC_INTERNAL_PACKAGE_SUFFIX = "_rpc_internal"
@@ -50,6 +51,7 @@ class ModelToKotlinGenerator(
generatePublicDeclaredEntities(this@generatePublicKotlinFile)
import("kotlinx.rpc.internal.utils.*")
+ import("kotlinx.coroutines.flow.*")
additionalPublicImports.forEach {
import(it)
@@ -76,6 +78,7 @@ class ModelToKotlinGenerator(
generateInternalDeclaredEntities(this@generateInternalKotlinFile)
import("kotlinx.rpc.internal.utils.*")
+ import("kotlinx.coroutines.flow.*")
additionalInternalImports.forEach {
import(it)
@@ -510,19 +513,22 @@ class ModelToKotlinGenerator(
code("@kotlinx.rpc.grpc.annotations.Grpc")
clazz(service.name.simpleName, declarationType = DeclarationType.Interface) {
service.methods.forEach { method ->
- // no streaming for now
val inputType by method.inputType
val outputType by method.outputType
function(
name = method.name,
- modifiers = "suspend",
- args = "message: ${inputType.name.safeFullName()}",
- returnType = outputType.name.safeFullName(),
+ modifiers = if (method.serverStreaming) "" else "suspend",
+ args = "message: ${inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)}",
+ returnType = outputType.name.safeFullName().wrapInFlowIf(method.serverStreaming),
)
}
}
}
+ private fun String.wrapInFlowIf(condition: Boolean): String {
+ return if (condition) "Flow<$this>" else this
+ }
+
private fun CodeGenerator.generateInternalService(service: ServiceDeclaration) {
code("@Suppress(\"unused\", \"all\")")
clazz(
@@ -566,11 +572,23 @@ class ModelToKotlinGenerator(
function(
name = grpcName,
- modifiers = "override suspend",
- args = "request: ${inputType.toPlatformMessageType()}",
- returnType = outputType.toPlatformMessageType(),
+ modifiers = "override${if (method.serverStreaming) "" else " suspend"}",
+ args = "request: ${inputType.toPlatformMessageType().wrapInFlowIf(method.clientStreaming)}",
+ returnType = outputType.toPlatformMessageType().wrapInFlowIf(method.serverStreaming),
) {
- code("return impl.${method.name}(request.toKotlin()).toPlatform()")
+ val toKotlin = if (method.clientStreaming) {
+ "map { it.toKotlin() }"
+ } else {
+ "toKotlin()"
+ }
+
+ val toPlatform = if (method.serverStreaming) {
+ "map { it.toPlatform() }"
+ } else {
+ "toPlatform()"
+ }
+
+ code("return impl.${method.name}(request.${toKotlin}).${toPlatform}")
importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
@@ -605,22 +623,14 @@ class ModelToKotlinGenerator(
typeParameters = "R",
returnType = "R",
) {
- code("val message = rpcCall.parameters[0]")
- code("@Suppress(\"UNCHECKED_CAST\")")
- scope("return when (rpcCall.callableName)") {
- service.methods.forEach { method ->
- val inputType by method.inputType
- val outputType by method.outputType
- val grpcName = method.name.replaceFirstChar { it.lowercase() }
- val result = "stub.$grpcName((message as ${inputType.name.safeFullName()}).toPlatform())"
- code("\"${method.name}\" -> $result.toKotlin() as R")
-
- importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
- importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
- }
+ val methods = service.methods.filter { !it.serverStreaming }
- code("else -> error(\"Illegal call: \${rpcCall.callableName}\")")
+ if (methods.isEmpty()) {
+ code("error(\"Illegal call: \${rpcCall.callableName}\")")
+ return@function
}
+
+ generateCallsImpls(methods)
}
function(
@@ -628,13 +638,57 @@ class ModelToKotlinGenerator(
modifiers = "override",
args = "rpcCall: kotlinx.rpc.RpcCall",
typeParameters = "R",
- returnType = "kotlinx.coroutines.flow.Flow",
+ returnType = "Flow",
) {
- code("error(\"Flow calls are not supported\")")
+ val methods = service.methods.filter { it.serverStreaming }
+
+ if (methods.isEmpty()) {
+ code("error(\"Illegal streaming call: \${rpcCall.callableName}\")")
+ return@function
+ }
+
+ generateCallsImpls(methods)
}
}
}
+ private fun CodeGenerator.generateCallsImpls(
+ methods: List,
+ ) {
+ code("val message = rpcCall.parameters[0]")
+ code("@Suppress(\"UNCHECKED_CAST\")")
+ scope("return when (rpcCall.callableName)") {
+ methods.forEach { method ->
+ val inputType by method.inputType
+ val outputType by method.outputType
+ val grpcName = method.name.replaceFirstChar { it.lowercase() }
+
+ val toKotlin = if (method.serverStreaming) {
+ "map { it.toKotlin() }"
+ } else {
+ "toKotlin()"
+ }
+
+ val toPlatform = if (method.clientStreaming) {
+ "map { it.toPlatform() }"
+ } else {
+ "toPlatform()"
+ }
+
+ val argumentCast = inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)
+ val resultCast = "R".wrapInFlowIf(method.serverStreaming)
+
+ val result = "stub.$grpcName((message as $argumentCast).${toPlatform})"
+ code("\"${method.name}\" -> $result.${toKotlin} as $resultCast")
+
+ importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
+ importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
+ }
+
+ code("else -> error(\"Illegal call: \${rpcCall.callableName}\")")
+ }
+ }
+
private fun MessageDeclaration.toPlatformMessageType(): String {
return "${outerClassName.safeFullName()}.${name.fullNestedName()}"
}
diff --git a/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt b/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt
new file mode 100644
index 000000000..047f74caf
--- /dev/null
+++ b/protobuf-plugin/src/test/kotlin/kotlinx/rpc/protobuf/test/StreamingTest.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.rpc.protobuf.test
+
+import StreamingTestService
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collectIndexed
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.last
+import kotlinx.coroutines.flow.toList
+import kotlinx.rpc.RpcServer
+import kotlinx.rpc.registerService
+import kotlinx.rpc.withService
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+class StreamingTestServiceImpl : StreamingTestService {
+ override fun Server(message: References): Flow {
+ return flow { emit(message); emit(message); emit(message) }
+ }
+
+ override suspend fun Client(message: Flow): References {
+ return message.last()
+ }
+
+ override fun Bidi(message: Flow): Flow {
+ return message
+ }
+}
+
+class StreamingTest : GrpcServerTest() {
+ override fun RpcServer.registerServices() {
+ registerService { StreamingTestServiceImpl() }
+ }
+
+ @Test
+ fun testServerStreaming() = runGrpcTest { grpcClient ->
+ val service = grpcClient.withService()
+ service.Server(References {
+ other = Other {
+ field= 42
+ }
+ }).toList().run {
+ assertEquals(3, size)
+
+ forEach {
+ assertEquals(42, it.other.field)
+ }
+ }
+ }
+
+ @Test
+ fun testClientStreaming() = runGrpcTest { grpcClient ->
+ val service = grpcClient.withService()
+ val result = service.Client(flow {
+ repeat(3) {
+ emit(References {
+ other = Other {
+ field = 42 + it
+ }
+ })
+ }
+ })
+
+ assertEquals(44, result.other.field)
+ }
+
+ @Test
+ fun testBidiStreaming() = runGrpcTest { grpcClient ->
+ val service = grpcClient.withService()
+ service.Bidi(flow {
+ repeat(3) {
+ emit(References {
+ other = Other {
+ field = 42 + it
+ }
+ })
+ }
+ }).collectIndexed { i, it ->
+ assertEquals(42 + i, it.other.field)
+ }
+ }
+}
diff --git a/protobuf-plugin/src/test/proto/reference.proto b/protobuf-plugin/src/test/proto/reference.proto
index 6f276344c..4a68c5189 100644
--- a/protobuf-plugin/src/test/proto/reference.proto
+++ b/protobuf-plugin/src/test/proto/reference.proto
@@ -1,7 +1,5 @@
syntax = "proto3";
-import "all_primitives.proto";
-
message Other {
string arg = 1;
}
diff --git a/protobuf-plugin/src/test/proto/streaming.proto b/protobuf-plugin/src/test/proto/streaming.proto
new file mode 100644
index 000000000..061f22458
--- /dev/null
+++ b/protobuf-plugin/src/test/proto/streaming.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+
+import "reference_package.proto";
+
+service StreamingTestService {
+ rpc Server(kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References);
+ rpc Client(stream kotlinx.rpc.protobuf.test.References) returns (kotlinx.rpc.protobuf.test.References);
+ rpc Bidi(stream kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References);
+}