Skip to content

Commit f6dfbe0

Browse files
committed
grpc-native: Working version of NativeClientCall
Signed-off-by: Johannes Zottele <[email protected]>
1 parent f3e1ef4 commit f6dfbe0

File tree

10 files changed

+607
-133
lines changed

10 files changed

+607
-133
lines changed

cinterop-c/include/grpcpp_c.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ typedef struct {
4747
} grpc_cb_tag;
4848

4949

50-
5150
grpc_client_t *grpc_client_create_insecure(const char *target);
5251
void grpc_client_delete(const grpc_client_t *client);
5352

@@ -77,6 +76,10 @@ typedef struct grpc_channel_credentials grpc_channel_credentials_t;
7776

7877
bool kgrpc_iomgr_run_in_background();
7978

79+
80+
/////// UTILS ///////
81+
82+
8083
#ifdef __cplusplus
8184
}
8285
#endif

cinterop-c/src/grpcpp_c.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ extern "C" {
208208
return grpc_iomgr_run_in_background();
209209
}
210210

211+
211212
}
212213

213214

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
syntax = "proto3";
2+
3+
4+
// The request message containing the user's name.
5+
message HelloRequest {
6+
string name = 1;
7+
optional uint32 timeout = 2;
8+
}
9+
10+
11+
// The response message containing the greetings
12+
message HelloReply {
13+
string message = 1;
14+
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
headers = grpcpp_c.h grpc/grpc.h grpc/credentials.h
2-
headerFilter= grpcpp_c.h grpc/slice.h grpc/byte_buffer.h grpc/grpc.h grpc/impl/grpc_types.h grpc/credentials.h grpc/support/time.h
1+
headers = grpcpp_c.h grpc/grpc.h grpc/credentials.h grpc/byte_buffer_reader.h
2+
headerFilter= grpcpp_c.h grpc/slice.h grpc/byte_buffer.h grpc/grpc.h \
3+
grpc/impl/grpc_types.h grpc/credentials.h grpc/support/time.h grpc/byte_buffer_reader.h
34

45
noStringConversion = grpc_slice_from_copied_buffer my_grpc_slice_from_copied_buffer
5-
strictEnums = grpc_status_code, grpc_connectivity_state grpc_call_error
6+
strictEnums = grpc_status_code grpc_connectivity_state grpc_call_error
67

78
staticLibraries = libgrpcpp_c_static.a

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@ package kotlinx.rpc.grpc
1010
import cnames.structs.grpc_channel
1111
import kotlinx.cinterop.CPointer
1212
import kotlinx.cinterop.ExperimentalForeignApi
13-
import kotlinx.rpc.grpc.internal.ClientCall
14-
import kotlinx.rpc.grpc.internal.GrpcCallOptions
15-
import kotlinx.rpc.grpc.internal.GrpcChannel
16-
import kotlinx.rpc.grpc.internal.MethodDescriptor
13+
import kotlinx.coroutines.*
14+
import kotlinx.rpc.grpc.internal.*
1715
import libgrpcpp_c.*
1816
import kotlin.experimental.ExperimentalNativeApi
1917
import kotlin.native.ref.createCleaner
@@ -34,13 +32,15 @@ public actual abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>>
3432
}
3533

3634
internal actual fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel {
37-
return NativeManagedChannel(
38-
target = "localhost:50051",
39-
credentials = GrpcCredentials(
40-
grpc_insecure_credentials_create()
41-
?: error("Failed to create credentials")
42-
)
43-
)
35+
TODO("Not yet implemented")
36+
// return NativeManagedChannel(
37+
// target = "localhost:50051",
38+
// credentials = GrpcCredentials(
39+
// grpc_insecure_credentials_create()
40+
// ?: error("Failed to create credentials")
41+
// ),
42+
//
43+
// )
4444
}
4545

4646
internal actual fun ManagedChannelBuilder(hostname: String, port: Int): ManagedChannelBuilder<*> {
@@ -52,43 +52,93 @@ internal actual fun ManagedChannelBuilder(target: String): ManagedChannelBuilder
5252
}
5353

5454

