Skip to content

Commit abed7eb

Browse files
authored
Fix log error messages (#139)
* KRPC-94 Error messages in log after graceful stream cancellation * KRPC-95 Error message when client already cancelled the stream but server sends new element * KRPC-96 Occasional error message in log after stream cancellation
1 parent 3876a37 commit abed7eb

File tree

16 files changed

+148
-33
lines changed

16 files changed

+148
-33
lines changed

core/src/commonMain/kotlin/kotlinx/rpc/StreamScope.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ public class StreamScope(
4848
getRequestScope(callId).coroutineContext.job.invokeOnCompletion(onCancelling = true, handler = handler)
4949
}
5050

51-
public fun cancelRequestScopeById(callId: String, message: String, cause: Throwable?) {
52-
requests.remove(callId)?.cancel(message, cause)
51+
public fun cancelRequestScopeById(callId: String, message: String, cause: Throwable?): Job? {
52+
return requests.remove(callId)?.apply { cancel(message, cause) }?.coroutineContext?.job
5353
}
5454

5555
// Group stream launches by callId. In case one fails, so do others

core/src/commonMain/kotlin/kotlinx/rpc/internal/RPCStreamContext.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ public class RPCStreamContext(
7272
}
7373
}
7474

75-
public fun cancel(message: String, cause: Throwable?) {
76-
streamScope.cancelRequestScopeById(callId, message, cause)
75+
public fun cancel(message: String, cause: Throwable?): Job? {
76+
return streamScope.cancelRequestScopeById(callId, message, cause)
7777
}
7878

7979
init {

core/src/commonMain/kotlin/kotlinx/rpc/internal/transport/CancellationType.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ import kotlinx.rpc.internal.IndexedEnum
88
import kotlinx.rpc.internal.InternalRPCApi
99

1010
@InternalRPCApi
11+
@Suppress("detekt.MagicNumber")
1112
public enum class CancellationType(override val uniqueIndex: Int) : IndexedEnum {
1213
ENDPOINT(0),
1314
SERVICE(1),
1415
REQUEST(2),
16+
CANCELLATION_ACK(3),
1517
;
1618

1719
internal companion object {
@@ -25,9 +27,8 @@ public enum class CancellationType(override val uniqueIndex: Int) : IndexedEnum
2527
}
2628

2729
@InternalRPCApi
28-
public fun RPCMessage.cancellationType(): CancellationType {
30+
public fun RPCMessage.cancellationType(): CancellationType? {
2931
return get(RPCPluginKey.CANCELLATION_TYPE)?.let { value ->
3032
CancellationType.valueOfNull(value)
31-
?: error("Unknown ${RPCPluginKey.CANCELLATION_TYPE} value: $value")
32-
} ?: error("Expected ${RPCPluginKey.CANCELLATION_TYPE} field")
33+
}
3334
}

core/src/commonMain/kotlin/kotlinx/rpc/internal/transport/RPCConnector.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.rpc.internal.transport
66

7+
import kotlinx.coroutines.CancellationException
78
import kotlinx.coroutines.CoroutineScope
89
import kotlinx.coroutines.launch
910
import kotlinx.coroutines.sync.Mutex
@@ -166,9 +167,12 @@ public class RPCConnector<SubscriptionKey>(
166167
return
167168
}
168169

170+
val initialCause = (result as? HandlerResult.Failure)?.cause
171+
169172
val cause = IllegalStateException(
170173
"Failed to process call ${message.callId} for service ${message.serviceType}, " +
171-
"${subscriptions.size} attempts failed"
174+
"${subscriptions.size} attempts failed",
175+
initialCause,
172176
)
173177

174178
val callException = RPCCallMessage.CallException(
@@ -196,8 +200,11 @@ public class RPCConnector<SubscriptionKey>(
196200

197201
return when {
198202
result.isFailure -> {
199-
logger.error(result.exceptionOrNull()) { "Failed to handle message with key $key" }
200-
HandlerResult.Failure(result.exceptionOrNull())
203+
val exception = result.exceptionOrNull()
204+
if (exception !is CancellationException) {
205+
logger.error(exception) { "Failed to handle message with key $key" }
206+
}
207+
HandlerResult.Failure(exception)
201208
}
202209

203210
else -> {

core/src/commonMain/kotlin/kotlinx/rpc/internal/transport/RPCEndpoint.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,5 @@ public interface RPCEndpoint {
7474
}
7575

7676
@InternalRPCApi
77-
public fun handleCancellation(message: RPCGenericMessage)
77+
public suspend fun handleCancellation(message: RPCGenericMessage)
7878
}

core/src/commonMain/kotlin/kotlinx/rpc/internal/transport/RPCServiceHandler.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.rpc.internal.transport
66

7+
import kotlinx.coroutines.CancellationException
78
import kotlinx.coroutines.flow.Flow
89
import kotlinx.coroutines.flow.SharedFlow
910
import kotlinx.coroutines.flow.StateFlow
@@ -56,6 +57,9 @@ public abstract class RPCServiceHandler {
5657
)
5758
}
5859
}
60+
} catch (e : CancellationException) {
61+
// canceled by a streamScope
62+
throw e
5963
} catch (@Suppress("detekt.TooGenericExceptionCaught") cause: Throwable) {
6064
mutex.withLock {
6165
val serializedReason = serializeException(cause)

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/client/KRPCClient.kt

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import kotlinx.rpc.client.internal.RPCClientConnector
1515
import kotlinx.rpc.client.internal.RPCFlow
1616
import kotlinx.rpc.internal.*
1717
import kotlinx.rpc.internal.logging.CommonLogger
18+
import kotlinx.rpc.internal.map.ConcurrentHashMap
1819
import kotlinx.rpc.internal.transport.*
1920
import kotlinx.serialization.BinaryFormat
2021
import kotlinx.serialization.SerialFormat
@@ -69,6 +70,9 @@ public abstract class KRPCClient(
6970

7071
private var clientCancelled = false
7172

73+
// callId to serviceTypeString
74+
private val cancellingRequests = ConcurrentHashMap<String, String>()
75+
7276
init {
7377
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
7478
clientCancelled = true
@@ -202,7 +206,7 @@ public abstract class KRPCClient(
202206

203207
callResult.invokeOnCompletion { cause ->
204208
if (cause != null) {
205-
connector.unsubscribeFromMessages(call.serviceTypeString, rpcCall.callId)
209+
cancellingRequests[rpcCall.callId] = call.serviceTypeString
206210

207211
rpcCall.streamContext.valueOrNull?.cancel("Request failed", cause)
208212

@@ -212,8 +216,12 @@ public abstract class KRPCClient(
212216
} else {
213217
val streamScope = rpcCall.streamContext.valueOrNull?.streamScope
214218

215-
streamScope?.onScopeCompletion(rpcCall.callId) {
219+
if (streamScope == null) {
216220
connector.unsubscribeFromMessages(call.serviceTypeString, rpcCall.callId)
221+
}
222+
223+
streamScope?.onScopeCompletion(rpcCall.callId) {
224+
cancellingRequests[rpcCall.callId] = call.serviceTypeString
217225

218226
sendCancellation(CancellationType.REQUEST, call.serviceId.toString(), rpcCall.callId)
219227
}
@@ -273,6 +281,10 @@ public abstract class KRPCClient(
273281
callResult: RequestCompletableDeferred<Any?>,
274282
) {
275283
connector.subscribeToCallResponse(call.serviceTypeString, callId) { message ->
284+
if (cancellingRequests.containsKey(callId)) {
285+
return@subscribeToCallResponse
286+
}
287+
276288
handleMessage(message, streamContext, call, serialFormat, callResult)
277289
}
278290

@@ -331,17 +343,25 @@ public abstract class KRPCClient(
331343
}
332344

333345
@InternalRPCApi
334-
final override fun handleCancellation(message: RPCGenericMessage) {
346+
final override suspend fun handleCancellation(message: RPCGenericMessage) {
335347
when (val type = message.cancellationType()) {
336348
CancellationType.ENDPOINT -> {
337349
cancel("Closing client after server cancellation") // we cancel this client
338350
}
339351

352+
CancellationType.CANCELLATION_ACK -> {
353+
val callId = message[RPCPluginKey.CANCELLATION_ID]
354+
?: error("Expected CANCELLATION_ID for cancellation of type 'request'")
355+
356+
val serviceTypeString = cancellingRequests.remove(callId) ?: return
357+
connector.unsubscribeFromMessages(serviceTypeString, callId)
358+
}
359+
340360
else -> {
341-
error(
361+
logger.warn {
342362
"Unsupported ${RPCPluginKey.CANCELLATION_TYPE} $type for client, " +
343363
"only 'endpoint' type may be sent by a server"
344-
)
364+
}
345365
}
346366
}
347367
}

krpc/krpc-server/src/jvmMain/kotlin/kotlinx/rpc/server/KRPCServer.kt

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,16 @@ public abstract class KRPCServer(
135135
config = config,
136136
connector = connector,
137137
coroutineContext = serviceInstanceContext,
138-
)
138+
).apply {
139+
coroutineContext.job.invokeOnCompletion {
140+
connector.unsubscribeFromServiceMessages(serviceKClass.qualifiedClassName)
141+
}
142+
}
139143
}
140144

141145
@InternalRPCApi
142-
final override fun handleCancellation(message: RPCGenericMessage) {
143-
when (message.cancellationType()) {
146+
final override suspend fun handleCancellation(message: RPCGenericMessage) {
147+
when (val type = message.cancellationType()) {
144148
CancellationType.ENDPOINT -> {
145149
cancelledByClient = true
146150

@@ -152,7 +156,7 @@ public abstract class KRPCServer(
152156
val serviceId = message[RPCPluginKey.CLIENT_SERVICE_ID]?.toLongOrNull()
153157
?: error("Expected CLIENT_SERVICE_ID for cancellation of type 'service' as Long value")
154158

155-
rpcServices[serviceId]?.cancel("Sevice cancelled by client")
159+
rpcServices[serviceId]?.cancel("Service cancelled by client")
156160
}
157161

158162
CancellationType.REQUEST -> {
@@ -164,6 +168,13 @@ public abstract class KRPCServer(
164168

165169
rpcServices[serviceId]?.cancelRequest(callId, "Request cancelled by client")
166170
}
171+
172+
else -> {
173+
logger.warn {
174+
"Unsupported ${RPCPluginKey.CANCELLATION_TYPE} $type for server, " +
175+
"only 'endpoint' type may be sent by a server"
176+
}
177+
}
167178
}
168179
}
169180
}

krpc/krpc-server/src/jvmMain/kotlin/kotlinx/rpc/server/internal/RPCServerConnector.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ internal class RPCServerConnector private constructor(
3535
}
3636
)
3737

38+
fun unsubscribeFromServiceMessages(serviceTypeString: String) {
39+
connector.unsubscribeFromMessages(MessageKey.Service(serviceTypeString))
40+
}
41+
3842
suspend fun subscribeToServiceMessages(
3943
serviceTypeString: String,
4044
subscription: suspend (RPCCallMessage) -> Unit,

krpc/krpc-server/src/jvmMain/kotlin/kotlinx/rpc/server/internal/RPCServerService.kt

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ import kotlinx.rpc.*
99
import kotlinx.rpc.internal.*
1010
import kotlinx.rpc.internal.logging.CommonLogger
1111
import kotlinx.rpc.internal.map.ConcurrentHashMap
12-
import kotlinx.rpc.internal.transport.RPCCallMessage
13-
import kotlinx.rpc.internal.transport.RPCMessageSender
14-
import kotlinx.rpc.internal.transport.RPCServiceHandler
12+
import kotlinx.rpc.internal.transport.*
1513
import kotlinx.serialization.BinaryFormat
1614
import kotlinx.serialization.StringFormat
1715
import java.lang.reflect.InvocationTargetException
@@ -262,13 +260,30 @@ internal class RPCServerService<T : RPC>(
262260
requestJob.start()
263261
}
264262

265-
fun cancelRequest(callId: String, message: String? = null, cause: Throwable? = null, fromJob: Boolean = false) {
263+
suspend fun cancelRequest(
264+
callId: String,
265+
message: String? = null,
266+
cause: Throwable? = null,
267+
fromJob: Boolean = false,
268+
) {
266269
requestMap.remove(callId)?.cancelAndClose(callId, message, cause, fromJob)
270+
271+
// acknowledge the cancellation
272+
sender.sendMessage(
273+
RPCGenericMessage(
274+
connectionId = null,
275+
pluginParams = mapOf(
276+
RPCPluginKey.GENERIC_MESSAGE_TYPE to RPCGenericMessage.CANCELLATION_TYPE,
277+
RPCPluginKey.CANCELLATION_TYPE to CancellationType.CANCELLATION_ACK.toString(),
278+
RPCPluginKey.CANCELLATION_ID to callId,
279+
)
280+
)
281+
)
267282
}
268283
}
269284

270285
private class RPCRequest(val handlerJob: Job, val streamContext: LazyRPCStreamContext) {
271-
fun cancelAndClose(
286+
suspend fun cancelAndClose(
272287
callId: String,
273288
message: String? = null,
274289
cause: Throwable? = null,
@@ -280,13 +295,17 @@ private class RPCRequest(val handlerJob: Job, val streamContext: LazyRPCStreamCo
280295
message != null -> handlerJob.cancel(message)
281296
else -> handlerJob.cancel()
282297
}
298+
299+
handlerJob.join()
283300
}
284301

285302
val ctx = streamContext.valueOrNull
286303
if (ctx == null) {
287-
streamContext.streamScopeOrNull?.cancelRequestScopeById(callId, message ?: "Scope cancelled", cause)
304+
streamContext.streamScopeOrNull
305+
?.cancelRequestScopeById(callId, message ?: "Scope cancelled", cause)
306+
?.join()
288307
} else {
289-
ctx.cancel(message ?: "Request cancelled", cause)
308+
ctx.cancel(message ?: "Request cancelled", cause)?.join()
290309
}
291310
}
292311
}

0 commit comments

Comments
 (0)