diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt index ea0d015ec..64f6ee4aa 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.kt @@ -40,7 +40,7 @@ public interface ManagedChannel { * * @return whether the channel is terminated, as would be done by [isTerminated]. */ - public suspend fun awaitTermination(duration: Duration): Boolean + public suspend fun awaitTermination(duration: Duration = Duration.INFINITE): Boolean /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately canceled. diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt index 20317e199..52fc05c80 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendClientCalls.kt @@ -159,7 +159,7 @@ private fun rpcImpl( * there is room in the buffer. */ val responses = Channel(1) - val ready = Ready() + val ready = Ready { handler.isReady() } handler.start(channelResponseListener(responses, ready), headers) @@ -200,7 +200,7 @@ private fun rpcImpl( private fun channelResponseListener( responses: Channel, ready: Ready, -) = clientCallListener( +) = clientCallListener( onHeaders = { // todo check what happens here }, @@ -226,7 +226,7 @@ private fun channelResponseListener( // todo really needed? internal fun Flow.singleOrStatusFlow( expected: String, - descriptor: Any + descriptor: Any, ): Flow = flow { var found = false collect { @@ -252,16 +252,22 @@ internal suspend fun Flow.singleOrStatus( descriptor: Any ): T = singleOrStatusFlow(expected, descriptor).single() -internal class Ready { +internal class Ready(private val isReallyReady: () -> Boolean) { // A CONFLATED channel never suspends to send, and two notifications of readiness are equivalent // to one private val channel = Channel(Channel.CONFLATED) fun onReady() { - channel.trySend(Unit) + channel.trySend(Unit).onFailure { e -> + throw e ?: AssertionError( + "Should be impossible; a CONFLATED channel should never return false on offer" + ) + } } suspend fun suspendUntilReady() { - channel.receive() + while (!isReallyReady()) { + channel.receive() + } } } diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendServerCalls.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendServerCalls.kt index 65ed6549a..2f2af9ddd 100644 --- a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendServerCalls.kt +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/internal/suspendServerCalls.kt @@ -72,8 +72,10 @@ public fun CoroutineScope.serverStreamingServerMethodDefinit flow { requests .singleOrStatusFlow("request", descriptor) - .collect { req -> - implementation(req).collect { resp -> emit(resp) } + .collect { request -> + implementation(request).collect { response -> + emit(response) + } } } } @@ -108,7 +110,7 @@ private fun CoroutineScope.serverCallListenerImpl( handler: ServerCall, implementation: (Flow) -> Flow, ): ServerCall.Listener { - val readiness = Ready() + val ready = Ready { handler.isReady()} val requestsChannel = Channel(1) val requestsStarted = AtomicBoolean(false) // enforces read-once @@ -118,8 +120,8 @@ private fun CoroutineScope.serverCallListenerImpl( "requests flow can only be collected once" } - handler.request(1) try { + handler.request(1) for (request in requestsChannel) { emit(request) handler.request(1) @@ -144,8 +146,10 @@ private fun CoroutineScope.serverCallListenerImpl( handler.sendHeaders(GrpcTrailers()) } } - readiness.suspendUntilReady() - mutex.withLock { handler.sendMessage(it) } + ready.suspendUntilReady() + mutex.withLock { + handler.sendMessage(it) + } } }.exceptionOrNull() // check headers again once we're done collecting the response flow - if we received @@ -180,7 +184,9 @@ private fun CoroutineScope.serverCallListenerImpl( } } ?: GrpcTrailers() - mutex.withLock { handler.close(closeStatus, trailers) } + mutex.withLock { + handler.close(closeStatus, trailers) + } } return serverCallListener( @@ -209,7 +215,7 @@ private fun CoroutineScope.serverCallListenerImpl( requestsChannel.close() }, onReady = { - readiness.onReady() + ready.onReady() }, onComplete = {} ) diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/RawClientServerTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/RawClientServerTest.kt index 5f1008b86..fed9d92a9 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/RawClientServerTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/RawClientServerTest.kt @@ -5,6 +5,8 @@ package kotlinx.rpc.grpc import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList @@ -98,6 +100,9 @@ class RawClientServerTest { methodDefinition: CoroutineScope.(MethodDescriptor) -> ServerMethodDefinition, block: suspend (GrpcChannel, MethodDescriptor) -> Unit, ) = kotlinx.coroutines.test.runTest { + val serverJob = Job() + val serverScope = CoroutineScope(serverJob) + val clientChannel = ManagedChannelBuilder("localhost", PORT).apply { usePlaintext() }.buildChannel() @@ -122,13 +127,19 @@ class RawClientServerTest { methods = methods, schemaDescriptor = Unit, ), - methods = methods.map { methodDefinition(it) }, + methods = methods.map { serverScope.methodDefinition(it) }, ) ) val server = Server(builder) server.start() block(clientChannel.platformApi, descriptor) + + serverJob.cancelAndJoin() + clientChannel.shutdown() + clientChannel.awaitTermination() + server.shutdown() + server.awaitTermination() } companion object {