Skip to content

Commit f3e1ef4

Browse files
committed
grpc-native: Working CompletionQueue
Signed-off-by: Johannes Zottele <[email protected]>
1 parent c19cdd6 commit f3e1ef4

File tree

5 files changed

+82
-34
lines changed

5 files changed

+82
-34
lines changed

cinterop-c/include/grpcpp_c.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#define GRPCPP_C_H
77

88
#include <stdint.h>
9+
#include <stdbool.h>
910
#include <grpc/grpc.h>
1011
#include <grpc/slice.h>
1112
#include <grpc/byte_buffer.h>
@@ -74,6 +75,8 @@ grpc_status_code_t grpc_byte_buffer_dump_to_single_slice(grpc_byte_buffer *byte_
7475
typedef struct grpc_channel grpc_channel_t;
7576
typedef struct grpc_channel_credentials grpc_channel_credentials_t;
7677

78+
bool kgrpc_iomgr_run_in_background();
79+
7780
#ifdef __cplusplus
7881
}
7982
#endif

cinterop-c/src/grpcpp_c.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <grpcpp/impl/client_unary_call.h>
1111
#include <google/protobuf/io/coded_stream.h>
1212
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
13+
#include "src/core/lib/iomgr/iomgr.h"
1314

1415
namespace pb = google::protobuf;
1516

@@ -203,7 +204,9 @@ extern "C" {
203204

204205
//// CHANNEL ////
205206

206-
207+
bool kgrpc_iomgr_run_in_background() {
208+
return grpc_iomgr_run_in_background();
209+
}
207210

208211
}
209212

grpc/grpc-core/src/nativeInterop/cinterop/libgrpcpp_c.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ headers = grpcpp_c.h grpc/grpc.h grpc/credentials.h
22
headerFilter= grpcpp_c.h grpc/slice.h grpc/byte_buffer.h grpc/grpc.h grpc/impl/grpc_types.h grpc/credentials.h grpc/support/time.h
33

44
noStringConversion = grpc_slice_from_copied_buffer my_grpc_slice_from_copied_buffer
5+
strictEnums = grpc_status_code, grpc_connectivity_state grpc_call_error
56

67
staticLibraries = libgrpcpp_c_static.a

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package kotlinx.rpc.grpc.internal
99
import cnames.structs.grpc_call
1010
import kotlinx.atomicfu.atomic
1111
import kotlinx.cinterop.*
12-
import kotlinx.coroutines.coroutineScope
1312
import kotlinx.coroutines.suspendCancellableCoroutine
1413
import libgrpcpp_c.*
1514
import platform.posix.memset
@@ -19,9 +18,20 @@ import kotlin.coroutines.resumeWithException
1918
import kotlin.experimental.ExperimentalNativeApi
2019
import kotlin.native.ref.createCleaner
2120

