Skip to content

Commit 2184915

Browse files
committed
grpc-native: Implement bridge to common
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 1128209 commit 2184915

File tree

17 files changed

+476
-213
lines changed

17 files changed

+476
-213
lines changed

cinterop-c/include/grpcpp_c.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,6 @@ typedef struct grpc_channel_credentials grpc_channel_credentials_t;
7777
bool kgrpc_iomgr_run_in_background();
7878

7979

80-
/////// UTILS ///////
81-
82-
8380
#ifdef __cplusplus
8481
}
8582
#endif

grpc/grpc-core/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ kotlin {
6161
implementation(libs.grpc.netty)
6262
}
6363
}
64+
65+
nativeMain {
66+
dependencies {
67+
// required for status.proto
68+
implementation(projects.protobuf.protobufCore)
69+
}
70+
}
6471
}
6572

6673
configureCLibCInterop(project, ":grpcpp_c_static") { cinteropCLib ->

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ import kotlinx.rpc.grpc.GrpcTrailers
88
import kotlinx.rpc.grpc.Status
99
import kotlinx.rpc.internal.utils.InternalRpcApi
1010

11+
/**
12+
* This class represents a client-side call to a server.
13+
* It provides the interface of the gRPC-Java ClientCall class; however, semantics are slightly different
14+
* on JVM and Native platforms.
15+
*
16+
* Callback execution:
17+
* - On JVM it is guaranteed that callbacks aren't executed concurrently.
18+
* - On Native, it is only guaranteed that `onClose` is called after all other callbacks finished.
19+
*/
1120
@InternalRpcApi
1221
public expect abstract class ClientCall<Request, Response> {
1322
@InternalRpcApi

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/MethodDescriptor.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ public enum class MethodType {
3737
UNKNOWN,
3838
}
3939

40+
/**
41+
* Creates a new [MethodDescriptor] instance.
42+
*
43+
* @param fullMethodName the full name of the method, consisting of the service name followed by a forward slash
44+
* and the method name. It does not include a leading slash.
45+
*/
4046
@InternalRpcApi
4147
public expect fun <Request, Response> methodDescriptor(
4248
fullMethodName: String,

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,13 @@
44

55
package kotlinx.rpc.grpc.internal
66

7-
import kotlinx.coroutines.CancellationException
8-
import kotlinx.coroutines.CoroutineName
9-
import kotlinx.coroutines.NonCancellable
10-
import kotlinx.coroutines.cancel
7+
import kotlinx.coroutines.*
118
import kotlinx.coroutines.channels.Channel
129
import kotlinx.coroutines.channels.onFailure
13-
import kotlinx.coroutines.coroutineScope
1410
import kotlinx.coroutines.flow.Flow
1511
import kotlinx.coroutines.flow.flow
1612
import kotlinx.coroutines.flow.single
17-
import kotlinx.coroutines.launch
18-
import kotlinx.coroutines.withContext
19-
import kotlinx.rpc.grpc.GrpcTrailers
20-
import kotlinx.rpc.grpc.Status
21-
import kotlinx.rpc.grpc.StatusCode
22-
import kotlinx.rpc.grpc.StatusException
23-
import kotlinx.rpc.grpc.code
13+
import kotlinx.rpc.grpc.*
2414
import kotlinx.rpc.internal.utils.InternalRpcApi
2515

2616
// heavily inspired by
@@ -249,7 +239,7 @@ internal fun <T> Flow<T>.singleOrStatusFlow(
249239

250240
internal suspend fun <T> Flow<T>.singleOrStatus(
251241
expected: String,
252-
descriptor: Any
242+
descriptor: Any,
253243
): T = singleOrStatusFlow(expected, descriptor).single()
254244

255245
internal class Ready(private val isReallyReady: () -> Boolean) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.grpc.test
6+
7+
import grpc.examples.echo.*
8+
import kotlinx.coroutines.delay
9+
import kotlinx.coroutines.flow.flow
10+
import kotlinx.coroutines.test.runTest
11+
import kotlinx.rpc.grpc.ManagedChannelBuilder
12+
import kotlinx.rpc.grpc.buildChannel
13+
import kotlinx.rpc.grpc.internal.*
14+
import kotlin.test.Test
15+
import kotlin.test.assertEquals
16+
17+
class CancellationClientTest {
18+
19+
@Test
20+
fun unaryEchoTest() = runTest(
21+
methodName = "UnaryEcho",
22+
type = MethodType.UNARY,
23+
) { channel, descriptor ->
24+
val response = unaryRpc(channel, descriptor, EchoRequest { message = "Eccchhooo" })
25+
assertEquals("Eccchhooo", response.message)
26+
}
27+
28+
@Test
29+
fun serverStreamingEchoTest() = runTest(
30+
methodName = "ServerStreamingEcho",
31+
type = MethodType.SERVER_STREAMING,
32+
) { channel, descriptor ->
33+
val response = serverStreamingRpc(channel, descriptor, EchoRequest { message = "Eccchhooo" })
34+
var i = 0
35+
response.collect {
36+
println("Received: ${i++}")
37+
assertEquals("Eccchhooo", it.message)
38+
}
39+
}
40+
41+
@Test
42+
fun clientStreamingTest() = runTest(
43+
methodName = "ServerStreamingEcho",
44+
type = MethodType.CLIENT_STREAMING,
45+
) { channel, descriptor ->
46+
val response = clientStreamingRpc(channel, descriptor, flow {
47+
repeat(5) {
48+
delay(100)
49+
println("Sending: ${it + 1}")
50+
emit(EchoRequest { message = "Eccchhooo" })
51+
}
52+
})
53+
val expected = "Eccchhooo,Eccchhooo,Eccchhooo,Eccchhooo,Eccchhooo"
54+
assertEquals(expected, response.message)
55+
}
56+
57+
fun runTest(
58+
methodName: String,
59+
type: MethodType,
60+
block: suspend (GrpcChannel, MethodDescriptor<EchoRequest, EchoResponse>) -> Unit,
61+
) = runTest {
62+
val channel = ManagedChannelBuilder("localhost:50051")
63+
.usePlaintext()
64+
.buildChannel()
65+
66+
val methodDescriptor = methodDescriptor(
67+
fullMethodName = "grpc.examples.echo.Echo/$methodName",
68+
requestCodec = EchoRequestInternal.CODEC,
69+
responseCodec = EchoResponseInternal.CODEC,
70+
type = type,
71+
schemaDescriptor = Unit,
72+
idempotent = true,
73+
safe = true,
74+
sampledToLocalTracing = true,
75+
)
76+
77+
try {
78+
block(channel.platformApi, methodDescriptor)
79+
} finally {
80+
channel.shutdown()
81+
channel.awaitTermination()
82+
}
83+
84+
85+
}
86+
87+
88+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
syntax = "proto3";
2+
3+
package grpc.examples.echo;
4+
5+
// EchoRequest is the request for echo.
6+
message EchoRequest {
7+
string message = 1;
8+
}
9+
10+
// EchoResponse is the response for echo.
11+
message EchoResponse {
12+
string message = 1;
13+
}
14+
15+
// Echo is the echo service.
16+
service Echo {
17+
// UnaryEcho is unary echo.
18+
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
19+
// ServerStreamingEcho is server side streaming.
20+
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
21+
// ClientStreamingEcho is client side streaming.
22+
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
23+
// BidirectionalStreamingEcho is bidi streaming.
24+
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
25+
}

grpc/grpc-core/src/nativeInterop/cinterop/libgrpcpp_c.def

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
headers = grpcpp_c.h grpc/grpc.h grpc/credentials.h grpc/byte_buffer_reader.h
1+
headers = grpcpp_c.h grpc/grpc.h grpc/credentials.h grpc/byte_buffer_reader.h \
2+
grpc/support/alloc.h
3+
24
headerFilter= grpcpp_c.h grpc/slice.h grpc/byte_buffer.h grpc/grpc.h \
3-
grpc/impl/grpc_types.h grpc/credentials.h grpc/support/time.h grpc/byte_buffer_reader.h
5+
grpc/impl/grpc_types.h grpc/credentials.h grpc/support/time.h grpc/byte_buffer_reader.h \
6+
grpc/support/alloc.h
47

58
noStringConversion = grpc_slice_from_copied_buffer my_grpc_slice_from_copied_buffer
69
strictEnums = grpc_status_code grpc_connectivity_state grpc_call_error

0 commit comments

Comments
 (0)