Skip to content

Commit 5032675

Browse files
committed
Fixed cancellation tests
1 parent e713b37 commit 5032675

File tree

14 files changed

+292
-109
lines changed

14 files changed

+292
-109
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,27 @@
55
package kotlinx.rpc.krpc.client
66

77
import kotlinx.atomicfu.atomic
8-
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CancellationException
9+
import kotlinx.coroutines.CompletableDeferred
10+
import kotlinx.coroutines.CoroutineName
11+
import kotlinx.coroutines.CoroutineScope
12+
import kotlinx.coroutines.DelicateCoroutinesApi
13+
import kotlinx.coroutines.GlobalScope
14+
import kotlinx.coroutines.InternalCoroutinesApi
15+
import kotlinx.coroutines.SupervisorJob
16+
import kotlinx.coroutines.cancel
17+
import kotlinx.coroutines.cancelAndJoin
918
import kotlinx.coroutines.channels.Channel
19+
import kotlinx.coroutines.coroutineScope
20+
import kotlinx.coroutines.currentCoroutineContext
21+
import kotlinx.coroutines.ensureActive
1022
import kotlinx.coroutines.flow.Flow
1123
import kotlinx.coroutines.flow.FlowCollector
1224
import kotlinx.coroutines.flow.first
1325
import kotlinx.coroutines.flow.flow
26+
import kotlinx.coroutines.job
27+
import kotlinx.coroutines.launch
28+
import kotlinx.coroutines.supervisorScope
1429
import kotlinx.coroutines.sync.Mutex
1530
import kotlinx.coroutines.sync.withLock
1631
import kotlinx.rpc.RpcCall
@@ -21,7 +36,8 @@ import kotlinx.rpc.descriptor.RpcInvokator
2136
import kotlinx.rpc.internal.utils.InternalRpcApi
2237
import kotlinx.rpc.internal.utils.getOrNull
2338
import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap
24-
import kotlinx.rpc.krpc.*
39+
import kotlinx.rpc.krpc.KrpcConfig
40+
import kotlinx.rpc.krpc.KrpcTransport
2541
import kotlinx.rpc.krpc.client.internal.ClientStreamContext
2642
import kotlinx.rpc.krpc.client.internal.ClientStreamSerializer
2743
import kotlinx.rpc.krpc.client.internal.KrpcClientConnector
@@ -35,7 +51,6 @@ import kotlinx.serialization.StringFormat
3551
import kotlinx.serialization.modules.SerializersModule
3652
import kotlin.collections.first
3753
import kotlin.concurrent.Volatile
38-
import kotlin.coroutines.cancellation.CancellationException
3954
import kotlin.properties.Delegates
4055

