Skip to content

Commit 65bd4ec

Browse files
committed
Update sendMessage usages to ignore the closed channel when possible
1 parent 1e7d5c6 commit 65bd4ec

File tree

6 files changed

+57
-44
lines changed

6 files changed

+57
-44
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
216216
connector.subscribeToGenericMessages(::handleGenericMessage)
217217
connector.subscribeToProtocolMessages(::handleProtocolMessage)
218218

219-
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
219+
connector.sendMessageChecked(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL)) {
220+
// ignore, we are already cancelled and have a cause
221+
}
220222
}
221223
}
222224

@@ -477,7 +479,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
477479
connectionId = outgoingStream.connectionId,
478480
serviceId = outgoingStream.serviceId,
479481
)
480-
sendException(message)
482+
connector.sendMessageChecked(message) {
483+
// ignore, we are already cancelled and have a cause
484+
}
481485

482486
// stop the flow and its coroutine, other flows are not affected
483487
throw e
@@ -491,7 +495,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
491495
connectionId = outgoingStream.connectionId,
492496
serviceId = outgoingStream.serviceId,
493497
)
494-
sendException(message)
498+
connector.sendMessageChecked(message) {
499+
// ignore, we are already cancelled and have a cause
500+
}
495501

496502
throw cause
497503
}
@@ -507,14 +513,6 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
507513
sender.sendMessage(message)
508514
}
509515

510-
private suspend fun sendException(message: KrpcMessage) {
511-
try {
512-
sender.sendMessage(message)
513-
} catch (_: ClosedSendChannelException) {
514-
// ignore, we are already cancelled and have a cause
515-
}
516-
}
517-
518516
private suspend fun collectAndSendOutgoingStream(
519517
serialFormat: SerialFormat,
520518
flow: Flow<*>,
@@ -552,9 +550,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
552550
}
553551
}
554552

555-
try {
556-
sender.sendMessage(message)
557-
} catch (e: ClosedSendChannelException) {
553+
sender.sendMessageChecked(message) { e ->
558554
throw CancellationException("Request cancelled", e)
559555
}
560556
}

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ public interface KrpcMessageSender {
3535
public fun drainSendQueueAndClose(message: String)
3636
}
3737

38+
@InternalRpcApi
39+
public suspend inline fun KrpcMessageSender.sendMessageChecked(
40+
message: KrpcMessage,
41+
onChannelClosed: (ClosedSendChannelException) -> Unit,
42+
) {
43+
try {
44+
sendMessage(message)
45+
} catch (e: ClosedSendChannelException) {
46+
onChannelClosed(e)
47+
}
48+
}
49+
3850
internal typealias KrpcMessageSubscription<Message> = suspend (Message) -> Unit
3951

