Skip to content

Commit f86edae

Browse files
committed
Fixed CancellationTest
1 parent 78fe17d commit f86edae

File tree

4 files changed

+13
-18
lines changed

4 files changed

+13
-18
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.CancellationException
99
import kotlinx.coroutines.CompletableDeferred
1010
import kotlinx.coroutines.CoroutineName
1111
import kotlinx.coroutines.CoroutineScope
12-
import kotlinx.coroutines.InternalCoroutinesApi
1312
import kotlinx.coroutines.SupervisorJob
1413
import kotlinx.coroutines.cancel
1514
import kotlinx.coroutines.cancelAndJoin
@@ -77,7 +76,6 @@ public abstract class InitializedKrpcClient(
7776
* serializing data, tracking streams, processing exceptions, and other protocol responsibilities.
7877
* Leaves out the delivery of encoded messages to the specific implementations with [KrpcTransport].
7978
*/
80-
@OptIn(InternalCoroutinesApi::class)
8179
public abstract class KrpcClient : RpcClient, KrpcEndpoint {
8280
/**
8381
* Called once to provide [KrpcTransport] for this client.
@@ -152,25 +150,22 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
152150

153151
val context = SupervisorJob(transport.coroutineContext.job)
154152

155-
context.job.invokeOnCompletion(onCancelling = true) {
156-
if (clientCancelled) {
157-
return@invokeOnCompletion
158-
}
159-
160-
clientCancelled = true
161-
153+
context.job.invokeOnCompletion {
162154
try {
163-
if (!clientCancelledByServer) {
155+
if (!clientCancelled && !clientCancelledByServer) {
164156
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
165157
}
166158

167-
requestChannels.values.forEach {
168-
it.close(CancellationException("Client cancelled"))
169-
it.cancel()
170-
}
159+
clientCancelled = true
171160
} catch (_ : Exception) {
172161
// ignore, we are already cancelled
173162
} finally {
163+
requestChannels.values.forEach {
164+
val cause = CancellationException("Client cancelled")
165+
it.close(cause)
166+
it.cancel(cause)
167+
}
168+
174169
requestChannels.clear()
175170
}
176171
}

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import kotlin.reflect.KClass
3131
* @param transport [KrpcTransport] instance that will be used to send and receive RPC messages.
3232
* IMPORTANT: Must be exclusive to this server, otherwise unexpected behavior may occur.
3333
*/
34-
@OptIn(InternalCoroutinesApi::class)
3534
public abstract class KrpcServer(
3635
private val config: KrpcConfig.Server,
3736
transport: KrpcTransport,
@@ -87,7 +86,7 @@ public abstract class KrpcServer(
8786
private var cancelledByClient = false
8887

8988
init {
90-
internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
89+
internalScope.coroutineContext.job.invokeOnCompletion {
9190
if (!cancelledByClient) {
9291
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
9392
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ class CancellationServiceImpl : CancellationService {
112112

113113
override suspend fun outgoingStreamWithDelayedResponse(stream: Flow<Int>) {
114114
try {
115+
waitCounter.increment()
115116
consume(stream)
116117

117118
fence.await()

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class CancellationTest {
171171
}
172172

173173
val clientFlowJob = launch {
174-
service.outgoingStream(flow {
174+
service.outgoingStreamWithDelayedResponse(flow {
175175
emit(0)
176176
println("[testCancelClient] emit 0")
177177
serverInstance().fence.await()
@@ -241,7 +241,7 @@ class CancellationTest {
241241
}
242242

243243
val clientFlowJob = launch {
244-
service.outgoingStream(flow {
244+
service.outgoingStreamWithDelayedResponse(flow {
245245
emit(0)
246246
println("[testCancelServer] emit 0")
247247
serverInstance().fence.await()

0 commit comments

Comments
 (0)