44
55package kotlinx.rpc.grpc.internal
66
7+ import kotlinx.atomicfu.atomic
78import kotlinx.coroutines.CancellationException
89import kotlinx.coroutines.CoroutineScope
910import kotlinx.coroutines.cancel
@@ -21,8 +22,6 @@ import kotlinx.rpc.grpc.StatusCode
2122import kotlinx.rpc.grpc.StatusException
2223import kotlinx.rpc.grpc.StatusRuntimeException
2324import kotlinx.rpc.internal.utils.InternalRpcApi
24- import kotlin.concurrent.atomics.AtomicBoolean
25- import kotlin.concurrent.atomics.ExperimentalAtomicApi
2625
2726@InternalRpcApi
2827public fun <Request , Response > CoroutineScope.unaryServerMethodDefinition (
@@ -105,7 +104,6 @@ private fun <Request, Response> CoroutineScope.serverCallHandler(
105104 serverCallListenerImpl(call, implementation)
106105 }
107106
108- @OptIn(ExperimentalAtomicApi ::class )
109107private fun <Request , Response > CoroutineScope.serverCallListenerImpl (
110108 handler : ServerCall <Request , Response >,
111109 implementation : (Flow <Request >) -> Flow <Response >,
@@ -116,7 +114,7 @@ private fun <Request, Response> CoroutineScope.serverCallListenerImpl(
116114 val requestsStarted = AtomicBoolean (false ) // enforces read-once
117115
118116 val requests = flow {
119- check(requestsStarted.compareAndSet(expectedValue = false , newValue = true )) {
117+ check(requestsStarted.value. compareAndSet(expect = false , update = true )) {
120118 " requests flow can only be collected once"
121119 }
122120
@@ -141,7 +139,7 @@ private fun <Request, Response> CoroutineScope.serverCallListenerImpl(
141139 val failure = runCatching {
142140 implementation(requests).collect {
143141 // once we have a response message, check if we've sent headers yet - if not, do so
144- if (headersSent.compareAndSet(expectedValue = false , newValue = true )) {
142+ if (headersSent.value. compareAndSet(expect = false , update = true )) {
145143 mutex.withLock {
146144 handler.sendHeaders(GrpcTrailers ())
147145 }
@@ -152,7 +150,7 @@ private fun <Request, Response> CoroutineScope.serverCallListenerImpl(
152150 }.exceptionOrNull()
153151 // check headers again once we're done collecting the response flow - if we received
154152 // no elements or threw an exception, then we wouldn't have sent them
155- if (failure == null && headersSent.compareAndSet(expectedValue = false , newValue = true )) {
153+ if (failure == null && headersSent.value. compareAndSet(expect = false , update = true )) {
156154 mutex.withLock {
157155 handler.sendHeaders(GrpcTrailers ())
158156 }
@@ -217,6 +215,10 @@ private fun <Request, Response> CoroutineScope.serverCallListenerImpl(
217215 )
218216}
219217
218+ private class AtomicBoolean (initialValue : Boolean ) {
219+ val value = atomic(initialValue)
220+ }
221+
220222private class ServerCallListenerState {
221223 var isReceiving = true
222224}
0 commit comments