21+
/**
22+
* A coroutine wrapper around the grpc completion_queue, which manages message operations.
23+
* It is based on the "new" callback API; therefore, there are no kotlin-side threads required to poll
24+
* the queue.
25+
*/
2226
internal class CompletionQueue {
2327

24-
// internal as it must be accessible from the SHUTDOWN_CB
28+
private enum class State { OPEN, SHUTTING_DOWN, CLOSED }
29+
30+
private val state = atomic(State.OPEN)
31+
32+
// internal as it must be accessible from the SHUTDOWN_CB,
33+
// but it shouldn't be used from outside this file.
34+
@Suppress("PropertyName")
2535
internal val _shutdownDone = kotlinx.coroutines.CompletableDeferred<Unit>()
2636

2737
// used for spinning lock. false means not used (available)
@@ -40,58 +50,94 @@ internal class CompletionQueue {
4050
private val thisStableRefCleaner = createCleaner(thisStableRef) { it.dispose() }
4151
private val shutdownFunctorCleaner = createCleaner(shutdownFunctor) { nativeHeap.free(it) }
4252

43-
suspend fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong) = coroutineScope {
44-
suspendCancellableCoroutine<Unit> { cont ->
53+
init {
54+
// Assert grpc_iomgr_run_in_background() to guarantee that the event manager provides
55+
// IO threads and supports the callback API.
56+
require(kgrpc_iomgr_run_in_background()) { "The gRPC iomgr is not running background threads, required for callback based APIs." }
57+
}
58+
59+
suspend fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong) =
60+
suspendCancellableCoroutine<grpc_call_error> { cont ->
4561
val tag = newCbTag(cont, OPS_COMPLETE_CB)
4662

63+
var err = grpc_call_error.GRPC_CALL_ERROR
4764
// synchronizes access to grpc_call_start_batch
48-
while (!batchStartGuard.compareAndSet(expect = false, update = true)) {
49-
// could not be set to true (currently hold by different thread)
50-
}
65+
withBatchStartLock {
66+
if (state.value != State.OPEN) {
67+
deleteCbTag(tag)
68+
cont.resume(grpc_call_error.GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN)
69+
return@suspendCancellableCoroutine
70+
}
5171

52-
var err: UInt
53-
try {
5472
err = grpc_call_start_batch(call, ops, nOps, tag, null)
55-
} finally {
56-
batchStartGuard.value = false
5773
}
5874

59-
if (err != 0u) {
75+
if (err != grpc_call_error.GRPC_CALL_OK) {
76+
// if the call was not successful, the callback will not be invoked.
6077
deleteCbTag(tag)
61-
cont.resumeWithException(IllegalStateException("start_batch err=$err"))
78+
cont.resume(err)
6279
return@suspendCancellableCoroutine
6380
}
6481

6582

6683
cont.invokeOnCancellation {
67-
this // keep reference, otherwise the cleaners might get cleaned before batch finishes
68-
TODO("Implement call operation cancellation")
84+
@Suppress("UnusedExpression")
85+
// keep reference, otherwise the cleaners might get invoked before the batch finishes
86+
this
87+
// cancel the call if one of its batches is canceled.
88+
// grpc_call_cancel is thread-safe and can be called several times.
89+
// the callback is invoked anyway, so the tag doesn't get deleted here.
90+
grpc_call_cancel(call, null)
6991
}
7092
}
71-
}
7293

7394
suspend fun shutdown() {
74-
if (_shutdownDone.isCompleted) return
95+
if (!state.compareAndSet(State.OPEN, State.SHUTTING_DOWN)) {
96+
// the first call to shutdown() makes transition and to SHUTTING_DOWN and
97+
// initiates shut down. all other invocations await the shutdown.
98+
_shutdownDone.await()
99+
return
100+
}
101+
102+
// wait until all batch operations since the state transitions were started.
103+
// this is required to prevent batches from starting after shutdown was initialized
104+
withBatchStartLock { }
105+
75106
grpc_completion_queue_shutdown(raw)
76107
_shutdownDone.await()
108+
state.value = State.CLOSED
109+
}
110+
111+
private inline fun withBatchStartLock(block: () -> Unit) {
112+
try {
113+
// spin until this thread occupies the guard
114+
@Suppress("ControlFlowWithEmptyBody")
115+
while (!batchStartGuard.compareAndSet(expect = false, update = true)) {
116+
}
117+
block()
118+
} finally {
119+
// set guard to "not occupied"
120+
batchStartGuard.value = false
121+
}
77122
}
78123
}
79124

125+
// kq stands for kompletion_queue lol
80126
@CName("kq_ops_complete_cb")
81127
private fun opsCompleteCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
82128
val tag = functor!!.reinterpret<grpc_cb_tag>()
83-
val cont = tag.pointed.user_data!!.asStableRef<Continuation<Unit>>().get()
129+
val cont = tag.pointed.user_data!!.asStableRef<Continuation<grpc_call_error>>().get()
84130
deleteCbTag(tag)
85-
if (ok != 0) cont.resume(Unit) else cont.resumeWithException(IllegalStateException("batch failed"))
131+
if (ok != 0) cont.resume(grpc_call_error.GRPC_CALL_OK)
132+
else cont.resumeWithException(IllegalStateException("batch failed"))
86133
}
87134

88135
@CName("kq_shutdown_cb")
89136
private fun shutdownCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
90137
val tag = functor!!.reinterpret<grpc_cb_tag>()
91138
val cq = tag.pointed.user_data!!.asStableRef<CompletionQueue>().get()
92-
check(ok != 0) { "CQ shutdown failed" }
93-
grpc_completion_queue_destroy(cq.raw)
94139
cq._shutdownDone.complete(Unit)
140+
grpc_completion_queue_destroy(cq.raw)
95141
}
96142

97143
private val OPS_COMPLETE_CB = staticCFunction(::opsCompleteCb)

grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/internal/CoreTest.kt

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,7 @@ class GrpcCoreTest {
1919
val GRPC_PROPAGATE_DEFAULTS = 0x0000FFFFu
2020

2121
suspend fun doCall(reqBytes: ByteArray) = withArena { arena ->
22-
23-
val shutdownFunctor = arena.alloc<grpc_completion_queue_functor>()
24-
shutdownFunctor.functor_run = staticCFunction { tag, success -> println("Shutting down") }
25-
2622
val cq = CompletionQueue()
27-
// val cq = grpc_completion_queue_create_for_callback(shutdownFunctor.ptr, null)
2823

2924
val creds = grpc_insecure_credentials_create()!!
3025
val channel = grpc_channel_create("localhost:50051", creds, null)!!
@@ -86,18 +81,18 @@ class GrpcCoreTest {
8681

8782
coroutineScope {
8883

89-
launch {
90-
println("Shutting down")
91-
cq.shutdown()
92-
println("Shutdown")
93-
}
94-
9584
launch {
9685
println("Start continuation call")
9786
cq.runBatch(call!!, ops, 6u)
9887
println("Call continuation done")
9988
}
100-
}.join()
89+
90+
launch {
91+
println("Shutting down")
92+
cq.shutdown()
93+
println("Shutdown")
94+
}
95+
}
10196

10297

10398
println("Status code: ${statusCode.value}")

0 commit comments

Comments
 (0)