From f3da6ee5667481ce1e08cd8909bda2679a40293f Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Tue, 16 Sep 2025 19:10:00 +0200 Subject: [PATCH 1/7] Added Ktor closure tests --- .../rpc/krpc/ktor/KtorTransportTest.kt | 161 +++++++++++++++--- .../logging/RpcInternalCommonLogger.kt | 2 +- versions-root/libs.versions.toml | 2 +- 3 files changed, 143 insertions(+), 22 deletions(-) diff --git a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt index 680c6cfc5..394e25699 100644 --- a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt +++ b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt @@ -8,6 +8,7 @@ package kotlinx.rpc.krpc.ktor import io.ktor.client.* import io.ktor.client.engine.cio.* +import io.ktor.client.plugins.HttpRequestRetry import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.server.application.* @@ -17,9 +18,7 @@ import io.ktor.server.response.* import io.ktor.server.routing.* import io.ktor.server.testing.* import kotlinx.coroutines.* -import kotlinx.coroutines.debug.DebugProbes import kotlinx.rpc.annotations.Rpc -import kotlinx.rpc.krpc.client.KrpcClient import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer import kotlinx.rpc.krpc.internal.logging.dumpLogger @@ -32,12 +31,12 @@ import kotlinx.rpc.krpc.serialization.json.json import kotlinx.rpc.test.runTestWithCoroutinesProbes import kotlinx.rpc.withService import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue import java.net.ServerSocket import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit import kotlin.coroutines.cancellation.CancellationException -import kotlin.test.Ignore import kotlin.test.Test +import kotlin.test.fail import kotlin.time.Duration.Companion.seconds @Rpc @@ -62,13 +61,14 @@ interface SlowService { class SlowServiceImpl : SlowService { val received = CompletableDeferred() + val fence = CompletableDeferred() override suspend fun verySlow(): String { received.complete(Unit) - delay(Int.MAX_VALUE.toLong()) + fence.await() - error("Must not be called") + return "hello" } } @@ -134,10 +134,7 @@ class KtorTransportTest { @OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class) @Test - @Ignore("Wait for Ktor fix (https://github.com/ktorio/ktor/pull/4927) or apply workaround if rejected") - fun testEndpointsTerminateWhenWsDoes() = runTestWithCoroutinesProbes(timeout = 15.seconds) { - DebugProbes.install() - + fun testClientTerminatesWhenServerWsDoes() = runTestWithCoroutinesProbes(timeout = 60.seconds) { val logger = setupLogger() val port: Int = findFreePort() @@ -147,7 +144,7 @@ class KtorTransportTest { val serverReady = CompletableDeferred() val dropServer = CompletableDeferred() - val service = SlowServiceImpl() + val impl = SlowServiceImpl() @Suppress("detekt.GlobalCoroutineUsage") val serverJob = GlobalScope.launch(CoroutineName("server")) { @@ -171,22 +168,27 @@ class KtorTransportTest { } } - registerService { service } + registerService { impl } } } - }.start(wait = false) + }.startSuspend(wait = false) serverReady.complete(Unit) dropServer.await() - server.stop(shutdownGracePeriod = 100L, shutdownTimeout = 100L, timeUnit = TimeUnit.MILLISECONDS) + server.stopSuspend(gracePeriodMillis = 100L, timeoutMillis = 300L) } logger.info { "Server stopped" } } val ktorClient = HttpClient(CIO) { + install(HttpRequestRetry) { + retryOnServerErrors(maxRetries = 5) + exponentialDelay() + } + installKrpc { serialization { json() @@ -200,17 +202,18 @@ class KtorTransportTest { val rpcClient = ktorClient.rpc("ws://0.0.0.0:$port/rpc") - launch { + var cancellationExceptionCaught = false + val job = launch { try { rpcClient.withService().verySlow() - error("Must not be called") + fail("Must not be called") } catch (_: CancellationException) { - logger.info { "Cancellation exception caught for RPC request" } + cancellationExceptionCaught = true ensureActive() } } - service.received.await() + impl.received.await() logger.info { "Received RPC request" } @@ -218,14 +221,132 @@ class KtorTransportTest { logger.info { "Waiting for RPC client to complete" } - (rpcClient as KrpcClient).awaitCompletion() + rpcClient.awaitCompletion() + + job.join() + + assertTrue(cancellationExceptionCaught) logger.info { "RPC client completed" } ktorClient.close() + + serverJob.join() newPool.close() + } + + @OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class) + @Test + fun testServerTerminatesWhenClientWsDoes() = runTestWithCoroutinesProbes(timeout = 60.seconds) { + val logger = setupLogger() + + val port: Int = findFreePort() + + val newPool = Executors.newCachedThreadPool().asCoroutineDispatcher() + + val serverReady = CompletableDeferred() + val dropServer = CompletableDeferred() - serverJob.cancel() + val impl = SlowServiceImpl() + val sessionFinished = CompletableDeferred() + + @Suppress("detekt.GlobalCoroutineUsage") + val serverJob = GlobalScope.launch(CoroutineName("server")) { + withContext(newPool) { + val server = embeddedServer( + factory = Netty, + port = port, + parentCoroutineContext = newPool, + ) { + install(Krpc) + + routing { + get { + call.respondText("hello") + } + + rpc("/rpc") { + coroutineContext.job.invokeOnCompletion { + sessionFinished.complete(Unit) + } + + rpcConfig { + serialization { + json() + } + } + + registerService { impl } + } + } + }.startSuspend(wait = false) + + serverReady.complete(Unit) + + dropServer.await() + + server.stopSuspend(gracePeriodMillis = 100L, timeoutMillis = 300L) + } + + logger.info { "Server stopped" } + } + + val ktorClient = HttpClient(CIO) { + install(HttpRequestRetry) { + retryOnServerErrors(maxRetries = 5) + exponentialDelay() + } + + installKrpc { + serialization { + json() + } + } + } + + serverReady.await() + + assertEquals("hello", ktorClient.get("http://0.0.0.0:$port").bodyAsText()) + + val rpcClient = ktorClient.rpc("ws://0.0.0.0:$port/rpc") + + var cancellationExceptionCaught = false + val job = launch { + try { + rpcClient.withService().verySlow() + fail("Must not be called") + } catch (_: CancellationException) { + cancellationExceptionCaught = true + ensureActive() + } + } + + impl.received.await() + + logger.info { "Received RPC request, Dropping client" } + + ktorClient.cancel() + + logger.info { "Waiting for RPC client to complete" } + + rpcClient.awaitCompletion() + + logger.info { "Waiting for request to complete" } + + job.join() + + assertTrue(cancellationExceptionCaught) + + logger.info { "RPC client and request completed" } + + sessionFinished.await() + + logger.info { "Session finished" } + + dropServer.complete(Unit) + serverJob.join() + + newPool.close() } private fun findFreePort(): Int { diff --git a/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt b/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt index 2b763aa27..21bd5ce23 100644 --- a/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt +++ b/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt @@ -46,7 +46,7 @@ public interface RpcInternalCommonLogger { } public fun logger(kClass: KClass): RpcInternalCommonLogger { - return logger(kClass::class.simpleName ?: kClass.toString()) + return logger(kClass.qualifiedName ?: kClass.toString()) } public inline fun logger(): RpcInternalCommonLogger { diff --git a/versions-root/libs.versions.toml b/versions-root/libs.versions.toml index 2d73ae687..28244f478 100644 --- a/versions-root/libs.versions.toml +++ b/versions-root/libs.versions.toml @@ -9,7 +9,7 @@ kotlin-compiler = "0.0.0" # default to kotlin-lang or env.KOTLIN_COMPILER_VERSIO # kotlin independent versions detekt-analyzer = "1.23.6" coroutines = "1.10.2" -ktor = "3.2.1" +ktor = "3.3.0" kotlin-logging = "7.0.7" slf4j = "2.0.17" logback = "1.3.14" From 7a27c34333d1ae9266a5d8b69cad2fe9fe151463 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 17 Sep 2025 14:49:53 +0200 Subject: [PATCH 2/7] More tests for stream cancelling, simple fixes for other staff --- .../kotlinx/rpc/krpc/client/KrpcClient.kt | 19 +++++- .../logging/RpcInternalCommonLogger.kt | 2 +- .../kotlinx/rpc/krpc/server/KrpcServer.kt | 7 ++- .../krpc/server/internal/KrpcServerService.kt | 18 ++++-- .../test/cancellation/CancellationService.kt | 9 ++- .../test/cancellation/CancellationTest.kt | 63 +++++++++++++++++-- 6 files changed, 101 insertions(+), 17 deletions(-) diff --git a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt index 57c28d844..b68516809 100644 --- a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt +++ b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt @@ -16,6 +16,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.ensureActive @@ -476,7 +477,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { connectionId = outgoingStream.connectionId, serviceId = outgoingStream.serviceId, ) - sender.sendMessage(message) + sendException(message) // stop the flow and its coroutine, other flows are not affected throw e @@ -490,7 +491,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { connectionId = outgoingStream.connectionId, serviceId = outgoingStream.serviceId, ) - sender.sendMessage(message) + sendException(message) throw cause } @@ -506,6 +507,14 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { sender.sendMessage(message) } + private suspend fun sendException(message: KrpcMessage) { + try { + sender.sendMessage(message) + } catch (_: ClosedSendChannelException) { + // ignore, we are already cancelled and have a cause + } + } + private suspend fun collectAndSendOutgoingStream( serialFormat: SerialFormat, flow: Flow<*>, @@ -543,7 +552,11 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { } } - sender.sendMessage(message) + try { + sender.sendMessage(message) + } catch (e: ClosedSendChannelException) { + throw CancellationException("Request cancelled", e) + } } } diff --git a/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt b/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt index 21bd5ce23..a84e1c3d3 100644 --- a/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt +++ b/krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt @@ -46,7 +46,7 @@ public interface RpcInternalCommonLogger { } public fun logger(kClass: KClass): RpcInternalCommonLogger { - return logger(kClass.qualifiedName ?: kClass.toString()) + return logger(kClass.simpleName ?: kClass.toString()) } public inline fun logger(): RpcInternalCommonLogger { diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt index 24545fac6..dfbe1eb8c 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt @@ -135,7 +135,7 @@ public abstract class KrpcServer( final override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass) { connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName) - rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName) + rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)?.close() } private fun <@Rpc Service : Any> createNewServiceInstance( @@ -159,6 +159,11 @@ public abstract class KrpcServer( cancelledByClient = true internalScope.cancel("Server cancelled by client") + + rpcServices.values.forEach { service -> + service.close() + } + rpcServices.clear() } diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt index 2afac9ebc..5e2ea4cff 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt @@ -5,6 +5,7 @@ package kotlinx.rpc.krpc.server.internal import kotlinx.coroutines.* +import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.flow.Flow import kotlinx.rpc.annotations.Rpc import kotlinx.rpc.descriptor.RpcInvokator @@ -200,7 +201,11 @@ internal class KrpcServerService<@Rpc T : Any>( serviceId = callData.serviceId, ) - connector.sendMessage(exceptionMessage) + try { + connector.sendMessage(exceptionMessage) + } catch (_: ClosedSendChannelException) { + // ignore, the client probably already disconnected + } closeReceiving(callId, "Server request failed", failure, fromJob = true) } else { @@ -324,7 +329,12 @@ internal class KrpcServerService<@Rpc T : Any>( } } - suspend fun closeReceiving( + fun close() { + requestMap.entries.forEach { (callId, request) -> request.cancelAndClose(callId) } + requestMap.clear() + } + + fun closeReceiving( callId: String, message: String? = null, cause: Throwable? = null, @@ -377,7 +387,7 @@ internal class KrpcServerService<@Rpc T : Any>( } internal class RpcRequest(val handlerJob: Job, val streamContext: ServerStreamContext) { - suspend fun cancelAndClose( + fun cancelAndClose( callId: String, message: String? = null, cause: Throwable? = null, @@ -389,8 +399,6 @@ internal class RpcRequest(val handlerJob: Job, val streamContext: ServerStreamCo message != null -> handlerJob.cancel(message) else -> handlerJob.cancel() } - - handlerJob.join() } streamContext.removeCall(callId, cause) diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt index 0e32e05c7..947cb81ef 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt @@ -63,6 +63,7 @@ class CancellationServiceImpl : CancellationService { } override fun incomingStream(): Flow { + waitCounter.increment() return resumableFlow(fence) } @@ -95,7 +96,13 @@ class CancellationServiceImpl : CancellationService { } override suspend fun outgoingStream(stream: Flow) { - consume(stream) + try { + waitCounter.increment() + consume(stream) + } catch (e: CancellationException) { + cancellationsCounter.increment() + throw e + } } @OptIn(DelicateCoroutinesApi::class) diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt index 0669d0082..19575c465 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt @@ -150,6 +150,7 @@ class CancellationTest { assertEquals(1, serverInstance().cancellationsCounter.value, "Expected 1 request to be cancelled") } + @OptIn(ExperimentalCoroutinesApi::class) @Test fun testCancelClient() = runCancellationTest { val firstRequestJob = launch { @@ -162,23 +163,48 @@ class CancellationTest { secondService.longRequest() } - serverInstance().waitCounter.await(2) + val clientFlowJob = launch { + service.outgoingStream(flow { + emit(0) + serverInstance().fence.await() + emit(1) + }) + } + + val serverFlowJob = launch { + try { + secondService.incomingStream().toList() + fail("Expected cancellation of the client to cancel the server flow") + } catch (e : CancellationException) { + serverInstance().cancellationsCounter.increment() + throw e + } + } + + serverInstance().waitCounter.await(4) client.close() client.awaitCompletion() server.awaitCompletion() firstRequestJob.join() secondRequestJob.join() - serverInstance().cancellationsCounter.await(2) + clientFlowJob.join() + + serverInstance().fence.complete(Unit) + serverFlowJob.join() + + serverInstance().cancellationsCounter.await(4) assertTrue(firstRequestJob.isCancelled, "Expected firstRequestJob to be cancelled") assertTrue(secondRequestJob.isCancelled, "Expected secondRequestJob to be cancelled") + assertTrue(clientFlowJob.isCancelled, "Expected clientFlowJob to be cancelled") + assertTrue(serverFlowJob.isCancelled, "Expected serverFlowJob to be cancelled") assertEquals(0, serverInstances.sumOf { it.successCounter.value }, "Expected no requests to succeed") checkAlive(clientAlive = false, serverAlive = false) stopAllAndJoin() - assertEquals(2, serverInstance().cancellationsCounter.value, "Expected 2 requests to be cancelled") + assertEquals(4, serverInstance().cancellationsCounter.value, "Expected 4 requests to be cancelled") } @Test @@ -193,23 +219,48 @@ class CancellationTest { secondService.longRequest() } - serverInstance().waitCounter.await(2) // wait for requests to reach server + val clientFlowJob = launch { + service.outgoingStream(flow { + emit(0) + serverInstance().fence.await() + emit(1) + }) + } + + val serverFlowJob = launch { + try { + secondService.incomingStream().toList() + fail("Expected cancellation of the client to cancel the server flow") + } catch (e : CancellationException) { + serverInstance().cancellationsCounter.increment() + throw e + } + } + + serverInstance().waitCounter.await(4) // wait for requests to reach server server.close() server.awaitCompletion() client.awaitCompletion() firstRequestJob.join() secondRequestJob.join() - serverInstance().cancellationsCounter.await(2) + clientFlowJob.join() + + serverInstance().fence.complete(Unit) + serverFlowJob.join() + + serverInstance().cancellationsCounter.await(4) assertTrue(firstRequestJob.isCancelled, "Expected firstRequestJob to be cancelled") assertTrue(secondRequestJob.isCancelled, "Expected secondRequestJob to be cancelled") + assertTrue(clientFlowJob.isCancelled, "Expected clientFlowJob to be cancelled") + assertTrue(serverFlowJob.isCancelled, "Expected serverFlowJob to be cancelled") assertEquals(0, serverInstances.sumOf { it.successCounter.value }, "Expected no requests to succeed") checkAlive(clientAlive = false, serverAlive = false) stopAllAndJoin() - assertEquals(2, serverInstance().cancellationsCounter.value, "Expected 2 requests to be cancelled") + assertEquals(4, serverInstance().cancellationsCounter.value, "Expected 4 requests to be cancelled") } @Test From 515391f0e31617bf5ee4dec4cd70be22887dba72 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 17 Sep 2025 15:02:02 +0200 Subject: [PATCH 3/7] Update sendMessage usages to ignore the closed channel when possible --- .../kotlinx/rpc/krpc/client/KrpcClient.kt | 24 ++++++-------- .../rpc/krpc/internal/KrpcConnector.kt | 16 ++++++++-- .../kotlinx/rpc/krpc/internal/KrpcEndpoint.kt | 8 ++--- .../rpc/krpc/internal/KrpcReceiveHandler.kt | 18 +++++------ .../kotlinx/rpc/krpc/server/KrpcServer.kt | 4 ++- .../krpc/server/internal/KrpcServerService.kt | 31 ++++++++++++------- 6 files changed, 57 insertions(+), 44 deletions(-) diff --git a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt index b68516809..c3a8f27a3 100644 --- a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt +++ b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt @@ -216,7 +216,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { connector.subscribeToGenericMessages(::handleGenericMessage) connector.subscribeToProtocolMessages(::handleProtocolMessage) - connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL)) + connector.sendMessageChecked(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL)) { + // ignore, we are already cancelled and have a cause + } } } @@ -477,7 +479,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { connectionId = outgoingStream.connectionId, serviceId = outgoingStream.serviceId, ) - sendException(message) + connector.sendMessageChecked(message) { + // ignore, we are already cancelled and have a cause + } // stop the flow and its coroutine, other flows are not affected throw e @@ -491,7 +495,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { connectionId = outgoingStream.connectionId, serviceId = outgoingStream.serviceId, ) - sendException(message) + connector.sendMessageChecked(message) { + // ignore, we are already cancelled and have a cause + } throw cause } @@ -507,14 +513,6 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { sender.sendMessage(message) } - private suspend fun sendException(message: KrpcMessage) { - try { - sender.sendMessage(message) - } catch (_: ClosedSendChannelException) { - // ignore, we are already cancelled and have a cause - } - } - private suspend fun collectAndSendOutgoingStream( serialFormat: SerialFormat, flow: Flow<*>, @@ -552,9 +550,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { } } - try { - sender.sendMessage(message) - } catch (e: ClosedSendChannelException) { + sender.sendMessageChecked(message) { e -> throw CancellationException("Request cancelled", e) } } diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt index a86001ea6..6609a31c3 100644 --- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt +++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcConnector.kt @@ -35,6 +35,18 @@ public interface KrpcMessageSender { public fun drainSendQueueAndClose(message: String) } +@InternalRpcApi +public suspend inline fun KrpcMessageSender.sendMessageChecked( + message: KrpcMessage, + onChannelClosed: (ClosedSendChannelException) -> Unit, +) { + try { + sendMessage(message) + } catch (e: ClosedSendChannelException) { + onChannelClosed(e) + } +} + internal typealias KrpcMessageSubscription = suspend (Message) -> Unit /** @@ -72,9 +84,7 @@ public class KrpcConnector( // prevent errors ping-pong private suspend fun sendMessageIgnoreClosure(message: KrpcMessage) { - try { - sendMessage(message) - } catch (_: ClosedSendChannelException) { + sendMessageChecked(message) { // ignore } } diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt index 2eb14c35b..0ac4d2251 100644 --- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt +++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt @@ -44,9 +44,7 @@ public interface KrpcEndpoint { ).toMap() ) - try { - sender.sendMessage(message) - } catch (_: ClosedSendChannelException) { + sender.sendMessageChecked(message) { // ignore, call was already closed } } @@ -76,7 +74,9 @@ public interface KrpcEndpoint { connectionId = message.connectionId, ) - sender.sendMessage(failure) + sender.sendMessageChecked(failure) { + // ignore, call was already closed + } } } diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt index 6e9c6ac3e..be276f6a8 100644 --- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt +++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt @@ -212,17 +212,15 @@ internal class KrpcActingReceiveHandler( } internal suspend fun broadcastWindowUpdate(update: Int, connectionId: Long?, serviceType: String, callId: String) { - try { - sender.sendMessage( - KrpcGenericMessage( - connectionId = connectionId, - pluginParams = mutableMapOf( - KrpcPluginKey.WINDOW_UPDATE to "$update", - KrpcPluginKey.WINDOW_KEY to "$serviceType/$callId", - ), - ) + sender.sendMessageChecked( + KrpcGenericMessage( + connectionId = connectionId, + pluginParams = mutableMapOf( + KrpcPluginKey.WINDOW_UPDATE to "$update", + KrpcPluginKey.WINDOW_KEY to "$serviceType/$callId", + ), ) - } catch (_: ClosedSendChannelException) { + ) { // ignore, connection is closed, no more channel updates are needed } } diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt index dfbe1eb8c..14d66b16e 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt @@ -104,7 +104,9 @@ public abstract class KrpcServer( when (message) { is KrpcProtocolMessage.Handshake -> { supportedPlugins = message.supportedPlugins - connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL, connectionId = 1)) + connector.sendMessageChecked(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL, connectionId = 1)) { + // ignore the closed connection + } } is KrpcProtocolMessage.Failure -> { diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt index 5e2ea4cff..3eee4b6f1 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt @@ -60,7 +60,10 @@ internal class KrpcServerService<@Rpc T : Any>( connectionId = message.connectionId, ) - connector.sendMessage(errorMessage) + connector.sendMessageChecked(errorMessage) { + // ignore, the client probably already disconnected + } + unsubscribeFromCallMessages(message.callId) } } @@ -180,7 +183,7 @@ internal class KrpcServerService<@Rpc T : Any>( sendFlowMessages(serialFormat, returnSerializer, value, callData) } else { - sendMessages(serialFormat, returnSerializer, value, callData) + sendMessageValue(serialFormat, returnSerializer, value, callData) } } catch (cause: CancellationException) { currentCoroutineContext().ensureActive() @@ -201,9 +204,7 @@ internal class KrpcServerService<@Rpc T : Any>( serviceId = callData.serviceId, ) - try { - connector.sendMessage(exceptionMessage) - } catch (_: ClosedSendChannelException) { + connector.sendMessageChecked(exceptionMessage) { // ignore, the client probably already disconnected } @@ -225,7 +226,7 @@ internal class KrpcServerService<@Rpc T : Any>( connector.unsubscribeFromCallMessages(descriptor.fqName, callId) } - private suspend fun sendMessages( + private suspend fun sendMessageValue( serialFormat: SerialFormat, returnSerializer: KSerializer, value: Any?, @@ -303,7 +304,7 @@ internal class KrpcServerService<@Rpc T : Any>( connector.sendMessage(result) } - connector.sendMessage( + connector.sendMessageChecked( KrpcCallMessage.StreamFinished( callId = callData.callId, serviceType = descriptor.fqName, @@ -311,12 +312,14 @@ internal class KrpcServerService<@Rpc T : Any>( serviceId = callData.serviceId, streamId = SINGLE_STREAM_ID, ) - ) + ) { + // do nothing + } } catch (cause: CancellationException) { throw cause } catch (cause: Throwable) { val serializedCause = serializeException(cause) - connector.sendMessage( + connector.sendMessageChecked( KrpcCallMessage.StreamCancel( callId = callData.callId, serviceType = descriptor.fqName, @@ -325,7 +328,9 @@ internal class KrpcServerService<@Rpc T : Any>( streamId = SINGLE_STREAM_ID, cause = serializedCause, ) - ) + ) { + // do nothing + } } } @@ -351,7 +356,7 @@ internal class KrpcServerService<@Rpc T : Any>( closeReceiving(callId, message, cause, fromJob = false) if (!supportedPlugins.contains(KrpcPlugin.NO_ACK_CANCELLATION)) { - connector.sendMessage( + connector.sendMessageChecked( KrpcGenericMessage( connectionId = null, pluginParams = mapOf( @@ -360,7 +365,9 @@ internal class KrpcServerService<@Rpc T : Any>( KrpcPluginKey.CANCELLATION_ID to callId, ) ) - ) + ) { + // do nothing + } } unsubscribeFromCallMessages(callId) From d6497f6e8bf77e919c18280a38e8e2877711a180 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 17 Sep 2025 15:02:11 +0200 Subject: [PATCH 4/7] yarn.lock update --- docs/environment.md | 6 +++--- kotlin-js-store/wasm/yarn.lock | 7 +------ kotlin-js-store/yarn.lock | 7 +------ 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/docs/environment.md b/docs/environment.md index b2e4e3c8e..d98e62530 100644 --- a/docs/environment.md +++ b/docs/environment.md @@ -392,7 +392,7 @@ Here is a 'simple' guide for solving problems: - Docker - `Cannot connect to the Docker daemon` - open `Docker Desktop` - Kotlin/Js or Kotlin/Wasm - - `kotlinUpgradePackageLock` or `kotlinWasmUpgradePackageLock` (and also `kotlinNpmInstall` or `kotlinWasmNpmInstall`) + - `kotlinUpgradeYarnLock` or `kotlinWasmUpgradeYarnLock` (and also `kotlinNpmInstall` or `kotlinWasmNpmInstall`) have a funny tendency to fail sometimes, and you don't know why. I'll tell you! @@ -403,7 +403,7 @@ Here is a 'simple' guide for solving problems: If something doesn't work, your steps are: - Delete `package-lock.json` file - Delete `/build/js` / `/build/wasm` - - Run `kotlinUpgradePackageLock` / `kotlinWasmUpgradePackageLock` + - Run `kotlinUpgradeYarnLock` / `kotlinWasmUpgradeYarnLock` - If the problem persists: - Check that `/build//.npmrc` AND `/build//.yarnrc` are present - Check that `.yarnrc` contains one line: `registry: "https://packages.jetbrains.team/npm/p/krpc/build-deps/"` @@ -479,7 +479,7 @@ all included builds (not subprojects) must reflect the change. - `checkLegacyAbi` / `updateLegacyAbi` - ABI checks. See https://kotlinlang.org/docs/whatsnew22.html#binary-compatibility-validation-included-in-kotlin-gradle-plugin. Former BCV: https://github.com/Kotlin/binary-compatibility-validator -- `kotlinUpgradePackageLock` / `kotlinWasmUpgradePackageLock` - update [kotlin-js-store](../kotlin-js-store) contents, +- `kotlinUpgradeYarnLock` / `kotlinWasmUpgradeYarnLock` - update [kotlin-js-store](../kotlin-js-store) contents, usually after Kotlin version update. - `updateDocsChangelog` - put modified [CONTRIBUTING.md](../CONTRIBUTING.md) into [topics](pages/kotlinx-rpc/topics) - `detekt` - run detekt checks. diff --git a/kotlin-js-store/wasm/yarn.lock b/kotlin-js-store/wasm/yarn.lock index 022592ea7..46115a1c6 100644 --- a/kotlin-js-store/wasm/yarn.lock +++ b/kotlin-js-store/wasm/yarn.lock @@ -620,12 +620,7 @@ wrappy@1: resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/wrappy/-/wrappy-1.0.2.tgz#b5243d8f3ec1aa35f1364605bc0d1036e30ab69f" integrity sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ== -ws@8.18.0: - version "8.18.0" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc" - integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw== - -ws@^8.18.2: +ws@8.18.3, ws@^8.18.2: version "8.18.3" resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/ws/-/ws-8.18.3.tgz#b56b88abffde62791c639170400c93dcb0c95472" integrity sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg== diff --git a/kotlin-js-store/yarn.lock b/kotlin-js-store/yarn.lock index 3804317ff..58dd6631a 100644 --- a/kotlin-js-store/yarn.lock +++ b/kotlin-js-store/yarn.lock @@ -2572,12 +2572,7 @@ wrappy@1: resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/wrappy/-/wrappy-1.0.2.tgz#b5243d8f3ec1aa35f1364605bc0d1036e30ab69f" integrity sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ== -ws@8.18.0: - version "8.18.0" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc" - integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw== - -ws@^8.18.2: +ws@8.18.3, ws@^8.18.2: version "8.18.3" resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/ws/-/ws-8.18.3.tgz#b56b88abffde62791c639170400c93dcb0c95472" integrity sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg== From 8667afda9853b08203f91fa8859f8ee440187ea1 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 17 Sep 2025 15:44:31 +0200 Subject: [PATCH 5/7] Fix yarn install --- .../src/main/kotlin/util/tasks/npm.kt | 16 +++++++++-- kotlin-js-store/wasm/yarn.lock | 12 ++++---- kotlin-js-store/yarn.lock | 28 +++++++++---------- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/gradle-conventions/src/main/kotlin/util/tasks/npm.kt b/gradle-conventions/src/main/kotlin/util/tasks/npm.kt index baa06f54a..eb6b0bce3 100644 --- a/gradle-conventions/src/main/kotlin/util/tasks/npm.kt +++ b/gradle-conventions/src/main/kotlin/util/tasks/npm.kt @@ -10,11 +10,14 @@ import org.gradle.kotlin.dsl.assign import org.gradle.kotlin.dsl.configure import org.gradle.kotlin.dsl.provideDelegate import org.gradle.kotlin.dsl.withType +import org.jetbrains.kotlin.gradle.targets.js.nodejs.NodeJsRootExtension import org.jetbrains.kotlin.gradle.targets.js.yarn.YarnLockMismatchReport import org.jetbrains.kotlin.gradle.targets.js.yarn.YarnPlugin import org.jetbrains.kotlin.gradle.targets.js.yarn.YarnRootEnvSpec +import org.jetbrains.kotlin.gradle.targets.wasm.nodejs.WasmNodeJsRootExtension import org.jetbrains.kotlin.gradle.targets.wasm.yarn.WasmYarnPlugin import org.jetbrains.kotlin.gradle.targets.wasm.yarn.WasmYarnRootEnvSpec +import org.jetbrains.kotlin.gradle.targets.web.nodejs.BaseNodeJsRootExtension import org.jetbrains.kotlin.gradle.targets.web.yarn.BaseYarnRootEnvSpec import org.jetbrains.kotlin.gradle.targets.web.yarn.CommonYarnPlugin import util.other.optionalProperty @@ -25,12 +28,13 @@ import java.io.File private inline fun < reified Plugin : CommonYarnPlugin, reified Spec : BaseYarnRootEnvSpec, + reified NodeJsExtension: BaseNodeJsRootExtension, > Project.registerCustomNpmTasks( target: String, useProxy: Boolean, ) { val capitalizedTarget = target.replaceFirstChar { it.titlecase() } - tasks.register("execute${capitalizedTarget}NpmLogin") { + val login = tasks.register("execute${capitalizedTarget}NpmLogin") { if (!useProxyRepositories) { return@register } @@ -82,6 +86,12 @@ private inline fun < downloadBaseUrl = "https://packages.jetbrains.team/files/p/krpc/build-deps/" } } + + extensions.configure { + npmInstallTaskProvider.configure { + dependsOn(login) + } + } } } @@ -100,8 +110,8 @@ fun Project.configureNpm() { val kotlinMasterBuild by optionalProperty() val useProxy = useProxyRepositories - registerCustomNpmTasks("js", useProxy) - registerCustomNpmTasks("wasm", useProxy) + registerCustomNpmTasks("js", useProxy) + registerCustomNpmTasks("wasm", useProxy) // necessary for CI js tests rootProject.plugins.withType { diff --git a/kotlin-js-store/wasm/yarn.lock b/kotlin-js-store/wasm/yarn.lock index 46115a1c6..feb67de01 100644 --- a/kotlin-js-store/wasm/yarn.lock +++ b/kotlin-js-store/wasm/yarn.lock @@ -35,9 +35,9 @@ integrity sha512-C5Mc6rdnsaJDjO3UpGW/CQTHtCKaYlScZTly4JIu97Jxo/odCiH0ITnDXSJPTOrEKk/ycSZ0AOgTmkDtkOsvIA== "@types/node@*": - version "24.5.0" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/@types/node/-/node-24.5.0.tgz#70a482e6b1d50e603729d74e62a9a43705ddc9d7" - integrity sha512-y1dMvuvJspJiPSDZUQ+WMBvF7dpnEqN4x9DDC9ie5Fs/HUZJA3wFp7EhHoVaKX/iI0cRoECV8X2jL8zi0xrHCg== + version "24.5.1" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/@types/node/-/node-24.5.1.tgz#dab6917c47113eb4502d27d06e89a407ec0eff95" + integrity sha512-/SQdmUP2xa+1rdx7VwB9yPq8PaKej8TD5cQ+XfKDPWWC+VDJU4rvVVagXqKUzhKjtFoNA8rXDJAkCxQPAe00+Q== dependencies: undici-types "~7.12.0" @@ -83,9 +83,9 @@ b4a@^1.6.4: integrity sha512-ZovbrBV0g6JxK5cGUF1Suby1vLfKjv4RWi8IxoaO/Mon8BDD9I21RxjHFtgQ+kskJqLAVyQZly3uMBui+vhc8Q== bare-events@^2.2.0, bare-events@^2.5.4: - version "2.6.1" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/bare-events/-/bare-events-2.6.1.tgz#f793b28bdc3dcf147d7cf01f882a6f0b12ccc4a2" - integrity sha512-AuTJkq9XmE6Vk0FJVNq5QxETrSA/vKHarWVBG5l/JbdCL1prJemiyJqUS0jrlXO0MftuPq4m3YVYhoNc5+aE/g== + version "2.7.0" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/bare-events/-/bare-events-2.7.0.tgz#46596dae9c819c5891eb2dcc8186326ed5a6da54" + integrity sha512-b3N5eTW1g7vXkw+0CXh/HazGTcO5KYuu/RCNaJbDMPI6LHDi+7qe8EmxKUVe1sUbY2KZOVZFyj62x0OEz9qyAA== bare-fs@^4.0.1: version "4.4.4" diff --git a/kotlin-js-store/yarn.lock b/kotlin-js-store/yarn.lock index 58dd6631a..148ac45eb 100644 --- a/kotlin-js-store/yarn.lock +++ b/kotlin-js-store/yarn.lock @@ -134,9 +134,9 @@ integrity sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA== "@types/node@*", "@types/node@>=10.0.0": - version "24.5.0" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/@types/node/-/node-24.5.0.tgz#70a482e6b1d50e603729d74e62a9a43705ddc9d7" - integrity sha512-y1dMvuvJspJiPSDZUQ+WMBvF7dpnEqN4x9DDC9ie5Fs/HUZJA3wFp7EhHoVaKX/iI0cRoECV8X2jL8zi0xrHCg== + version "24.5.1" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/@types/node/-/node-24.5.1.tgz#dab6917c47113eb4502d27d06e89a407ec0eff95" + integrity sha512-/SQdmUP2xa+1rdx7VwB9yPq8PaKej8TD5cQ+XfKDPWWC+VDJU4rvVVagXqKUzhKjtFoNA8rXDJAkCxQPAe00+Q== dependencies: undici-types "~7.12.0" @@ -393,9 +393,9 @@ balanced-match@^1.0.0: integrity sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw== bare-events@^2.2.0, bare-events@^2.5.4: - version "2.6.1" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/bare-events/-/bare-events-2.6.1.tgz#f793b28bdc3dcf147d7cf01f882a6f0b12ccc4a2" - integrity sha512-AuTJkq9XmE6Vk0FJVNq5QxETrSA/vKHarWVBG5l/JbdCL1prJemiyJqUS0jrlXO0MftuPq4m3YVYhoNc5+aE/g== + version "2.7.0" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/bare-events/-/bare-events-2.7.0.tgz#46596dae9c819c5891eb2dcc8186326ed5a6da54" + integrity sha512-b3N5eTW1g7vXkw+0CXh/HazGTcO5KYuu/RCNaJbDMPI6LHDi+7qe8EmxKUVe1sUbY2KZOVZFyj62x0OEz9qyAA== bare-fs@^4.0.1: version "4.4.4" @@ -810,9 +810,9 @@ ee-first@1.1.1: integrity sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow== electron-to-chromium@^1.5.218: - version "1.5.218" - resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/electron-to-chromium/-/electron-to-chromium-1.5.218.tgz#921042a011a98a4620853c9d391ab62bcc124400" - integrity sha512-uwwdN0TUHs8u6iRgN8vKeWZMRll4gBkz+QMqdS7DDe49uiK68/UX92lFb61oiFPrpYZNeZIqa4bA7O6Aiasnzg== + version "1.5.220" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/electron-to-chromium/-/electron-to-chromium-1.5.220.tgz#a9853fa5edcf51f4c7db369144377cf31d783b8f" + integrity sha512-TWXijEwR1ggr4BdAKrb1nMNqYLTx1/4aD1fkeZU+FVJGTKu53/T7UyHKXlqEX3Ub02csyHePbHmkvnrjcaYzMA== emoji-regex@^8.0.0: version "8.0.0" @@ -858,7 +858,7 @@ engine.io@~6.6.0: enhanced-resolve@^5.17.2: version "5.18.3" - resolved "https://registry.yarnpkg.com/enhanced-resolve/-/enhanced-resolve-5.18.3.tgz#9b5f4c5c076b8787c78fe540392ce76a88855b44" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/enhanced-resolve/-/enhanced-resolve-5.18.3.tgz#9b5f4c5c076b8787c78fe540392ce76a88855b44" integrity sha512-d4lC8xfavMeBjzGr2vECC3fsGXziXZQyJxD868h2M/mBI3PwAuODxAkLkq5HYuvrPYcUtiLzsTo8U3PgX3Ocww== dependencies: graceful-fs "^4.2.4" @@ -1486,7 +1486,7 @@ karma-webpack@5.0.1: karma@6.4.4: version "6.4.4" - resolved "https://registry.yarnpkg.com/karma/-/karma-6.4.4.tgz#dfa5a426cf5a8b53b43cd54ef0d0d09742351492" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/karma/-/karma-6.4.4.tgz#dfa5a426cf5a8b53b43cd54ef0d0d09742351492" integrity sha512-LrtUxbdvt1gOpo3gxG+VAJlJAEMhbWlM4YrFQgql98FwF7+K8K12LYO4hnDdUkNjeztYrOXEMqgTajSWgmtI/w== dependencies: "@colors/colors" "1.5.0" @@ -1521,7 +1521,7 @@ kind-of@^6.0.2: kotlin-web-helpers@2.1.0: version "2.1.0" - resolved "https://registry.yarnpkg.com/kotlin-web-helpers/-/kotlin-web-helpers-2.1.0.tgz#6cd4b0f0dc3baea163929c8638155b8d19c55a74" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/kotlin-web-helpers/-/kotlin-web-helpers-2.1.0.tgz#6cd4b0f0dc3baea163929c8638155b8d19c55a74" integrity sha512-NAJhiNB84tnvJ5EQx7iER3GWw7rsTZkX9HVHZpe7E3dDBD/dhTzqgSwNU3MfQjniy2rB04bP24WM9Z32ntUWRg== dependencies: format-util "^1.0.5" @@ -1654,7 +1654,7 @@ mkdirp@^0.5.5: mocha@11.7.1: version "11.7.1" - resolved "https://registry.yarnpkg.com/mocha/-/mocha-11.7.1.tgz#91948fecd624fb4bd154ed260b7e1ad3910d7c7a" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/mocha/-/mocha-11.7.1.tgz#91948fecd624fb4bd154ed260b7e1ad3910d7c7a" integrity sha512-5EK+Cty6KheMS/YLPPMJC64g5V61gIR25KsRItHw6x4hEKT6Njp1n9LOlH4gpevuwMVS66SXaBBpg+RWZkza4A== dependencies: browser-stdout "^1.3.1" @@ -2496,7 +2496,7 @@ webpack-sources@^3.3.3: webpack@5.100.2: version "5.100.2" - resolved "https://registry.yarnpkg.com/webpack/-/webpack-5.100.2.tgz#e2341facf9f7de1d702147c91bcb65b693adf9e8" + resolved "https://packages.jetbrains.team/npm/p/krpc/build-deps/webpack/-/webpack-5.100.2.tgz#e2341facf9f7de1d702147c91bcb65b693adf9e8" integrity sha512-QaNKAvGCDRh3wW1dsDjeMdDXwZm2vqq3zn6Pvq4rHOEOGSaUMgOOjG2Y9ZbIGzpfkJk9ZYTHpDqgDfeBDcnLaw== dependencies: "@types/eslint-scope" "^3.7.7" From d3e8fda7924e1aa25c13e5bdc13bca4a99a4c5c0 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 17 Sep 2025 15:46:16 +0200 Subject: [PATCH 6/7] Fixed compiler tests --- .../src/testData/box/customParameterTypes.kt | 3 +++ tests/compiler-plugin-tests/src/testData/box/flowParameter.kt | 2 ++ tests/compiler-plugin-tests/src/testData/box/multiModule.kt | 2 ++ tests/compiler-plugin-tests/src/testData/box/simple.kt | 2 ++ .../src/testData/diagnostics/checkedAnnotation.kt | 3 +++ .../src/testData/diagnostics/rpcChecked.kt | 3 +++ .../src/testData/diagnostics/rpcService.kt | 3 +++ .../src/testData/diagnostics/strictMode.kt | 3 +++ 8 files changed, 21 insertions(+) diff --git a/tests/compiler-plugin-tests/src/testData/box/customParameterTypes.kt b/tests/compiler-plugin-tests/src/testData/box/customParameterTypes.kt index 9243a1b15..fea075898 100644 --- a/tests/compiler-plugin-tests/src/testData/box/customParameterTypes.kt +++ b/tests/compiler-plugin-tests/src/testData/box/customParameterTypes.kt @@ -25,3 +25,6 @@ fun box(): String = runBlocking { if (test1 == "call_42" && test2 == "call_42") "OK" else "Fail: test1=$test1, test2=$test2" } + +/* GENERATED_FIR_TAGS: classDeclaration, data, functionDeclaration, interfaceDeclaration, primaryConstructor, +propertyDeclaration, suspend */ diff --git a/tests/compiler-plugin-tests/src/testData/box/flowParameter.kt b/tests/compiler-plugin-tests/src/testData/box/flowParameter.kt index 374c8f531..7a77b2275 100644 --- a/tests/compiler-plugin-tests/src/testData/box/flowParameter.kt +++ b/tests/compiler-plugin-tests/src/testData/box/flowParameter.kt @@ -20,3 +20,5 @@ fun box(): String = runBlocking { if (result == "call_42") "OK" else "Fail: $result" } + +/* GENERATED_FIR_TAGS: functionDeclaration, interfaceDeclaration, suspend */ diff --git a/tests/compiler-plugin-tests/src/testData/box/multiModule.kt b/tests/compiler-plugin-tests/src/testData/box/multiModule.kt index 4d3e59a2d..5b5728408 100644 --- a/tests/compiler-plugin-tests/src/testData/box/multiModule.kt +++ b/tests/compiler-plugin-tests/src/testData/box/multiModule.kt @@ -24,3 +24,5 @@ fun box(): String = runBlocking { if (result == "call_42") "OK" else "Fail: $result" } + +/* GENERATED_FIR_TAGS: functionDeclaration, interfaceDeclaration, suspend */ diff --git a/tests/compiler-plugin-tests/src/testData/box/simple.kt b/tests/compiler-plugin-tests/src/testData/box/simple.kt index 1b67ecdf4..cdf8506ff 100644 --- a/tests/compiler-plugin-tests/src/testData/box/simple.kt +++ b/tests/compiler-plugin-tests/src/testData/box/simple.kt @@ -19,3 +19,5 @@ fun box(): String = runBlocking { if (result == "call_42") "OK" else "Fail: $result" } + +/* GENERATED_FIR_TAGS: functionDeclaration, interfaceDeclaration, suspend */ diff --git a/tests/compiler-plugin-tests/src/testData/diagnostics/checkedAnnotation.kt b/tests/compiler-plugin-tests/src/testData/diagnostics/checkedAnnotation.kt index 9cd4d5c97..d365c6f3f 100644 --- a/tests/compiler-plugin-tests/src/testData/diagnostics/checkedAnnotation.kt +++ b/tests/compiler-plugin-tests/src/testData/diagnostics/checkedAnnotation.kt @@ -306,3 +306,6 @@ fun unknownTypeFail9(arg: T) { fun unknownTypeFail10(arg: T) { deeplyNestedTwoWithArgCheckedOk(GenericClass(GenericClass(CheckedClass(arg)))) } + +/* GENERATED_FIR_TAGS: annotationDeclaration, classDeclaration, functionDeclaration, nullableType, primaryConstructor, +starProjection, typeConstraint, typeParameter */ diff --git a/tests/compiler-plugin-tests/src/testData/diagnostics/rpcChecked.kt b/tests/compiler-plugin-tests/src/testData/diagnostics/rpcChecked.kt index fac0c9740..d11536446 100644 --- a/tests/compiler-plugin-tests/src/testData/diagnostics/rpcChecked.kt +++ b/tests/compiler-plugin-tests/src/testData/diagnostics/rpcChecked.kt @@ -59,3 +59,6 @@ class WrongGrpcTarget @Rpc class WrongRpcTarget + +/* GENERATED_FIR_TAGS: annotationDeclaration, annotationUseSiteTargetFile, classDeclaration, classReference, +functionDeclaration, inline, interfaceDeclaration, lambdaLiteral, reified, suspend, typeConstraint, typeParameter */ diff --git a/tests/compiler-plugin-tests/src/testData/diagnostics/rpcService.kt b/tests/compiler-plugin-tests/src/testData/diagnostics/rpcService.kt index aa14e3427..e5db6e328 100644 --- a/tests/compiler-plugin-tests/src/testData/diagnostics/rpcService.kt +++ b/tests/compiler-plugin-tests/src/testData/diagnostics/rpcService.kt @@ -38,3 +38,6 @@ interface MyServiceT @Rpc interface MyServiceT2 + +/* GENERATED_FIR_TAGS: annotationUseSiteTargetFile, classReference, functionDeclaration, interfaceDeclaration, +nullableType, suspend, typeConstraint, typeParameter */ diff --git a/tests/compiler-plugin-tests/src/testData/diagnostics/strictMode.kt b/tests/compiler-plugin-tests/src/testData/diagnostics/strictMode.kt index 63293be73..17353ed18 100644 --- a/tests/compiler-plugin-tests/src/testData/diagnostics/strictMode.kt +++ b/tests/compiler-plugin-tests/src/testData/diagnostics/strictMode.kt @@ -66,3 +66,6 @@ interface MyService { fun nonSuspendNoFlowString(): String suspend fun complex(filter: ComplexFilter): String // doesn't fail on circular dependency } + +/* GENERATED_FIR_TAGS: annotationUseSiteTargetFile, classDeclaration, classReference, data, functionDeclaration, +interfaceDeclaration, nullableType, primaryConstructor, propertyDeclaration, suspend, typeParameter */ From 4ac9ef98fae68e855c03fda0094aeb7017583473 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 17 Sep 2025 16:11:47 +0200 Subject: [PATCH 7/7] Fix flaky tests --- .../kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt | 7 +++++++ .../kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt | 4 ++-- .../krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt | 6 +++--- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt index 394e25699..ddbeeef3f 100644 --- a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt +++ b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt @@ -9,6 +9,7 @@ package kotlinx.rpc.krpc.ktor import io.ktor.client.* import io.ktor.client.engine.cio.* import io.ktor.client.plugins.HttpRequestRetry +import io.ktor.client.plugins.HttpTimeout import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.server.application.* @@ -188,6 +189,9 @@ class KtorTransportTest { retryOnServerErrors(maxRetries = 5) exponentialDelay() } + install(HttpTimeout) { + requestTimeoutMillis = 10000 + } installKrpc { serialization { @@ -296,6 +300,9 @@ class KtorTransportTest { retryOnServerErrors(maxRetries = 5) exponentialDelay() } + install(HttpTimeout) { + requestTimeoutMillis = 10000 + } installKrpc { serialization { diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt index 4297462c6..6b1e46bc6 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt @@ -90,7 +90,7 @@ class BackPressureTest : BackPressureTestBase() { abstract class BackPressureTestBase { protected fun runServerTest( perCallBufferSize: Int, - timeout: Duration = 30.seconds, + timeout: Duration = 120.seconds, ) = runTest(perCallBufferSize, timeout) { service, impl -> var counter = 0 val flowList = async { @@ -123,7 +123,7 @@ abstract class BackPressureTestBase { protected fun runClientTest( perCallBufferSize: Int, - timeout: Duration = 30.seconds, + timeout: Duration = 120.seconds, ) = runTest(perCallBufferSize, timeout) { service, impl -> var counter = 0 val flowList = async { diff --git a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt index 649a7ee44..5dc70aad3 100644 --- a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt +++ b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt @@ -104,7 +104,7 @@ abstract class KrpcProtocolCompatibilityTestsBase { private fun testOldClientWithNewServer( perCallBufferSize: Int = 1, - timeout: Duration = 10.seconds, + timeout: Duration = 60.seconds, exclude: List, body: suspend TestEnv.(CompatService, CompatServiceImpl) -> Unit, ) = runTest(Role.Client, exclude, timeout) { @@ -117,7 +117,7 @@ abstract class KrpcProtocolCompatibilityTestsBase { private fun testOldServersWithNewClient( perCallBufferSize: Int = 1, - timeout: Duration = 10.seconds, + timeout: Duration = 60.seconds, exclude: List, body: suspend TestEnv.(CompatService, CompatServiceImpl) -> Unit, ) = runTest(Role.Server, exclude, timeout) { @@ -130,7 +130,7 @@ abstract class KrpcProtocolCompatibilityTestsBase { protected fun matrixTest( perCallBufferSize: Int = 1, - timeout: Duration = 10.seconds, + timeout: Duration = 60.seconds, exclude: List = emptyList(), body: suspend TestEnv.(CompatService, CompatServiceImpl) -> Unit, ): Stream {