Skip to content

Commit f326ed2

Browse files
committed
grpc-native: Refactor to use callbacks instead of coroutines
Signed-off-by: Johannes Zottele <[email protected]>
1 parent a1e2985 commit f326ed2

File tree

5 files changed

+177
-162
lines changed

5 files changed

+177
-162
lines changed

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,15 @@ internal class NativeManagedChannel(
5858
target: String,
5959
// we must store them, otherwise the credentials are getting released
6060
credentials: GrpcCredentials,
61-
dispatcher: CoroutineDispatcher = Dispatchers.Default,
6261
) : ManagedChannel, ManagedChannelPlatform() {
6362

64-
// A reference to make sure the grpc_init() was called. (it is released after shutdown)
63+
// a reference to make sure the grpc_init() was called. (it is released after shutdown)
6564
@Suppress("unused")
6665
private val rt = GrpcRuntime.acquire()
6766

68-
private val channelJob = SupervisorJob()
69-
private val callJobSupervisor = SupervisorJob(channelJob)
70-
private val channelScope = CoroutineScope(channelJob + dispatcher)
67+
// job bundling all the call jobs created by this channel.
68+
// this allows easy cancellation of ongoing calls.
69+
private val callJobSupervisor = SupervisorJob()
7170

7271
// the channel's completion queue, handling all request operations
7372
private val cq = CompletionQueue()
@@ -111,23 +110,15 @@ internal class NativeManagedChannel(
111110
return
112111
}
113112
if (force) {
113+
// TODO: replace jobs by custom pendingCallClass.
114114
callJobSupervisor.cancelChildren(CancellationException("Channel is shutting down"))
115115
}
116-
// prevent any start() calls on already call jobs
117-
callJobSupervisor.children.forEach {
118-
(it as CompletableJob).complete()
119-
}
120-
callJobSupervisor.complete()
121-
channelScope.launch {
122-
withContext(NonCancellable) {
123-
// wait for all child jobs to complete.
124-
callJobSupervisor.join()
125-
// wait for the completion queue to shut down.
126-
cq.shutdown(force).await()
127-
if (isTerminatedInternal.complete(Unit)) {
128-
// release the grpc runtime, so it might call grpc_shutdown()
129-
rt.close()
130-
}
116+
117+
// wait for the completion queue to shut down.
118+
cq.shutdown(force).onComplete {
119+
if (isTerminatedInternal.complete(Unit)) {
120+
// release the grpc runtime, so it might call grpc_shutdown()
121+
rt.close()
131122
}
132123
}
133124
}

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CallbackFuture.kt

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,44 @@ package kotlinx.rpc.grpc.internal
66

77
import kotlinx.atomicfu.atomic
88

9+
/**
10+
* Thread safe future for callbacks.
11+
*/
912
internal class CallbackFuture<T : Any> {
10-
private val value = atomic<T?>(null)
11-
private val callback = atomic<((T) -> Unit)?>(null)
13+
private sealed interface State<out T> {
14+
data class Pending<T>(val cbs: List<(T) -> Unit> = emptyList()) : State<T>
15+
data class Done<T>(val value: T) : State<T>
16+
}
17+
18+
private val state = atomic<State<T>>(State.Pending())
1219

1320
fun complete(result: T) {
14-
if (value.compareAndSet(null, result)) {
15-
callback.getAndSet(null)?.invoke(result)
16-
} else {
17-
error("Already completed")
21+
var toInvoke: List<(T) -> Unit>
22+
while (true) {
23+
when (val s = state.value) {
24+
is State.Pending -> if (state.compareAndSet(s, State.Done(result))) {
25+
toInvoke = s.cbs
26+
break
27+
}
28+
29+
is State.Done -> error("Already completed")
30+
}
1831
}
32+
for (cb in toInvoke) cb(result)
1933
}
2034

21-
fun onComplete(cb: (T) -> Unit) {
22-
val r = value.value
23-
if (r != null) cb(r)
24-
else if (!callback.compareAndSet(null, cb)) {
25-
// Already someone registered → run immediately
26-
value.value?.let(cb)
35+
fun onComplete(callback: (T) -> Unit) {
36+
while (true) {
37+
when (val s = state.value) {
38+
is State.Done -> {
39+
callback(s.value); return
40+
}
41+
42+
is State.Pending -> {
43+
val next = State.Pending(s.cbs + callback) // copy-on-write append
44+
if (state.compareAndSet(s, next)) return
45+
}
46+
}
2747
}
2848
}
2949
}

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,19 @@
66

77
package kotlinx.rpc.grpc.internal
88

9-
import cnames.structs.grpc_call
109
import kotlinx.atomicfu.atomic
1110
import kotlinx.atomicfu.locks.SynchronizedObject
1211
import kotlinx.atomicfu.locks.synchronized
1312
import kotlinx.cinterop.*
14-
import kotlinx.coroutines.CompletableDeferred
1513
import libgrpcpp_c.*
1614
import platform.posix.memset
1715
import kotlin.experimental.ExperimentalNativeApi
1816
import kotlin.native.ref.createCleaner
1917

2018
internal sealed interface BatchResult {
21-
object Success : BatchResult
22-
object ResultError : BatchResult
2319
object CQShutdown : BatchResult
2420
data class CallError(val error: grpc_call_error) : BatchResult
21+
data class Called(val future: CallbackFuture<Boolean>) : BatchResult
2522
}
2623

2724
/**
@@ -45,7 +42,7 @@ internal class CompletionQueue {
4542
// internal as it must be accessible from the SHUTDOWN_CB,
4643
// but it shouldn't be used from outside this file.
4744
@Suppress("PropertyName")
48-
internal val _shutdownDone = CompletableDeferred<Unit>()
45+
internal val _shutdownDone = CallbackFuture<Unit>()
4946

5047
// used for spinning lock. false means not used (available)
5148
private val batchStartGuard = SynchronizedObject()
@@ -72,40 +69,41 @@ internal class CompletionQueue {
7269
require(kgrpc_iomgr_run_in_background()) { "The gRPC iomgr is not running background threads, required for callback based APIs." }
7370
}
7471

75-
// TODO: Remove this method
76-
fun runBatch(call: NativeClientCall<*, *>, ops: CPointer<grpc_op>, nOps: ULong) =
77-
runBatch(call.raw, ops, nOps)
78-
79-
fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong): CompletableDeferred<BatchResult> {
80-
val completion = CompletableDeferred<BatchResult>()
72+
fun runBatch(call: NativeClientCall<*, *>, ops: CPointer<grpc_op>, nOps: ULong): BatchResult {
73+
val completion = CallbackFuture<Boolean>()
8174
val tag = newCbTag(completion, OPS_COMPLETE_CB)
8275

8376
var err = grpc_call_error.GRPC_CALL_ERROR
77+
8478
synchronized(batchStartGuard) {
85-
// synchronizes access to grpc_call_start_batch
79+
if (_state.value == State.SHUTTING_DOWN && ops.pointed.op == GRPC_OP_RECV_STATUS_ON_CLIENT) {
80+
// if the queue is in the process of a SHUTDOWN,
81+
// new call status receive batches will be rejected.
82+
deleteCbTag(tag)
83+
return BatchResult.CQShutdown
84+
}
85+
8686
if (forceShutdown || _state.value == State.CLOSED) {
8787
// if the queue is either closed or in the process of a FORCE shutdown,
8888
// new batches will instantly fail.
8989
deleteCbTag(tag)
90-
completion.complete(BatchResult.CQShutdown)
91-
return completion
90+
return BatchResult.CQShutdown
9291
}
9392

94-
err = grpc_call_start_batch(call, ops, nOps, tag, null)
93+
err = grpc_call_start_batch(call.raw, ops, nOps, tag, null)
9594
}
9695

9796
if (err != grpc_call_error.GRPC_CALL_OK) {
9897
// if the call was not successful, the callback will not be invoked.
9998
deleteCbTag(tag)
100-
completion.complete(BatchResult.CallError(err))
101-
return completion
99+
return BatchResult.CallError(err)
102100
}
103101

104-
return completion
102+
return BatchResult.Called(completion)
105103
}
106104

107105
// must not be canceled as it cleans resources and sets the state to CLOSED
108-
fun shutdown(force: Boolean = false): CompletableDeferred<Unit> {
106+
fun shutdown(force: Boolean = false): CallbackFuture<Unit> {
109107
if (force) {
110108
forceShutdown = true
111109
}
@@ -130,10 +128,9 @@ internal class CompletionQueue {
130128
@CName("kq_ops_complete_cb")
131129
private fun opsCompleteCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
132130
val tag = functor!!.reinterpret<grpc_cb_tag>()
133-
val cont = tag.pointed.user_data!!.asStableRef<CompletableDeferred<BatchResult>>().get()
131+
val cont = tag.pointed.user_data!!.asStableRef<CallbackFuture<Boolean>>().get()
134132
deleteCbTag(tag)
135-
if (ok != 0) cont.complete(BatchResult.Success)
136-
else cont.complete(BatchResult.ResultError)
133+
cont.complete(ok != 0)
137134
}
138135

139136
@CName("kq_shutdown_cb")

0 commit comments

Comments
 (0)