Skip to content

Commit d0ac56e

Browse files
authored
Fix call cancellation (#141)
1 parent 1110043 commit d0ac56e

File tree

12 files changed

+117
-66
lines changed

12 files changed

+117
-66
lines changed

core/api/core.api

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,14 @@ public abstract interface class kotlinx/rpc/RPCServer : kotlinx/coroutines/Corou
121121

122122
public abstract interface class kotlinx/rpc/RPCTransport : kotlinx/coroutines/CoroutineScope {
123123
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
124+
public abstract fun receiveCatching-IoAF18A (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
124125
public abstract fun send (Lkotlinx/rpc/RPCTransportMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
125126
}
126127

128+
public final class kotlinx/rpc/RPCTransport$DefaultImpls {
129+
public static fun receiveCatching-IoAF18A (Lkotlinx/rpc/RPCTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
130+
}
131+
127132
public abstract interface class kotlinx/rpc/RPCTransportMessage {
128133
}
129134

core/src/commonMain/kotlin/kotlinx/rpc/RPCTransport.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,13 @@ public interface RPCTransport : CoroutineScope {
4343
* @return received RPC message.
4444
*/
4545
public suspend fun receive(): RPCTransportMessage
46+
47+
/**
48+
* Suspends until next RPC message from a peer endpoint is received and then returns it.
49+
*
50+
* @return received RPC message as a [Result].
51+
*/
52+
public suspend fun receiveCatching(): Result<RPCTransportMessage> {
53+
return runCatching { receive() }
54+
}
4655
}

core/src/commonMain/kotlin/kotlinx/rpc/internal/scopedClientCall.kt

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,17 @@
44

55
package kotlinx.rpc.internal
66

7-
import kotlinx.coroutines.*
7+
import kotlinx.coroutines.CoroutineScope
88

99
/**
10-
* Bounds coroutine scopes of the request and provided RPC service.
10+
* Scopes client RPC call from a service with [serviceScope].
1111
*
1212
* Used by code generators.
1313
*/
1414
@InternalRPCApi
15-
@OptIn(InternalCoroutinesApi::class)
1615
@Suppress("unused")
1716
public suspend inline fun <T> scopedClientCall(serviceScope: CoroutineScope, crossinline body: suspend () -> T): T {
18-
val requestJob = currentCoroutineContext().job
19-
val handle = serviceScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
20-
requestJob.cancel(it as CancellationException)
21-
}
22-
23-
try {
24-
return serviceScoped(serviceScope) {
25-
body()
26-
}
27-
} finally {
28-
handle.dispose()
17+
return serviceScoped(serviceScope) {
18+
body()
2919
}
3020
}

core/src/commonMain/kotlin/kotlinx/rpc/internal/transport/RPCConnector.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class RPCConnector<SubscriptionKey>(
9797
init {
9898
launch {
9999
while (true) {
100-
processMessage(transport.receive())
100+
processMessage(transport.receiveCatching().getOrNull() ?: break)
101101
}
102102
}
103103
}

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ ktor-websockets = { module = "io.ktor:ktor-websockets", version.ref = "ktor" }
5151
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
5252
ktor-server-core = { module = "io.ktor:ktor-server-core", version.ref = "ktor" }
5353
ktor-server-websockets = { module = "io.ktor:ktor-server-websockets", version.ref = "ktor" }
54+
ktor-server-test-host = { module = "io.ktor:ktor-server-test-host", version.ref = "ktor" }
5455
ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }
5556
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
5657
ktor-client-websockets = { module = "io.ktor:ktor-client-websockets", version.ref = "ktor" }
@@ -64,6 +65,7 @@ kotlin-logging = { module = "io.github.oshai:kotlin-logging", version.ref = "kot
6465
kotlin-logging-legacy = { module = "io.github.microutils:kotlin-logging", version.ref = "kotlin-logging" }
6566
coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }
6667
coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "coroutines" }
68+
coroutines-debug = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-debug", version.ref = "coroutines" }
6769
detekt-gradle-plugin = { module = "io.gitlab.arturbosch.detekt:detekt-gradle-plugin", version.ref = "detekt-gradle-plugin" }
6870
kover-gradle-plugin = { module = "org.jetbrains.kotlinx:kover-gradle-plugin", version.ref = "kover" }
6971

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/client/KRPCClient.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import kotlinx.serialization.BinaryFormat
2121
import kotlinx.serialization.SerialFormat
2222
import kotlinx.serialization.StringFormat
2323
import kotlin.coroutines.CoroutineContext
24+
import kotlin.coroutines.cancellation.CancellationException
2425
import kotlin.reflect.typeOf
2526

2627
/**
@@ -204,6 +205,15 @@ public abstract class KRPCClient(
204205
handleOutgoingStreams(it, rpcCall.serialFormat, call.serviceTypeString)
205206
}
206207

208+
val handle = serviceScopeOrNull()?.run {
209+
serviceCoroutineScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) { cause ->
210+
// service can only be canceled, it cannot complete successfully
211+
callResult.completeExceptionally(CancellationException(cause))
212+
213+
rpcCall.streamContext.valueOrNull?.cancel("Service cancelled", cause)
214+
}
215+
}
216+
207217
callResult.invokeOnCompletion { cause ->
208218
if (cause != null) {
209219
cancellingRequests[rpcCall.callId] = call.serviceTypeString
@@ -213,14 +223,20 @@ public abstract class KRPCClient(
213223
if (!wrappedCallResult.callExceptionOccurred) {
214224
sendCancellation(CancellationType.REQUEST, call.serviceId.toString(), rpcCall.callId)
215225
}
226+
227+
handle?.dispose()
216228
} else {
217229
val streamScope = rpcCall.streamContext.valueOrNull?.streamScope
218230

219231
if (streamScope == null) {
232+
handle?.dispose()
233+
220234
connector.unsubscribeFromMessages(call.serviceTypeString, rpcCall.callId)
221235
}
222236

223237
streamScope?.onScopeCompletion(rpcCall.callId) {
238+
handle?.dispose()
239+
224240
cancellingRequests[rpcCall.callId] = call.serviceTypeString
225241

226242
sendCancellation(CancellationType.REQUEST, call.serviceId.toString(), rpcCall.callId)

krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ kotlin {
2929

3030
implementation(libs.kotlin.test)
3131
implementation(libs.ktor.server.netty)
32-
implementation(libs.ktor.client.cio)
32+
implementation(libs.ktor.server.test.host)
3333
}
3434
}
3535
}

krpc/krpc-ktor/krpc-ktor-core/src/commonMain/kotlin/kotlinx/rpc/transport/ktor/KtorTransport.kt

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,15 @@
55
package kotlinx.rpc.transport.ktor
66

77
import io.ktor.websocket.*
8-
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CoroutineScope
99
import kotlinx.rpc.RPCTransport
1010
import kotlinx.rpc.RPCTransportMessage
1111
import kotlinx.rpc.internal.InternalRPCApi
12-
import kotlin.coroutines.CoroutineContext
1312

1413
@InternalRPCApi
15-
@OptIn(InternalCoroutinesApi::class, DelicateCoroutinesApi::class)
16-
public class KtorTransport(private val webSocketSession: WebSocketSession): RPCTransport {
17-
// Transport job should always be cancelled and never closed
18-
private val transportJob = Job()
19-
20-
override val coroutineContext: CoroutineContext = webSocketSession.coroutineContext + transportJob
21-
22-
init {
23-
// Close the socket when the transport job is cancelled manually
24-
transportJob.invokeOnCompletion(onCancelling = true) {
25-
if (!webSocketSession.coroutineContext.isActive) return@invokeOnCompletion
26-
GlobalScope.launch {
27-
webSocketSession.close()
28-
}
29-
}
30-
}
14+
public class KtorTransport(
15+
private val webSocketSession: WebSocketSession,
16+
) : RPCTransport, CoroutineScope by webSocketSession {
3117

3218
/**
3319
* Sends a single encoded RPC message over network (or any other medium) to a peer endpoint.

krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/transport/ktor/KtorTransportTest.kt

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,9 @@
66

77
package kotlinx.rpc.transport.ktor
88

9-
import io.ktor.client.*
10-
import io.ktor.client.plugins.websocket.*
119
import io.ktor.server.application.*
12-
import io.ktor.server.engine.*
13-
import io.ktor.server.netty.*
14-
import io.ktor.server.routing.*
10+
import io.ktor.server.testing.*
1511
import kotlinx.coroutines.cancel
16-
import kotlinx.coroutines.runBlocking
1712
import kotlinx.rpc.RPC
1813
import kotlinx.rpc.client.withService
1914
import kotlinx.rpc.serialization.json
@@ -42,44 +37,37 @@ class NewServiceImpl(
4237

4338
class KtorTransportTest {
4439
@Test
45-
fun testEcho() = runBlocking {
46-
val server = embeddedServer(Netty, port = 4242) {
47-
install(RPC)
48-
routing {
49-
rpc("/rpc") {
50-
rpcConfig {
51-
serialization {
52-
json {
53-
ignoreUnknownKeys = true
54-
}
40+
fun testEcho() = testApplication {
41+
install(RPC)
42+
routing {
43+
rpc("/rpc") {
44+
rpcConfig {
45+
serialization {
46+
json {
47+
ignoreUnknownKeys = true
5548
}
56-
57-
waitForServices = true
5849
}
5950

60-
registerService<NewService> { NewServiceImpl(it, call) }
51+
waitForServices = true
6152
}
62-
}
63-
}.start()
6453

65-
val clientWithGlobalConfig = HttpClient {
66-
install(WebSockets) {
67-
maxFrameSize = Int.MAX_VALUE.toLong() - 42
54+
registerService<NewService> { NewServiceImpl(it, call) }
6855
}
69-
install(kotlinx.rpc.transport.ktor.client.RPC) {
56+
}
57+
58+
val clientWithGlobalConfig = createClient {
59+
installRPC {
7060
serialization {
7161
json()
7262
}
7363
}
7464
}
7565

7666
val ktorRPCClient = clientWithGlobalConfig
77-
.rpc("ws://localhost:4242/rpc") {
67+
.rpc("/rpc") {
7868
headers["TestHeader"] = "test-header"
7969
}
8070

81-
assertEquals(Int.MAX_VALUE.toLong() - 42, ktorRPCClient.webSocketSession.maxFrameSize)
82-
8371
val serviceWithGlobalConfig = ktorRPCClient.withService<NewService>()
8472

8573
val firstActual = serviceWithGlobalConfig.echo("Hello, world!")
@@ -89,11 +77,11 @@ class KtorTransportTest {
8977
serviceWithGlobalConfig.cancel()
9078
clientWithGlobalConfig.cancel()
9179

92-
val clientWithNoConfig = HttpClient {
80+
val clientWithNoConfig = createClient {
9381
installRPC()
9482
}
9583

96-
val serviceWithLocalConfig = clientWithNoConfig.rpc("ws://localhost:4242/rpc") {
84+
val serviceWithLocalConfig = clientWithNoConfig.rpc("/rpc") {
9785
headers["TestHeader"] = "test-header"
9886

9987
rpcConfig {
@@ -109,7 +97,5 @@ class KtorTransportTest {
10997

11098
serviceWithLocalConfig.cancel()
11199
clientWithNoConfig.cancel()
112-
113-
server.stop()
114100
}
115101
}

krpc/krpc-test/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ kotlin {
4646
implementation(libs.logback.classic)
4747

4848
implementation(libs.coroutines.test)
49+
implementation(libs.coroutines.debug)
4950
implementation(libs.kotlin.reflect)
5051
}
5152
}

0 commit comments

Comments
 (0)