4052
/**
@@ -72,9 +84,7 @@ public class KrpcConnector(
7284

7385
// prevent errors ping-pong
7486
private suspend fun sendMessageIgnoreClosure(message: KrpcMessage) {
75-
try {
76-
sendMessage(message)
77-
} catch (_: ClosedSendChannelException) {
87+
sendMessageChecked(message) {
7888
// ignore
7989
}
8090
}

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@ public interface KrpcEndpoint {
4444
).toMap()
4545
)
4646

47-
try {
48-
sender.sendMessage(message)
49-
} catch (_: ClosedSendChannelException) {
47+
sender.sendMessageChecked(message) {
5048
// ignore, call was already closed
5149
}
5250
}
@@ -76,7 +74,9 @@ public interface KrpcEndpoint {
7674
connectionId = message.connectionId,
7775
)
7876

79-
sender.sendMessage(failure)
77+
sender.sendMessageChecked(failure) {
78+
// ignore, call was already closed
79+
}
8080
}
8181
}
8282

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,17 +212,15 @@ internal class KrpcActingReceiveHandler(
212212
}
213213

214214
internal suspend fun broadcastWindowUpdate(update: Int, connectionId: Long?, serviceType: String, callId: String) {
215-
try {
216-
sender.sendMessage(
217-
KrpcGenericMessage(
218-
connectionId = connectionId,
219-
pluginParams = mutableMapOf(
220-
KrpcPluginKey.WINDOW_UPDATE to "$update",
221-
KrpcPluginKey.WINDOW_KEY to "$serviceType/$callId",
222-
),
223-
)
215+
sender.sendMessageChecked(
216+
KrpcGenericMessage(
217+
connectionId = connectionId,
218+
pluginParams = mutableMapOf(
219+
KrpcPluginKey.WINDOW_UPDATE to "$update",
220+
KrpcPluginKey.WINDOW_KEY to "$serviceType/$callId",
221+
),
224222
)
225-
} catch (_: ClosedSendChannelException) {
223+
) {
226224
// ignore, connection is closed, no more channel updates are needed
227225
}
228226
}

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ public abstract class KrpcServer(
104104
when (message) {
105105
is KrpcProtocolMessage.Handshake -> {
106106
supportedPlugins = message.supportedPlugins
107-
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL, connectionId = 1))
107+
connector.sendMessageChecked(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL, connectionId = 1)) {
108+
// ignore the closed connection
109+
}
108110
}
109111

110112
is KrpcProtocolMessage.Failure -> {

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ internal class KrpcServerService<@Rpc T : Any>(
6060
connectionId = message.connectionId,
6161
)
6262

63-
connector.sendMessage(errorMessage)
63+
connector.sendMessageChecked(errorMessage) {
64+
// ignore, the client probably already disconnected
65+
}
66+
6467
unsubscribeFromCallMessages(message.callId)
6568
}
6669
}
@@ -180,7 +183,7 @@ internal class KrpcServerService<@Rpc T : Any>(
180183

181184
sendFlowMessages(serialFormat, returnSerializer, value, callData)
182185
} else {
183-
sendMessages(serialFormat, returnSerializer, value, callData)
186+
sendMessageValue(serialFormat, returnSerializer, value, callData)
184187
}
185188
} catch (cause: CancellationException) {
186189
currentCoroutineContext().ensureActive()
@@ -201,9 +204,7 @@ internal class KrpcServerService<@Rpc T : Any>(
201204
serviceId = callData.serviceId,
202205
)
203206

204-
try {
205-
connector.sendMessage(exceptionMessage)
206-
} catch (_: ClosedSendChannelException) {
207+
connector.sendMessageChecked(exceptionMessage) {
207208
// ignore, the client probably already disconnected
208209
}
209210

@@ -225,7 +226,7 @@ internal class KrpcServerService<@Rpc T : Any>(
225226
connector.unsubscribeFromCallMessages(descriptor.fqName, callId)
226227
}
227228

228-
private suspend fun sendMessages(
229+
private suspend fun sendMessageValue(
229230
serialFormat: SerialFormat,
230231
returnSerializer: KSerializer<Any?>,
231232
value: Any?,
@@ -303,20 +304,22 @@ internal class KrpcServerService<@Rpc T : Any>(
303304
connector.sendMessage(result)
304305
}
305306

306-
connector.sendMessage(
307+
connector.sendMessageChecked(
307308
KrpcCallMessage.StreamFinished(
308309
callId = callData.callId,
309310
serviceType = descriptor.fqName,
310311
connectionId = callData.connectionId,
311312
serviceId = callData.serviceId,
312313
streamId = SINGLE_STREAM_ID,
313314
)
314-
)
315+
) {
316+
// do nothing
317+
}
315318
} catch (cause: CancellationException) {
316319
throw cause
317320
} catch (cause: Throwable) {
318321
val serializedCause = serializeException(cause)
319-
connector.sendMessage(
322+
connector.sendMessageChecked(
320323
KrpcCallMessage.StreamCancel(
321324
callId = callData.callId,
322325
serviceType = descriptor.fqName,
@@ -325,7 +328,9 @@ internal class KrpcServerService<@Rpc T : Any>(
325328
streamId = SINGLE_STREAM_ID,
326329
cause = serializedCause,
327330
)
328-
)
331+
) {
332+
// do nothing
333+
}
329334
}
330335
}
331336

@@ -351,7 +356,7 @@ internal class KrpcServerService<@Rpc T : Any>(
351356
closeReceiving(callId, message, cause, fromJob = false)
352357

353358
if (!supportedPlugins.contains(KrpcPlugin.NO_ACK_CANCELLATION)) {
354-
connector.sendMessage(
359+
connector.sendMessageChecked(
355360
KrpcGenericMessage(
356361
connectionId = null,
357362
pluginParams = mapOf(
@@ -360,7 +365,9 @@ internal class KrpcServerService<@Rpc T : Any>(
360365
KrpcPluginKey.CANCELLATION_ID to callId,
361366
)
362367
)
363-
)
368+
) {
369+
// do nothing
370+
}
364371
}
365372

366373
unsubscribeFromCallMessages(callId)

0 commit comments

Comments
 (0)