Skip to content

Commit 751308d

Browse files
committed
grpc-native: Working rewrite state
Signed-off-by: Johannes Zottele <[email protected]>
1 parent f6dfbe0 commit 751308d

File tree

5 files changed

+473
-281
lines changed

5 files changed

+473
-281
lines changed

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ internal class NativeManagedChannel(
6161
dispatcher: CoroutineDispatcher = Dispatchers.Default,
6262
) : ManagedChannel, ManagedChannelPlatform() {
6363

64+
// Holds a reference to the gRPC runtime to make sure grpc_init() was called (it is released after shutdown).
65+
@Suppress("unused")
66+
private val rt = GrpcRuntime.acquire()
67+
6468
private val channelJob = SupervisorJob()
6569
private val callJobSupervisor = SupervisorJob(channelJob)
6670
private val channelScope = CoroutineScope(channelJob + dispatcher)
@@ -80,7 +84,7 @@ internal class NativeManagedChannel(
8084

8185
private var isShutdownInternal: Boolean = false
8286
// Evaluated on every read so that it reflects the flag set by shutdownInternal();
// a plain initializer would capture the initial `false` once and never change.
override val isShutdown: Boolean get() = isShutdownInternal
83-
private var isTerminatedInternal = CompletableDeferred(Unit)
87+
private var isTerminatedInternal = CompletableDeferred<Unit>()
8488
override val isTerminated: Boolean
8589
get() = isTerminatedInternal.isCompleted
8690

@@ -92,43 +96,49 @@ internal class NativeManagedChannel(
9296
}
9397

9498
override fun shutdown(): ManagedChannel {
95-
channelScope.launch {
96-
shutdownInternal(false)
97-
}
99+
shutdownInternal(false)
98100
return this
99101
}
100102

101103
override fun shutdownNow(): ManagedChannel {
102-
channelScope.launch {
103-
shutdownInternal(true)
104-
}
104+
shutdownInternal(true)
105105
return this
106106
}
107107

108-
private suspend fun shutdownInternal(force: Boolean) {
108+
private fun shutdownInternal(force: Boolean) {
109109
isShutdownInternal = true
110+
if (isTerminatedInternal.isCompleted) {
111+
return
112+
}
110113
if (force) {
111114
callJobSupervisor.cancelChildren(CancellationException("Channel is shutting down"))
112115
}
113-
// prevent any start() calls on already created jobs
116+
// prevent any start() calls on already created call jobs
117+
callJobSupervisor.children.forEach {
118+
(it as CompletableJob).complete()
119+
}
114120
callJobSupervisor.complete()
115-
cq.shutdown(force)
116-
// wait for child jobs to complete.
117-
// should be immediate, as the completion queue is shutdown.
118-
callJobSupervisor.join()
119-
isTerminatedInternal.complete(Unit)
121+
channelScope.launch {
122+
withContext(NonCancellable) {
123+
// wait for all child jobs to complete.
124+
callJobSupervisor.join()
125+
// wait for the completion queue to shut down.
126+
cq.shutdown(force).await()
127+
if (isTerminatedInternal.complete(Unit)) {
128+
// release the grpc runtime, so it might call grpc_shutdown()
129+
rt.close()
130+
}
131+
}
132+
}
120133
}
121134

122-
123135
override fun <RequestT, ResponseT> newCall(
124136
methodDescriptor: MethodDescriptor<RequestT, ResponseT>,
125137
callOptions: GrpcCallOptions,
126138
): ClientCall<RequestT, ResponseT> {
127139
check(!isShutdown) { "Channel is shutdown" }
128140

129-
val parent = channelScope.coroutineContext[Job]!!
130-
val callJob = Job(parent)
131-
val callScope = CoroutineScope(callJob)
141+
val callJob = Job(callJobSupervisor)
132142

133143
val methodNameSlice = methodDescriptor.getFullMethodName().toGrpcSlice()
134144
val rawCall = grpc_channel_create_call(
@@ -137,7 +147,7 @@ internal class NativeManagedChannel(
137147
) ?: error("Failed to create call")
138148

139149
return NativeClientCall(
140-
cq, rawCall, methodDescriptor, callScope
150+
cq, rawCall, methodDescriptor, callJob
141151
)
142152
}
143153

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt

Lines changed: 45 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,12 @@ package kotlinx.rpc.grpc.internal
88

99
import cnames.structs.grpc_call
1010
import kotlinx.atomicfu.atomic
11+
import kotlinx.atomicfu.locks.SynchronizedObject
12+
import kotlinx.atomicfu.locks.synchronized
1113
import kotlinx.cinterop.*
12-
import kotlinx.coroutines.NonCancellable
13-
import kotlinx.coroutines.suspendCancellableCoroutine
14-
import kotlinx.coroutines.withContext
14+
import kotlinx.coroutines.CompletableDeferred
1515
import libgrpcpp_c.*
1616
import platform.posix.memset
17-
import kotlin.coroutines.Continuation
18-
import kotlin.coroutines.resume
1917
import kotlin.experimental.ExperimentalNativeApi
2018
import kotlin.native.ref.createCleaner
2119

@@ -33,20 +31,24 @@ internal sealed interface BatchResult {
3331
*/
3432
internal class CompletionQueue {
3533

36-
private enum class State { OPEN, SHUTTING_DOWN, CLOSED }
34+
internal enum class State { OPEN, SHUTTING_DOWN, CLOSED }
3735

3836
// if the queue was called with forceShutdown = true,
3937
// it will reject all new batches and wait for all current ones to finish.
4038
private var forceShutdown = false
41-
private val state = atomic(State.OPEN)
4239

4340
// internal as it must be accessible from the SHUTDOWN_CB,
4441
// but it shouldn't be used from outside this file.
4542
@Suppress("PropertyName")
46-
internal val _shutdownDone = kotlinx.coroutines.CompletableDeferred<Unit>()
43+
internal val _state = atomic(State.OPEN)
44+
45+
// internal as it must be accessible from the SHUTDOWN_CB,
46+
// but it shouldn't be used from outside this file.
47+
@Suppress("PropertyName")
48+
internal val _shutdownDone = CompletableDeferred<Unit>()
4749

4850
// lock that synchronizes access to grpc_call_start_batch, so no batch is started while shutdown is being initiated
49-
private val batchStartGuard = atomic(false)
51+
private val batchStartGuard = SynchronizedObject()
5052

5153
private val thisStableRef = StableRef.create(this)
5254

@@ -71,105 +73,75 @@ internal class CompletionQueue {
7173
}
7274

7375
// TODO: Remove this method
74-
suspend fun runBatch(call: NativeClientCall<*, *>, ops: CPointer<grpc_op>, nOps: ULong) =
76+
fun runBatch(call: NativeClientCall<*, *>, ops: CPointer<grpc_op>, nOps: ULong) =
7577
runBatch(call.raw, ops, nOps)
7678

77-
suspend fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong): BatchResult =
78-
suspendCancellableCoroutine<BatchResult> { cont ->
79-
val tag = newCbTag(cont, OPS_COMPLETE_CB)
79+
fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong): CompletableDeferred<BatchResult> {
80+
val completion = CompletableDeferred<BatchResult>()
81+
val tag = newCbTag(completion, OPS_COMPLETE_CB)
8082

81-
var err = grpc_call_error.GRPC_CALL_ERROR
83+
var err = grpc_call_error.GRPC_CALL_ERROR
84+
synchronized(batchStartGuard) {
8285
// synchronizes access to grpc_call_start_batch
83-
withBatchStartLock {
84-
if (forceShutdown || state.value == State.CLOSED) {
85-
// if the queue is either closed or in the process of a FORCE shutdown,
86-
// new batches will instantly fail.
87-
deleteCbTag(tag)
88-
cont.resume(BatchResult.CQShutdown)
89-
return@suspendCancellableCoroutine
90-
}
91-
92-
err = grpc_call_start_batch(call, ops, nOps, tag, null)
93-
}
94-
95-
if (err != grpc_call_error.GRPC_CALL_OK) {
96-
// if the call was not successful, the callback will not be invoked.
86+
if (forceShutdown || _state.value == State.CLOSED) {
87+
// if the queue is either closed or in the process of a FORCE shutdown,
88+
// new batches will instantly fail.
9789
deleteCbTag(tag)
98-
cont.resume(BatchResult.CallError(grpc_call_error.GRPC_CALL_ERROR))
99-
return@suspendCancellableCoroutine
90+
completion.complete(BatchResult.CQShutdown)
91+
return completion
10092
}
10193

94+
err = grpc_call_start_batch(call, ops, nOps, tag, null)
95+
}
10296

103-
cont.invokeOnCancellation {
104-
@Suppress("UnusedExpression")
105-
// keep reference, otherwise the cleaners might get invoked before the batch finishes
106-
this
107-
// cancel the call if one of its batches is canceled.
108-
// grpc_call_cancel is thread-safe and can be called several times.
109-
// the callback is invoked anyway, so the tag doesn't get deleted here.
110-
if (it != null) {
111-
grpc_call_cancel_with_status(
112-
call,
113-
grpc_status_code.GRPC_STATUS_CANCELLED,
114-
"Call got cancelled: ${it.message}",
115-
null
116-
)
117-
} else {
118-
grpc_call_cancel(call, null)
119-
}
120-
}
97+
if (err != grpc_call_error.GRPC_CALL_OK) {
98+
// if the call was not successful, the callback will not be invoked.
99+
deleteCbTag(tag)
100+
completion.complete(BatchResult.CallError(err))
101+
return completion
121102
}
122103

104+
return completion
105+
}
106+
123107
// must not be canceled as it cleans resources and sets the state to CLOSED
124-
suspend fun shutdown(force: Boolean = false) = withContext(NonCancellable) {
108+
fun shutdown(force: Boolean = false): CompletableDeferred<Unit> {
125109
if (force) {
126110
forceShutdown = true
127111
}
128-
if (!state.compareAndSet(State.OPEN, State.SHUTTING_DOWN)) {
112+
if (!_state.compareAndSet(State.OPEN, State.SHUTTING_DOWN)) {
129113
// the first call to shutdown() makes the transition to SHUTTING_DOWN and
130114
// initiates shut down. all other invocations await the shutdown.
131-
_shutdownDone.await()
132-
return@withContext
115+
return _shutdownDone
133116
}
134117

135118
// wait until all batch operations since the state transitions were started.
136-
// this is required to prevent batches from starting after shutdown was initialized
137-
withBatchStartLock { }
138-
139-
grpc_completion_queue_shutdown(raw)
140-
_shutdownDone.await()
141-
state.value = State.CLOSED
142-
}
143-
144-
private inline fun withBatchStartLock(block: () -> Unit) {
145-
try {
146-
// spin until this thread occupies the guard
147-
@Suppress("ControlFlowWithEmptyBody")
148-
while (!batchStartGuard.compareAndSet(expect = false, update = true)) {
149-
}
150-
block()
151-
} finally {
152-
// set guard to "not occupied"
153-
batchStartGuard.value = false
119+
// this is required to prevent batches from starting after shutdown was initialized.
120+
// however, this lock will be available very fast, so it shouldn't be a problem.
121+
synchronized(batchStartGuard) {
122+
grpc_completion_queue_shutdown(raw)
154123
}
124+
125+
return _shutdownDone
155126
}
156127
}
157128

158129
// kq stands for kompletion_queue lol
159130
@CName("kq_ops_complete_cb")
160131
private fun opsCompleteCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
161132
val tag = functor!!.reinterpret<grpc_cb_tag>()
162-
val cont = tag.pointed.user_data!!.asStableRef<Continuation<BatchResult>>().get()
133+
val cont = tag.pointed.user_data!!.asStableRef<CompletableDeferred<BatchResult>>().get()
163134
deleteCbTag(tag)
164-
if (ok != 0) cont.resume(BatchResult.Success)
165-
else cont.resume(BatchResult.ResultError)
135+
if (ok != 0) cont.complete(BatchResult.Success)
136+
else cont.complete(BatchResult.ResultError)
166137
}
167138

168139
@CName("kq_shutdown_cb")
169140
private fun shutdownCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
170141
val tag = functor!!.reinterpret<grpc_cb_tag>()
171142
val cq = tag.pointed.user_data!!.asStableRef<CompletionQueue>().get()
172143
cq._shutdownDone.complete(Unit)
144+
cq._state.value = CompletionQueue.State.CLOSED
173145
grpc_completion_queue_destroy(cq.raw)
174146
}
175147

0 commit comments

Comments
 (0)