Skip to content

Commit c9c0e1c

Browse files
committed
grpc-native: Address PR comments
Signed-off-by: Johannes Zottele <[email protected]>
1 parent 81f26ea commit c9c0e1c

File tree

4 files changed

+31
-29
lines changed

4 files changed

+31
-29
lines changed

cinterop-c/include/kgrpc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ bool kgrpc_iomgr_run_in_background();
5757
*
5858
* Wraps the internal C++ API `Server::SetRegisteredMethodAllocator()` to enable
5959
* callback-driven method dispatch via the Core C API.
60+
* If the C++ API is exposed to the C API, this can be removed (https://github.com/grpc/grpc/issues/40632).
6061
*
6162
* When the gRPC Core needs to accept a new call for the specified method, it invokes:
6263
* kgrpc_registered_call_allocation alloc = allocator();

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.rpc.grpc.test
66
import kotlinx.coroutines.CompletableDeferred
77
import kotlinx.coroutines.delay
88
import kotlinx.coroutines.runBlocking
9+
import kotlinx.coroutines.test.runTest
910
import kotlinx.coroutines.withTimeout
1011
import kotlinx.rpc.grpc.GrpcServer
1112
import kotlinx.rpc.grpc.GrpcTrailers
@@ -270,21 +271,20 @@ class GreeterServiceImpl : GreeterService {
270271
* Run this on JVM before executing tests.
271272
*/
272273
@Test
273-
fun runServer() {
274-
runBlocking {
275-
val server = GrpcServer(
276-
port = PORT,
277-
builder = { registerService<GreeterService> { GreeterServiceImpl() } }
278-
)
274+
fun runServer() = runTest {
275+
val server = GrpcServer(
276+
port = PORT,
277+
builder = { registerService<GreeterService> { GreeterServiceImpl() } }
278+
)
279+
280+
try {
281+
server.start()
282+
println("Server started")
283+
server.awaitTermination()
284+
} finally {
285+
server.shutdown()
286+
server.awaitTermination()
279287

280-
try {
281-
server.start()
282-
println("Server started")
283-
server.awaitTermination()
284-
} finally {
285-
server.shutdown()
286-
server.awaitTermination()
287-
}
288288
}
289289
}
290290

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServer.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ internal class NativeServer(
7070

7171
private val cq = CompletionQueue()
7272

73-
val raw: CPointer<grpc_server> = grpc_server_create(null, null)!!
73+
val raw: CPointer<grpc_server> = grpc_server_create(null, null)
74+
?: error("Failed to create server")
7475

7576
// holds all stable references to MethodAllocationCtx objects.
7677
// the stable references must eventually be disposed.

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/internal/NativeServerCall.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ internal class NativeServerCall<Request, Response>(
5959
grpc_call_unref(it)
6060
}
6161

62-
private var listener = DeferredCallListener<Request>()
62+
private val listener = DeferredCallListener<Request>()
6363
private var methodDescriptor: MethodDescriptor<Request, Response>? = null
64-
private var callbackMutex = ReentrantLock()
64+
private val callbackMutex = ReentrantLock()
6565
private var initialized = false
6666
private var cancelled = false
67-
private var finalized = atomic(false)
67+
private val finalized = atomic(false)
6868

6969
// Tracks whether at least one request message has been received on this call.
7070
private var receivedFirstMessage = false
@@ -190,7 +190,7 @@ internal class NativeServerCall<Request, Response>(
190190

191191
override fun request(numMessages: Int) {
192192
check(initialized) { internalError("Call not initialized") }
193-
// TODO: Remove the num constraint
193+
// TODO: Remove the num constraint (KRPC-213)
194194
require(numMessages == 1) { internalError("numMessages must be 1") }
195195
val methodDescriptor = checkNotNull(methodDescriptor) { internalError("Method descriptor not set") }
196196

@@ -320,34 +320,34 @@ private class DeferredCallListener<T> : ServerCall.Listener<T>() {
320320
@Volatile
321321
private var delegate: ServerCall.Listener<T>? = null
322322
private val mutex = ReentrantLock()
323-
private val q = ArrayDeque<(ServerCall.Listener<T>) -> Unit>()
323+
private val queue = ArrayDeque<(ServerCall.Listener<T>) -> Unit>()
324324

325325
fun setDelegate(d: ServerCall.Listener<T>) {
326326
mutex.withLock {
327327
if (delegate != null) return
328328
delegate = d
329329
}
330330
// drain the queue
331-
q.forEach { it(d) }
332-
q.clear()
331+
queue.forEach { it(d) }
332+
queue.clear()
333333
}
334334

335-
private inline fun deliver(crossinline f: (ServerCall.Listener<T>) -> Unit) {
336-
val d = delegate
337-
if (d != null) {
335+
private inline fun deliver(crossinline invokeListener: (ServerCall.Listener<T>) -> Unit) {
336+
val currentDelegate = delegate
337+
if (currentDelegate != null) {
338338
// fast path (delegate is already set)
339-
f(d); return
339+
invokeListener(currentDelegate); return
340340
}
341341
// slow path: re-check under lock
342-
val dd = mutex.withLock {
342+
val safeCurrentDelegate = mutex.withLock {
343343
val cur = delegate
344344
if (cur == null) {
345-
q.addLast { f(it) }
345+
queue.addLast { invokeListener(it) }
346346
null
347347
} else cur
348348
}
349349
// if the delegate was already set, call it
350-
if (dd != null) f(dd)
350+
if (safeCurrentDelegate != null) invokeListener(safeCurrentDelegate)
351351
}
352352

353353
override fun onMessage(message: T) = deliver { it.onMessage(message) }

0 commit comments

Comments
 (0)