Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/pages/kotlinx-rpc/topics/platforms.topic
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
-->

<!DOCTYPE topic
SYSTEM "https://resources.jetbrains.com/writerside/1.0/xhtml-entities.dtd">
<!--suppress WrsMissingSpaceChecker -->
Expand Down Expand Up @@ -68,6 +72,14 @@
<td><list><li>apple<list><li>ios<list><li>iosArm64</li><li>iosSimulatorArm64</li><li>iosX64</li></list></li><li>macos<list><li>macosArm64</li><li>macosX64</li></list></li><li>watchos<list><li>watchosArm32</li><li>watchosArm64</li><li>watchosDeviceArm64</li><li>watchosSimulatorArm64</li><li>watchosX64</li></list></li><li>tvos<list><li>tvosArm64</li><li>tvosSimulatorArm64</li><li>tvosX64</li></list></li></list></li><li>linux<list><li>linuxArm64</li><li>linuxX64</li></list></li><li>windows<list><li>mingwX64</li></list></li></list></td>
</tr>

<tr>
<td>protobuf-plugin</td>
<td>Jvm Only</td>
<td>-</td>
<td>-</td>
<td>-</td>
</tr>

<tr>
<td>utils</td>
<td>jvm</td>
Expand All @@ -76,6 +88,14 @@
<td><list><li>apple<list><li>ios<list><li>iosArm64</li><li>iosSimulatorArm64</li><li>iosX64</li></list></li><li>macos<list><li>macosArm64</li><li>macosX64</li></list></li><li>watchos<list><li>watchosArm32</li><li>watchosArm64</li><li>watchosDeviceArm64</li><li>watchosSimulatorArm64</li><li>watchosX64</li></list></li><li>tvos<list><li>tvosArm64</li><li>tvosSimulatorArm64</li><li>tvosX64</li></list></li></list></li><li>linux<list><li>linuxArm64</li><li>linuxX64</li></list></li><li>windows<list><li>mingwX64</li></list></li></list></td>
</tr>

<tr>
<td>grpc-core</td>
<td>jvm</td>
<td><list><li>browser</li><li>node</li></list></td>
<td><list><li>wasmJs<list><li>browser</li><li>d8</li><li>node</li></list></li></list></td>
<td><list><li>apple<list><li>ios<list><li>iosArm64</li><li>iosSimulatorArm64</li><li>iosX64</li></list></li><li>macos<list><li>macosArm64</li><li>macosX64</li></list></li><li>watchos<list><li>watchosArm32</li><li>watchosArm64</li><li>watchosDeviceArm64</li><li>watchosSimulatorArm64</li><li>watchosX64</li></list></li><li>tvos<list><li>tvosArm64</li><li>tvosSimulatorArm64</li><li>tvosX64</li></list></li></list></li><li>linux<list><li>linuxArm64</li><li>linuxX64</li></list></li><li>windows<list><li>mingwX64</li></list></li></list></td>
</tr>

<tr>
<td>krpc-client</td>
<td>jvm</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package kotlinx.rpc.protobuf
import kotlinx.rpc.protobuf.CodeGenerator.DeclarationType
import kotlinx.rpc.protobuf.model.*
import org.slf4j.Logger
import kotlin.getValue

private const val RPC_INTERNAL_PACKAGE_SUFFIX = "_rpc_internal"

