Skip to content

Commit 265c57b

Browse files
committed
grpc: Fix race condition bug
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 6327c89 commit 265c57b

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/CallbackFuture.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,6 @@ internal class CallbackFuture<T : Any> {
4646
}
4747
}
4848
}
49+
50+
val isCompleted: Boolean get() = state.value is State.Done
4951
}

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/CompletionQueue.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ internal class CompletionQueue {
119119
* See [BatchResult] for possible outcomes.
120120
*/
121121
fun runBatch(call: CPointer<grpc_call>, ops: CPointer<grpc_op>, nOps: ULong): BatchResult {
122+
if (_shutdownDone.isCompleted) return BatchResult.CQShutdown
123+
122124
val completion = CallbackFuture<Boolean>()
123125
val tag = newCbTag(completion, OPS_COMPLETE_CB)
124126

@@ -194,8 +196,8 @@ private fun opsCompleteCb(functor: CPointer<grpc_completion_queue_functor>?, ok:
194196
private fun shutdownCb(functor: CPointer<grpc_completion_queue_functor>?, ok: Int) {
195197
val tag = functor!!.reinterpret<kgrpc_cb_tag>()
196198
val cq = tag.pointed.user_data!!.asStableRef<CompletionQueue>().get()
197-
cq._shutdownDone.complete(Unit)
198199
cq._state.value = CompletionQueue.State.CLOSED
200+
cq._shutdownDone.complete(Unit)
199201
grpc_completion_queue_destroy(cq.raw)
200202
}
201203

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ internal class NativeServerCall<Request, Response>(
6767
private val callbackMutex = ReentrantLock()
6868
private var initialized = false
6969
private var cancelled = false
70+
private var closed = false
7071
private val finalized = atomic(false)
7172

7273
// tracks whether the initial metadata has been sent.
@@ -143,6 +144,7 @@ internal class NativeServerCall<Request, Response>(
143144
}
144145

145146
fun cancel(status: grpc_status_code, message: String) {
147+
cancelled = true
146148
grpc_call_cancel_with_status(raw, status, message, null)
147149
}
148150

@@ -168,6 +170,9 @@ internal class NativeServerCall<Request, Response>(
168170
cleanup: () -> Unit = {},
169171
onSuccess: () -> Unit = {},
170172
) {
173+
// if we are already closed, we cannot run any more batches.
174+
if (closed || cancelled) return cleanup()
175+
171176
when (val result = cq.runBatch(raw, ops, nOps)) {
172177
is BatchResult.Submitted -> {
173178
result.future.onComplete {
@@ -286,6 +291,8 @@ internal class NativeServerCall<Request, Response>(
286291

287292
override fun close(status: Status, trailers: GrpcMetadata) {
288293
check(initialized) { internalError("Call not initialized") }
294+
closed = true
295+
289296
val arena = Arena()
290297

291298
val details = status.getDescription()?.toGrpcSlice()
@@ -327,7 +334,7 @@ internal class NativeServerCall<Request, Response>(
327334
}
328335

329336

330-
private fun <T> tryRun(block: () -> T): T {
337+
private inline fun <T> tryRun(crossinline block: () -> T): T {
331338
try {
332339
return block()
333340
} catch (e: Throwable) {

0 commit comments

Comments
 (0)