Skip to content

Commit f4908ff

Browse files
committed
Added stress tests
1 parent 2d5b2c7 commit f4908ff

File tree

8 files changed

+345
-54
lines changed

8 files changed

+345
-54
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,15 +295,15 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
295295

296296
sendCancellation(CancellationType.REQUEST, call.descriptor.fqName, callId)
297297

298-
connector.unsubscribeFromMessages(call.descriptor.fqName, callId) {
299-
cancellingRequests.remove(callId)
300-
}
298+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
299+
cancellingRequests.remove(callId)
301300
}
302301

303302
throw e
304303
} finally {
305304
channel.close()
306305
requestChannels.remove(callId)
306+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
307307
}
308308
}
309309
}

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/internal/KrpcClientConnector.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ internal class KrpcClientConnector private constructor(
2020
KrpcConnector(serialFormat, transport, config, isServer = false)
2121
)
2222

23-
fun unsubscribeFromMessages(serviceTypeString: String, callId: String, callback: suspend () -> Unit = {}) {
24-
connector.unsubscribeFromMessages(HandlerKey.ServiceCall(serviceTypeString, callId), callback)
23+
suspend fun unsubscribeFromMessages(serviceTypeString: String, callId: String) {
24+
connector.unsubscribeFromMessages(HandlerKey.ServiceCall(serviceTypeString, callId))
2525
}
2626

2727
suspend fun subscribeToCallResponse(

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt

Lines changed: 65 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -110,25 +110,34 @@ public class KrpcConnector(
110110
}.sendMessage(message)
111111
}
112112

113-
public fun unsubscribeFromMessages(key: HandlerKey<*>, callback: suspend () -> Unit = {}) {
113+
public fun unsubscribeFromMessagesAsync(key: HandlerKey<*>, callback: suspend () -> Unit = {}) {
114+
if (!keyLocks.containsKey(key)) {
115+
println("Key $key is not registered")
116+
return
117+
}
118+
114119
transportScope.launch(CoroutineName("krpc-connector-unsubscribe-$key")) {
115-
withLockForKey(key) {
116-
if (key is HandlerKey.Service) {
117-
receiveHandlers.withKeys { keys ->
118-
keys
119-
.filter { it is HandlerKey.ServiceCall && it.serviceType == key.serviceType }
120-
.forEach {
121-
cleanForKey(it)
122-
}
123-
}
120+
unsubscribeFromMessages(key)
124121

125-
serviceSubscriptions.remove(key)
126-
} else {
127-
cleanForKey(key)
122+
callback()
123+
}
124+
}
125+
126+
public suspend fun unsubscribeFromMessages(key: HandlerKey<*>) {
127+
withLockForKey(key, createKey = false) {
128+
if (key is HandlerKey.Service) {
129+
receiveHandlers.withKeys { keys ->
130+
keys
131+
.filter { it is HandlerKey.ServiceCall && it.serviceType == key.serviceType }
132+
.forEach {
133+
cleanForKey(it)
134+
}
128135
}
129-
}
130136

131-
callback()
137+
serviceSubscriptions.remove(key)
138+
} else {
139+
cleanForKey(key)
140+
}
132141
}
133142
}
134143

@@ -141,23 +150,25 @@ public class KrpcConnector(
141150
public suspend fun <Message : KrpcMessage> subscribeToMessages(
142151
key: HandlerKey<Message>,
143152
subscription: KrpcMessageSubscription<Message>,
144-
): Unit = withLockForKey(key) {
145-
if (key is HandlerKey.Service) {
146-
serviceSubscriptions.computeIfAbsent(key) {
147-
@Suppress("UNCHECKED_CAST")
148-
subscription as KrpcMessageSubscription<KrpcCallMessage>
149-
}
153+
) {
154+
withLockForKey(key, createKey = true) {
155+
if (key is HandlerKey.Service) {
156+
serviceSubscriptions.computeIfAbsent(key) {
157+
@Suppress("UNCHECKED_CAST")
158+
subscription as KrpcMessageSubscription<KrpcCallMessage>
159+
}
150160

151-
receiveHandlers.withKeys { keys ->
152-
keys
153-
.filter { it is HandlerKey.ServiceCall && it.serviceType == key.serviceType }
154-
.forEach {
155-
@Suppress("UNCHECKED_CAST")
156-
subscribeWithActingHandlerPerTrack(it as HandlerKey<Message>, subscription)
157-
}
161+
receiveHandlers.withKeys { keys ->
162+
keys
163+
.filter { it is HandlerKey.ServiceCall && it.serviceType == key.serviceType }
164+
.forEach {
165+
@Suppress("UNCHECKED_CAST")
166+
subscribeWithActingHandlerPerTrack(it as HandlerKey<Message>, subscription)
167+
}
168+
}
169+
} else {
170+
subscribeWithActingHandlerPerTrack(key, subscription)
158171
}
159-
} else {
160-
subscribeWithActingHandlerPerTrack(key, subscription)
161172
}
162173
}
163174

@@ -366,7 +377,7 @@ public class KrpcConnector(
366377
sendHandlers[HandlerKey.Protocol]?.updateWindowSize(sendBufferSize ?: -1)
367378
}
368379

369-
withLockForKey(key) {
380+
withLockForKey(key, createKey = true) {
370381
val handler = handlerFor(key)
371382

372383
handler.handle(message) { cause ->
@@ -378,7 +389,7 @@ public class KrpcConnector(
378389

379390
sendMessage(failure)
380391
}
381-
}.onFailure {
392+
}?.onFailure {
382393
if (message.isException) {
383394
return@onFailure
384395
}
@@ -390,13 +401,13 @@ public class KrpcConnector(
390401
)
391402

392403
sendMessage(failure)
393-
}.onClosed {
404+
}?.onClosed {
394405
// do nothing; it's a service message, meaning that the service is dead
395406
}
396407
}
397408

398409
private suspend fun processServiceMessage(message: KrpcCallMessage, key: HandlerKey<KrpcCallMessage>) {
399-
val (handler, result) = withLockForKey(key) {
410+
val (handler, result) = withLockForKey(key, createKey = true) {
400411
val handler = receiveHandlers[key]
401412
?: if (config.waitTimeout.isPositive()) {
402413
handlerFor(key)
@@ -419,7 +430,7 @@ public class KrpcConnector(
419430
}
420431

421432
handler to result
422-
}
433+
} ?: error("unreachable, as we create a lock for the key")
423434

424435
result.onFailure {
425436
if (message.isException) {
@@ -445,20 +456,22 @@ public class KrpcConnector(
445456
transportScope.launch(CoroutineName("krpc-connector-discard-if-unprocessed-$key")) {
446457
delay(config.waitTimeout)
447458

448-
withLockForKey(key) {
459+
withLockForKey(key, createKey = false) {
449460
if (handler.processingStarted || receiveHandlers[key] != handler) {
450461
return@launch
451462
}
452463

453464
receiveHandlers.remove(key)
454-
handler.close(
455-
key = key,
456-
e = illegalStateException(
457-
"Waiting limit of ${config.waitTimeout} " +
458-
"is exceeded for unprocessed messages with $key"
459-
),
460-
)
465+
keyLocks.remove(key)
461466
}
467+
468+
handler.close(
469+
key = key,
470+
e = illegalStateException(
471+
"Waiting limit of ${config.waitTimeout} " +
472+
"is exceeded for unprocessed messages with $key"
473+
),
474+
)
462475
}
463476
}
464477
}
@@ -477,15 +490,22 @@ public class KrpcConnector(
477490

478491
private suspend inline fun <T> withLockForKey(
479492
key: HandlerKey<*>,
493+
createKey: Boolean,
480494
action: () -> T,
481-
): T {
495+
): T? {
482496
val lockKey = if (key is HandlerKey.ServiceCall && role == SERVER_ROLE) {
483497
HandlerKey.Service(key.serviceType)
484498
} else {
485499
key
486500
}
487501

488-
return keyLocks.computeIfAbsent(lockKey) { Mutex() }.withLock(action = action)
502+
val mutex = if (createKey) {
503+
keyLocks.computeIfAbsent(lockKey) { Mutex() }
504+
} else {
505+
keyLocks[lockKey]
506+
}
507+
508+
return mutex?.withLock(action = action)
489509
}
490510

491511
internal companion object {

krpc/krpc-logging/build.gradle.kts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
plugins {
@@ -15,5 +15,11 @@ kotlin {
1515
implementation(projects.utils)
1616
}
1717
}
18+
19+
jvmMain {
20+
dependencies {
21+
implementation(libs.slf4j.api)
22+
}
23+
}
1824
}
1925
}

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerConnector.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ internal class KrpcServerConnector private constructor(
2121
)
2222

2323
fun unsubscribeFromServiceMessages(serviceTypeString: String, callback: suspend () -> Unit = {}) {
24-
connector.unsubscribeFromMessages(HandlerKey.Service(serviceTypeString), callback)
24+
connector.unsubscribeFromMessagesAsync(HandlerKey.Service(serviceTypeString), callback)
25+
}
26+
27+
suspend fun unsubscribeFromCallMessages(serviceTypeString: String, callId: String) {
28+
connector.unsubscribeFromMessages(HandlerKey.ServiceCall(serviceTypeString, callId))
2529
}
2630

2731
suspend fun subscribeToServiceMessages(

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ internal class KrpcServerService<@Rpc T : Any>(
193193
connectionId = callData.connectionId,
194194
serviceId = callData.serviceId,
195195
).also { connector.sendMessage(it) }
196+
} finally {
197+
connector.unsubscribeFromCallMessages(callData.serviceType, callData.callId)
196198
}
197199

198200
if (failure != null) {

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import kotlin.time.ExperimentalTime
1818

1919
class LocalTransport(
2020
parentContext: CoroutineContext? = null,
21+
private val recordTimestamps: Boolean = true,
2122
) : CoroutineScope {
2223
override val coroutineContext = SupervisorJob(parentContext?.get(Job))
2324

@@ -34,7 +35,9 @@ class LocalTransport(
3435

3536
@OptIn(ExperimentalTime::class)
3637
override suspend fun send(message: KrpcTransportMessage) {
37-
lastMessageSentOnClient.getAndSet(Clock.System.now().toEpochMilliseconds())
38+
if (recordTimestamps) {
39+
lastMessageSentOnClient.getAndSet(Clock.System.now().toEpochMilliseconds())
40+
}
3841
serverIncoming.send(message)
3942
}
4043

@@ -50,7 +53,9 @@ class LocalTransport(
5053

5154
@OptIn(ExperimentalTime::class)
5255
override suspend fun send(message: KrpcTransportMessage) {
53-
lastMessageSentOnServer.getAndSet(Clock.System.now().toEpochMilliseconds())
56+
if (recordTimestamps) {
57+
lastMessageSentOnServer.getAndSet(Clock.System.now().toEpochMilliseconds())
58+
}
5459
clientIncoming.send(message)
5560
}
5661

0 commit comments

Comments
 (0)