55+
private const val GRPC_PROPAGATE_DEFAULTS = 0x0000FFFFu
56+
5557
internal class NativeManagedChannel(
56-
private val target: String,
58+
target: String,
5759
// we must store them, otherwise the credentials are getting released
58-
private val credentials: GrpcCredentials,
60+
credentials: GrpcCredentials,
61+
dispatcher: CoroutineDispatcher = Dispatchers.Default,
5962
) : ManagedChannel, ManagedChannelPlatform() {
6063

64+
private val channelJob = SupervisorJob()
65+
private val callJobSupervisor = SupervisorJob(channelJob)
66+
private val channelScope = CoroutineScope(channelJob + dispatcher)
67+
68+
// the channel's completion queue, handling all request operations
69+
private val cq = CompletionQueue()
70+
6171
internal val raw: CPointer<grpc_channel> = grpc_channel_create(target, credentials.raw, null)
6272
?: error("Failed to create channel")
73+
74+
@Suppress("unused")
6375
private val rawCleaner = createCleaner(raw) {
6476
grpc_channel_destroy(it)
6577
}
6678

6779
override val platformApi: ManagedChannelPlatform = this
6880

69-
override val isShutdown: Boolean
70-
get() = TODO("Not yet implemented")
81+
private var isShutdownInternal: Boolean = false
82+
override val isShutdown: Boolean = isShutdownInternal
83+
private var isTerminatedInternal = CompletableDeferred(Unit)
7184
override val isTerminated: Boolean
72-
get() = TODO("Not yet implemented")
85+
get() = isTerminatedInternal.isCompleted
7386

7487
override suspend fun awaitTermination(duration: Duration): Boolean {
75-
TODO("Not yet implemented")
88+
withTimeoutOrNull(duration) {
89+
isTerminatedInternal.await()
90+
} ?: return false
91+
return true
7692
}
7793

7894
override fun shutdown(): ManagedChannel {
79-
TODO("Not yet implemented")
95+
channelScope.launch {
96+
shutdownInternal(false)
97+
}
98+
return this
8099
}
81100

82101
override fun shutdownNow(): ManagedChannel {
83-
TODO("Not yet implemented")
102+
channelScope.launch {
103+
shutdownInternal(true)
104+
}
105+
return this
106+
}
107+
108+
private suspend fun shutdownInternal(force: Boolean) {
109+
isShutdownInternal = true
110+
if (force) {
111+
callJobSupervisor.cancelChildren(CancellationException("Channel is shutting down"))
112+
}
113+
// prevent any start() calls on already created jobs
114+
callJobSupervisor.complete()
115+
cq.shutdown(force)
116+
// wait for child jobs to complete.
117+
// should be immediate, as the completion queue is shutdown.
118+
callJobSupervisor.join()
119+
isTerminatedInternal.complete(Unit)
84120
}
85121

86122

87123
override fun <RequestT, ResponseT> newCall(
88124
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
89125
callOptions: GrpcCallOptions,
90126
): ClientCall<RequestT, ResponseT> {
91-
TODO("Not yet implemented")
127+
check(!isShutdown) { "Channel is shutdown" }
128+
129+
val parent = channelScope.coroutineContext[Job]!!
130+
val callJob = Job(parent)
131+
val callScope = CoroutineScope(callJob)
132+
133+
val methodNameSlice = methodDescriptor.getFullMethodName().toGrpcSlice()
134+
val rawCall = grpc_channel_create_call(
135+
channel = raw, parent_call = null, propagation_mask = GRPC_PROPAGATE_DEFAULTS, completion_queue = cq.raw,
136+
method = methodNameSlice, host = null, deadline = gpr_inf_future(GPR_CLOCK_REALTIME), reserved = null
137+
) ?: error("Failed to create call")
138+
139+
return NativeClientCall(
140+
cq, rawCall, methodDescriptor, callScope
141+
)
92142
}
93143

94144
override fun authority(): String {
@@ -98,10 +148,15 @@ internal class NativeManagedChannel(
98148
}
99149

100150

101-
internal class GrpcCredentials(
151+
internal sealed class GrpcCredentials(
102152
internal val raw: CPointer<grpc_channel_credentials_t>,
103153
) {
104154
val rawCleaner = createCleaner(raw) {
105155
grpc_channel_credentials_release(it)
106156
}
107157
}
158+
159+
internal class GrpcInsecureCredentials() :
160+
GrpcCredentials(grpc_insecure_credentials_create() ?: error("Failed to create credentials"))
161+
162+

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/ClientCall.native.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
6+
57
package kotlinx.rpc.grpc.internal
68

9+
import kotlinx.cinterop.ExperimentalForeignApi
710
import kotlinx.rpc.grpc.GrpcTrailers
811
import kotlinx.rpc.grpc.Status
912
import kotlinx.rpc.internal.utils.InternalRpcApi
13+
import kotlin.experimental.ExperimentalNativeApi
1014

1115
@InternalRpcApi
1216
public actual abstract class ClientCall<Request, Response> {

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,23 @@ package kotlinx.rpc.grpc.internal
99
import cnames.structs.grpc_call
1010
import kotlinx.atomicfu.atomic
1111
import kotlinx.cinterop.*
12+
import kotlinx.coroutines.NonCancellable
1213
import kotlinx.coroutines.suspendCancellableCoroutine
14+
import kotlinx.coroutines.withContext
1315
import libgrpcpp_c.*
1416
import platform.posix.memset
1517
import kotlin.coroutines.Continuation
1618
import kotlin.coroutines.resume
17-
import kotlin.coroutines.resumeWithException
1819
import kotlin.experimental.ExperimentalNativeApi
1920
import kotlin.native.ref.createCleaner
2021

22+
internal sealed interface BatchResult {
23+
object Success : BatchResult
24+
object ResultError : BatchResult
25+
object CQShutdown : BatchResult
26+
data class CallError(val error: grpc_call_error) : BatchResult
27+
}
28+
2129
/**
2230
* A coroutine wrapper around the grpc completion_queue, which manages message operations.
2331
* It is based on the "new" callback API; therefore, there are no kotlin-side threads required to poll
@@ -27,6 +35,9 @@ internal class CompletionQueue {
2735

2836
private enum class State { OPEN, SHUTTING_DOWN, CLOSED }
2937

38+
// if the queue was called with forceShutdown = true,
39+
// it will reject all new batches and wait for all current ones to finish.
40+
private var forceShutdown = false
3041
private val state = atomic(State.OPEN)
3142

3243
// internal as it must be accessible from the SHUTDOWN_CB,
@@ -47,7 +58,10 @@ internal class CompletionQueue {
4758

4859
val raw = grpc_completion_queue_create_for_callback(shutdownFunctor.ptr, null)
4960

61+
@Suppress("unused")
5062
private val thisStableRefCleaner = createCleaner(thisStableRef) { it.dispose() }
63+
64+
@Suppress("unused")
5165
private val shutdownFunctorCleaner = createCleaner(shutdownFunctor) { nativeHeap.free(it) }
5266

5367
init {
@@ -56,16 +70,22 @@ internal class CompletionQueue {
5670
require(kgrpc_iomgr_run_in_background()) { "The gRPC iomgr is not running background threads, required for callback based APIs." }
5771
}
5872

59-
suspend fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong) =
60-
suspendCancellableCoroutine<grpc_call_error> { cont ->
73+
// TODO: Remove this method
74+
suspend fun runBatch(call: NativeClientCall<*, *>, ops: CPointer<grpc_op>, nOps: ULong) =
75+
runBatch(call.raw, ops, nOps)
76+
77+
suspend fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong): BatchResult =
78+
suspendCancellableCoroutine<BatchResult> { cont ->
6179
val tag = newCbTag(cont, OPS_COMPLETE_CB)
6280

6381
var err = grpc_call_error.GRPC_CALL_ERROR
6482
// synchronizes access to grpc_call_start_batch
6583
withBatchStartLock {
66-
if (state.value != State.OPEN) {
84+
if (forceShutdown || state.value == State.CLOSED) {
85+
// if the queue is either closed or in the process of a FORCE shutdown,
86+
// new batches will instantly fail.
6787
deleteCbTag(tag)
68-
cont.resume(grpc_call_error.GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN)
88+
cont.resume(BatchResult.CQShutdown)
6989
return@suspendCancellableCoroutine
7090
}
7191

@@ -75,7 +95,7 @@ internal class CompletionQueue {
7595
if (err != grpc_call_error.GRPC_CALL_OK) {
7696
// if the call was not successful, the callback will not be invoked.
7797
deleteCbTag(tag)
78-
cont.resume(err)
98+
cont.resume(BatchResult.CallError(grpc_call_error.GRPC_CALL_ERROR))
7999
return@suspendCancellableCoroutine
80100
}
81101

@@ -87,16 +107,29 @@ internal class CompletionQueue {
87107
// cancel the call if one of its batches is canceled.
88108
// grpc_call_cancel is thread-safe and can be called several times.
89109
// the callback is invoked anyway, so the tag doesn't get deleted here.
90-
grpc_call_cancel(call, null)
110+
if (it != null) {
111+
grpc_call_cancel_with_status(
112+
call,
113+
grpc_status_code.GRPC_STATUS_CANCELLED,
114+
"Call got cancelled: ${it.message}",
115+
null
116+
)
117+
} else {
118+
grpc_call_cancel(call, null)
119+
}
91120
}
92121
}
93122

94-
suspend fun shutdown() {
123+
// must not be canceled as it cleans resources and sets the state to CLOSED
124+
suspend fun shutdown(force: Boolean = false) = withContext(NonCancellable) {
125+
if (force) {
126+
forceShutdown = true
127+
}
95128
if (!state.compareAndSet(State.OPEN, State.SHUTTING_DOWN)) {
96129
// the first call to shutdown() makes transition and to SHUTTING_DOWN and
97130
// initiates shut down. all other invocations await the shutdown.
98131
_shutdownDone.await()
99-
return
132+
return@withContext
100133
}
101134

102135
// wait until all batch operations since the state transitions were started.
@@ -126,10 +159,10 @@ internal class CompletionQueue {
126159
@CName("kq_ops_complete_cb")
127160
private fun opsCompleteCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
128161
val tag = functor!!.reinterpret<grpc_cb_tag>()
129-
val cont = tag.pointed.user_data!!.asStableRef<Continuation<grpc_call_error>>().get()
162+
val cont = tag.pointed.user_data!!.asStableRef<Continuation<BatchResult>>().get()
130163
deleteCbTag(tag)
131-
if (ok != 0) cont.resume(grpc_call_error.GRPC_CALL_OK)
132-
else cont.resumeWithException(IllegalStateException("batch failed"))
164+
if (ok != 0) cont.resume(BatchResult.Success)
165+
else cont.resume(BatchResult.ResultError)
133166
}
134167

135168
@CName("kq_shutdown_cb")

0 commit comments

Comments
 (0)