4156
/**
@@ -343,17 +358,8 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
343358
}
344359

345360
is KrpcCallMessage.CallException -> {
346-
val cause = runCatching {
347-
message.cause.deserialize()
348-
}
349-
350-
val result = if (cause.isFailure) {
351-
cause.exceptionOrNull()!!
352-
} else {
353-
cause.getOrNull()!!
354-
}
355-
356-
channel.close(result)
361+
val cause = message.cause.deserialize()
362+
channel.close(cause)
357363
}
358364

359365
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
@@ -458,6 +464,21 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
458464
serviceTypeString = serviceTypeString,
459465
)
460466
} catch (e: CancellationException) {
467+
currentCoroutineContext().ensureActive()
468+
469+
val wrapped = ManualCancellationException(e)
470+
val serializedReason = serializeException(wrapped)
471+
val message = KrpcCallMessage.StreamCancel(
472+
callId = outgoingStream.callId,
473+
serviceType = serviceTypeString,
474+
streamId = outgoingStream.streamId,
475+
cause = serializedReason,
476+
connectionId = outgoingStream.connectionId,
477+
serviceId = outgoingStream.serviceId,
478+
)
479+
sender.sendMessage(message)
480+
481+
// stop the flow and its coroutine, other flows are not affected
461482
throw e
462483
} catch (@Suppress("detekt.TooGenericExceptionCaught") cause: Throwable) {
463484
val serializedReason = serializeException(cause)

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.kt

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.rpc.krpc.internal
66

7+
import kotlinx.coroutines.CancellationException
78
import kotlinx.rpc.internal.rpcInternalTypeName
89
import kotlinx.rpc.internal.utils.InternalRpcApi
910

@@ -19,8 +20,46 @@ public fun serializeException(cause: Throwable): SerializedException {
1920

2021
internal expect fun Throwable.stackElements(): List<StackElement>
2122

23+
internal expect fun SerializedException.deserializeUnsafe(): Throwable
24+
25+
internal fun SerializedException.nonJvmManualCancellationExceptionDeserialize(): ManualCancellationException? {
26+
if (className == ManualCancellationException::class.rpcInternalTypeName) {
27+
val cancellation = cause?.deserializeUnsafe()
28+
?: error("ManualCancellationException must have a cause")
29+
30+
return ManualCancellationException(
31+
CancellationException(
32+
message = cancellation.message,
33+
cause = cancellation.cause,
34+
)
35+
)
36+
}
37+
38+
return null
39+
}
40+
41+
@InternalRpcApi
42+
public fun SerializedException.deserialize(): Throwable {
43+
val cause = runCatching {
44+
deserializeUnsafe()
45+
}
46+
47+
val result = if (cause.isFailure) {
48+
cause.exceptionOrNull()!!
49+
} else {
50+
val ex = cause.getOrNull()!!
51+
if (ex is ManualCancellationException) {
52+
ex.cause
53+
} else {
54+
ex
55+
}
56+
}
57+
58+
return result
59+
}
60+
2261
@InternalRpcApi
23-
public expect fun SerializedException.deserialize(): Throwable
62+
public class ManualCancellationException(override val cause: CancellationException): RuntimeException()
2463

2564
internal expect class DeserializedException(
2665
toStringMessage: String,

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,11 @@ internal class KrpcActingReceiveHandler(
182182

183183
override fun close(key: HandlerKey<*>, e: Throwable?) {
184184
if (e != null) {
185-
job.cancel(CancellationException(null, e))
185+
if (e is CancellationException) {
186+
job.cancel(e)
187+
} else {
188+
job.cancel(CancellationException(null, e))
189+
}
186190
} else {
187191
job.cancel()
188192
}
@@ -191,24 +195,14 @@ internal class KrpcActingReceiveHandler(
191195
}
192196

193197
private suspend fun tryHandle(message: KrpcMessage): HandlerResult {
194-
val result = runCatching {
198+
try {
195199
callHandler(message)
196-
}
197200

198-
return when {
199-
result.isFailure -> {
200-
val exception = result.exceptionOrNull()
201-
202-
if (exception is CancellationException) {
203-
throw exception
204-
}
205-
206-
HandlerResult.Failure(exception)
207-
}
208-
209-
else -> {
210-
HandlerResult.Success
211-
}
201+
return HandlerResult.Success
202+
} catch (e: CancellationException) {
203+
throw e
204+
} catch (e: Throwable) {
205+
return HandlerResult.Failure(e)
212206
}
213207
}
214208

krpc/krpc-core/src/jsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.js.kt

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("detekt.MatchingDeclarationName")
66

77
package kotlinx.rpc.krpc.internal
88

9-
import kotlinx.rpc.internal.utils.InternalRpcApi
10-
119
internal actual class DeserializedException actual constructor(
1210
private val toStringMessage: String,
1311
actual override val message: String,
@@ -24,7 +22,7 @@ internal actual class DeserializedException actual constructor(
2422

2523
internal actual fun Throwable.stackElements(): List<StackElement> = emptyList()
2624

27-
@InternalRpcApi
28-
public actual fun SerializedException.deserialize(): Throwable {
29-
return DeserializedException(toStringMessage, message, stacktrace, cause, className)
25+
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
26+
return nonJvmManualCancellationExceptionDeserialize()
27+
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
3028
}

krpc/krpc-core/src/jvmMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.jvm.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.krpc.internal
66

7-
import kotlinx.rpc.internal.utils.InternalRpcApi
87
import java.lang.reflect.Constructor
98
import java.lang.reflect.Modifier
109

@@ -37,8 +36,7 @@ internal actual fun Throwable.stackElements(): List<StackElement> = stackTrace.m
3736
)
3837
}
3938

40-
@InternalRpcApi
41-
public actual fun SerializedException.deserialize(): Throwable {
39+
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
4240
try {
4341
val clazz = Class.forName(className)
4442
val fieldsCount = clazz.fieldsCountOrDefault(throwableFields)

krpc/krpc-core/src/nativeMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.native.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("detekt.MatchingDeclarationName")
@@ -21,6 +21,7 @@ internal actual class DeserializedException actual constructor(
2121

2222
internal actual fun Throwable.stackElements(): List<StackElement> = emptyList()
2323

24-
public actual fun SerializedException.deserialize(): Throwable {
25-
return DeserializedException(toStringMessage, message, stacktrace, cause, className)
24+
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
25+
return nonJvmManualCancellationExceptionDeserialize()
26+
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
2627
}

krpc/krpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasm.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("detekt.MatchingDeclarationName")
@@ -24,7 +24,7 @@ internal actual class DeserializedException actual constructor(
2424

2525
internal actual fun Throwable.stackElements(): List<StackElement> = emptyList()
2626

27-
@InternalRpcApi
28-
public actual fun SerializedException.deserialize(): Throwable {
29-
return DeserializedException(toStringMessage, message, stacktrace, cause, className)
27+
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
28+
return nonJvmManualCancellationExceptionDeserialize()
29+
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
3030
}

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public abstract class KrpcServer(
133133
}
134134
}
135135

136-
override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass<Service>) {
136+
final override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass<Service>) {
137137
connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName)
138138
rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)
139139
}

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerConnector.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import kotlinx.serialization.SerialFormat
1111

1212
internal class KrpcServerConnector private constructor(
1313
private val connector: KrpcConnector,
14-
): KrpcMessageSender by connector {
14+
) : KrpcMessageSender by connector {
1515
constructor(
1616
serialFormat: SerialFormat,
1717
transport: KrpcTransport,

0 commit comments

Comments
 (0)