Skip to content

Commit 411f6fb

Browse files
committed
Fix "Channel was closed" exceptions
1 parent 96be806 commit 411f6fb

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Job
1010
import kotlinx.coroutines.cancel
1111
import kotlinx.coroutines.channels.Channel
12+
import kotlinx.coroutines.channels.ClosedSendChannelException
1213
import kotlinx.coroutines.delay
14+
import kotlinx.coroutines.job
1315
import kotlinx.coroutines.launch
1416
import kotlinx.coroutines.sync.Mutex
1517
import kotlinx.coroutines.sync.withLock
@@ -68,6 +70,15 @@ public class KrpcConnector(
6870

6971
private val dumpLogger by lazy { RpcInternalDumpLoggerContainer.provide() }
7072

73+
// prevent errors ping-pong
74+
private suspend fun sendMessageIgnoreClosure(message: KrpcMessage) {
75+
try {
76+
sendMessage(message)
77+
} catch (_: ClosedSendChannelException) {
78+
// ignore
79+
}
80+
}
81+
7182
override suspend fun sendMessage(message: KrpcMessage) {
7283
if (message is KrpcProtocolMessage.Handshake) {
7384
message.pluginParams[KrpcPluginKey.WINDOW_UPDATE] = "${config.perCallBufferSize}"
@@ -271,6 +282,13 @@ public class KrpcConnector(
271282
processMessage(transport.receiveCatching().getOrNull() ?: break)
272283
}
273284
}
285+
286+
transportScope.coroutineContext.job.invokeOnCompletion {
287+
receiveHandlers.clear()
288+
keyLocks.clear()
289+
serviceSubscriptions.clear()
290+
sendHandlers.clear()
291+
}
274292
}
275293

276294
private fun decodeMessage(transportMessage: KrpcTransportMessage): KrpcMessage? {
@@ -344,7 +362,7 @@ public class KrpcConnector(
344362
}
345363

346364
is WindowResult.Failure -> {
347-
sendMessage(
365+
sendMessageIgnoreClosure(
348366
KrpcProtocolMessage.Failure(
349367
errorMessage = result.message,
350368
connectionId = message.connectionId,
@@ -384,13 +402,18 @@ public class KrpcConnector(
384402
val handler = handlerFor(key)
385403

386404
handler.handle(message) { cause ->
405+
if (message.isException) {
406+
return@handle
407+
}
408+
387409
val failure = KrpcProtocolMessage.Failure(
388410
connectionId = message.connectionId,
389411
errorMessage = "Failed to process $key, error: ${cause?.message}",
390412
failedMessage = message,
391413
)
392414

393-
sendMessage(failure)
415+
// too late for exceptions
416+
sendMessageIgnoreClosure(failure)
394417
}
395418
}?.onFailure {
396419
if (message.isException) {
@@ -403,7 +426,8 @@ public class KrpcConnector(
403426
failedMessage = message,
404427
)
405428

406-
sendMessage(failure)
429+
// too late for exceptions
430+
sendMessageIgnoreClosure(failure)
407431
}?.onClosed {
408432
// do nothing; it's a service message, meaning that the service is dead
409433
}
@@ -424,6 +448,10 @@ public class KrpcConnector(
424448
}
425449

426450
val result = handler.handle(message) { initialCause ->
451+
if (message.isException) {
452+
return@handle
453+
}
454+
427455
val cause = illegalStateException(
428456
message = "Failed to process call ${message.callId} for service ${message.serviceType}",
429457
cause = initialCause,

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.rpc.krpc.internal
66

77
import kotlinx.coroutines.CoroutineName
8+
import kotlinx.coroutines.channels.ClosedSendChannelException
89
import kotlinx.coroutines.launch
910
import kotlinx.rpc.internal.utils.InternalRpcApi
1011

@@ -43,7 +44,11 @@ public interface KrpcEndpoint {
4344
).toMap()
4445
)
4546

46-
sender.sendMessage(message)
47+
try {
48+
sender.sendMessage(message)
49+
} catch (_: ClosedSendChannelException) {
50+
// ignore, call was already closed
51+
}
4752
}
4853

4954
if (closeTransportAfterSending) {

0 commit comments

Comments
 (0)