Expand Down Expand Up @@ -50,6 +51,7 @@ class ModelToKotlinGenerator(
generatePublicDeclaredEntities(this@generatePublicKotlinFile)

import("kotlinx.rpc.internal.utils.*")
import("kotlinx.coroutines.flow.*")

additionalPublicImports.forEach {
import(it)
Expand All @@ -76,6 +78,7 @@ class ModelToKotlinGenerator(
generateInternalDeclaredEntities(this@generateInternalKotlinFile)

import("kotlinx.rpc.internal.utils.*")
import("kotlinx.coroutines.flow.*")

additionalInternalImports.forEach {
import(it)
Expand Down Expand Up @@ -510,19 +513,22 @@ class ModelToKotlinGenerator(
code("@kotlinx.rpc.grpc.annotations.Grpc")
clazz(service.name.simpleName, declarationType = DeclarationType.Interface) {
service.methods.forEach { method ->
// no streaming for now
val inputType by method.inputType
val outputType by method.outputType
function(
name = method.name,
modifiers = "suspend",
args = "message: ${inputType.name.safeFullName()}",
returnType = outputType.name.safeFullName(),
modifiers = if (method.serverStreaming) "" else "suspend",
args = "message: ${inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)}",
returnType = outputType.name.safeFullName().wrapInFlowIf(method.serverStreaming),
)
}
}
}

private fun String.wrapInFlowIf(condition: Boolean): String {
return if (condition) "Flow<$this>" else this
}

private fun CodeGenerator.generateInternalService(service: ServiceDeclaration) {
code("@Suppress(\"unused\", \"all\")")
clazz(
Expand Down Expand Up @@ -566,11 +572,23 @@ class ModelToKotlinGenerator(

function(
name = grpcName,
modifiers = "override suspend",
args = "request: ${inputType.toPlatformMessageType()}",
returnType = outputType.toPlatformMessageType(),
modifiers = "override${if (method.serverStreaming) "" else " suspend"}",
args = "request: ${inputType.toPlatformMessageType().wrapInFlowIf(method.clientStreaming)}",
returnType = outputType.toPlatformMessageType().wrapInFlowIf(method.serverStreaming),
) {
code("return impl.${method.name}(request.toKotlin()).toPlatform()")
val toKotlin = if (method.clientStreaming) {
"map { it.toKotlin() }"
} else {
"toKotlin()"
}

val toPlatform = if (method.serverStreaming) {
"map { it.toPlatform() }"
} else {
"toPlatform()"
}

code("return impl.${method.name}(request.${toKotlin}).${toPlatform}")

importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
Expand Down Expand Up @@ -605,36 +623,72 @@ class ModelToKotlinGenerator(
typeParameters = "R",
returnType = "R",
) {
code("val message = rpcCall.parameters[0]")
code("@Suppress(\"UNCHECKED_CAST\")")
scope("return when (rpcCall.callableName)") {
service.methods.forEach { method ->
val inputType by method.inputType
val outputType by method.outputType
val grpcName = method.name.replaceFirstChar { it.lowercase() }
val result = "stub.$grpcName((message as ${inputType.name.safeFullName()}).toPlatform())"
code("\"${method.name}\" -> $result.toKotlin() as R")

importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
}
val methods = service.methods.filter { !it.serverStreaming }

code("else -> error(\"Illegal call: \${rpcCall.callableName}\")")
if (methods.isEmpty()) {
code("error(\"Illegal call: \${rpcCall.callableName}\")")
return@function
}

generateCallsImpls(methods)
}

function(
name = "callServerStreaming",
modifiers = "override",
args = "rpcCall: kotlinx.rpc.RpcCall",
typeParameters = "R",
returnType = "kotlinx.coroutines.flow.Flow<R>",
returnType = "Flow<R>",
) {
code("error(\"Flow calls are not supported\")")
val methods = service.methods.filter { it.serverStreaming }

if (methods.isEmpty()) {
code("error(\"Illegal streaming call: \${rpcCall.callableName}\")")
return@function
}

generateCallsImpls(methods)
}
}
}

private fun CodeGenerator.generateCallsImpls(
methods: List<MethodDeclaration>,
) {
code("val message = rpcCall.parameters[0]")
code("@Suppress(\"UNCHECKED_CAST\")")
scope("return when (rpcCall.callableName)") {
methods.forEach { method ->
val inputType by method.inputType
val outputType by method.outputType
val grpcName = method.name.replaceFirstChar { it.lowercase() }

val toKotlin = if (method.serverStreaming) {
"map { it.toKotlin() }"
} else {
"toKotlin()"
}

val toPlatform = if (method.clientStreaming) {
"map { it.toPlatform() }"
} else {
"toPlatform()"
}

val argumentCast = inputType.name.safeFullName().wrapInFlowIf(method.clientStreaming)
val resultCast = "R".wrapInFlowIf(method.serverStreaming)

val result = "stub.$grpcName((message as $argumentCast).${toPlatform})"
code("\"${method.name}\" -> $result.${toKotlin} as $resultCast")

importRootDeclarationIfNeeded(inputType.name, "toPlatform", true)
importRootDeclarationIfNeeded(outputType.name, "toKotlin", true)
}

code("else -> error(\"Illegal call: \${rpcCall.callableName}\")")
}
}

private fun MessageDeclaration.toPlatformMessageType(): String {
return "${outerClassName.safeFullName()}.${name.fullNestedName()}"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.protobuf.test

import StreamingTestService
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.toList
import kotlinx.rpc.RpcServer
import kotlinx.rpc.registerService
import kotlinx.rpc.withService
import kotlin.test.Test
import kotlin.test.assertEquals

class StreamingTestServiceImpl : StreamingTestService {
override fun Server(message: References): Flow<References> {
return flow { emit(message); emit(message); emit(message) }
}

override suspend fun Client(message: Flow<References>): References {
return message.last()
}

override fun Bidi(message: Flow<References>): Flow<References> {
return message
}
}

class StreamingTest : GrpcServerTest() {
override fun RpcServer.registerServices() {
registerService<StreamingTestService> { StreamingTestServiceImpl() }
}

@Test
fun testServerStreaming() = runGrpcTest { grpcClient ->
val service = grpcClient.withService<StreamingTestService>()
service.Server(References {
other = Other {
field= 42
}
}).toList().run {
assertEquals(3, size)

forEach {
assertEquals(42, it.other.field)
}
}
}

@Test
fun testClientStreaming() = runGrpcTest { grpcClient ->
val service = grpcClient.withService<StreamingTestService>()
val result = service.Client(flow {
repeat(3) {
emit(References {
other = Other {
field = 42 + it
}
})
}
})

assertEquals(44, result.other.field)
}

@Test
fun testBidiStreaming() = runGrpcTest { grpcClient ->
val service = grpcClient.withService<StreamingTestService>()
service.Bidi(flow {
repeat(3) {
emit(References {
other = Other {
field = 42 + it
}
})
}
}).collectIndexed { i, it ->
assertEquals(42 + i, it.other.field)
}
}
}
2 changes: 0 additions & 2 deletions protobuf-plugin/src/test/proto/reference.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
syntax = "proto3";

import "all_primitives.proto";

message Other {
string arg = 1;
}
Expand Down
9 changes: 9 additions & 0 deletions protobuf-plugin/src/test/proto/streaming.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";

import "reference_package.proto";

service StreamingTestService {
rpc Server(kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References);
rpc Client(stream kotlinx.rpc.protobuf.test.References) returns (kotlinx.rpc.protobuf.test.References);
rpc Bidi(stream kotlinx.rpc.protobuf.test.References) returns (stream kotlinx.rpc.protobuf.test.References);
}