Skip to content

Commit e713b37

Browse files
committed
Added lincheck tests template
1 parent f4908ff commit e713b37

File tree

8 files changed

+244
-96
lines changed

8 files changed

+244
-96
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,11 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
221221
message.errorMessage
222222
}
223223

224-
serverSupportedPlugins.completeExceptionally(
225-
IllegalStateException("Server failed to process protocol message: ${message.failedMessage}")
226-
)
224+
if (!serverSupportedPlugins.isCompleted) {
225+
serverSupportedPlugins.completeExceptionally(
226+
IllegalStateException("Server failed to process protocol message: ${message.failedMessage}")
227+
)
228+
}
227229
}
228230
}
229231
}

krpc/krpc-test/build.gradle.kts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import org.jetbrains.kotlin.gradle.dsl.ExplicitApiMode
66
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
77
import org.jetbrains.kotlin.gradle.targets.js.testing.KotlinJsTest
88
import org.jetbrains.kotlin.gradle.targets.jvm.tasks.KotlinJvmTest
9-
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
109
import java.nio.file.Files
1110

1211
plugins {
@@ -71,13 +70,19 @@ kotlin {
7170
implementation(libs.slf4j.api)
7271
implementation(libs.logback.classic)
7372
implementation(libs.coroutines.debug)
73+
implementation(libs.lincheck)
7474
}
7575
}
7676
}
7777

7878
explicitApi = ExplicitApiMode.Disabled
7979
}
8080

81+
tasks.jvmTest {
82+
// lincheck agent
83+
jvmArgs("-XX:+EnableDynamicAgentLoading")
84+
}
85+
8186
tasks.withType<KotlinJvmTest> {
8287
environment("LIBRARY_VERSION", libs.versions.kotlinx.rpc.get())
8388
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.krpc.test
6+
7+
import kotlinx.coroutines.CoroutineScope
8+
import kotlinx.coroutines.cancelAndJoin
9+
import kotlinx.rpc.krpc.rpcClientConfig
10+
import kotlinx.rpc.krpc.rpcServerConfig
11+
import kotlinx.rpc.krpc.serialization.json.json
12+
import kotlinx.rpc.registerService
13+
import kotlinx.rpc.withService
14+
import kotlin.coroutines.CoroutineContext
15+
16+
abstract class BaseServiceTest {
17+
class Env(
18+
val service: TestService,
19+
val impl: TestServiceImpl,
20+
val client: KrpcTestClient,
21+
val server: KrpcTestServer,
22+
val transport: LocalTransport,
23+
testScope: CoroutineScope,
24+
) : CoroutineScope by testScope
25+
26+
protected suspend fun CoroutineScope.runServiceTest(
27+
parentContext: CoroutineContext,
28+
perCallBufferSize: Int = 100,
29+
body: suspend Env.() -> Unit,
30+
) {
31+
val transport = LocalTransport(parentContext, recordTimestamps = false)
32+
33+
val clientConfig = rpcClientConfig {
34+
serialization {
35+
json()
36+
}
37+
38+
connector {
39+
this.perCallBufferSize = perCallBufferSize
40+
}
41+
}
42+
43+
val serverConfig = rpcServerConfig {
44+
serialization {
45+
json()
46+
}
47+
48+
connector {
49+
this.perCallBufferSize = perCallBufferSize
50+
}
51+
}
52+
53+
val client = KrpcTestClient(clientConfig, transport.client)
54+
val service = client.withService<TestService>()
55+
56+
val server = KrpcTestServer(serverConfig, transport.server)
57+
val impl = TestServiceImpl()
58+
server.registerService<TestService> { impl }
59+
60+
val env = Env(service, impl, client, server, transport, this)
61+
body(env)
62+
63+
client.close()
64+
server.close()
65+
client.awaitCompletion()
66+
server.awaitCompletion()
67+
transport.coroutineContext.cancelAndJoin()
68+
}
69+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.krpc.test
6+
7+
import ch.qos.logback.classic.Level
8+
import ch.qos.logback.classic.spi.ILoggingEvent
9+
import ch.qos.logback.core.AppenderBase
10+
11+
class TestLogAppender : AppenderBase<ILoggingEvent>() {
12+
init {
13+
start()
14+
}
15+
16+
val events = mutableListOf<ILoggingEvent>()
17+
val errors get() = events.filter { it.level == Level.ERROR }
18+
val warnings get() = events.filter { it.level == Level.WARN }
19+
20+
override fun append(eventObject: ILoggingEvent) {
21+
events.add(eventObject)
22+
}
23+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.krpc.test
6+
7+
import kotlinx.atomicfu.atomic
8+
import kotlinx.coroutines.flow.Flow
9+
import kotlinx.coroutines.flow.asFlow
10+
import kotlinx.coroutines.flow.toList
11+
import kotlinx.coroutines.flow.zip
12+
import kotlinx.rpc.annotations.Rpc
13+
14+
@Rpc
15+
interface TestService {
16+
suspend fun unary(n: Int): Int
17+
fun serverStreaming(num: Int): Flow<Int>
18+
suspend fun clientStreaming(n: Flow<Int>): Int
19+
fun bidiStreaming(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int>
20+
}
21+
22+
class TestServiceImpl : TestService {
23+
val unaryInvocations = atomic(0)
24+
val serverStreamingInvocations = atomic(0)
25+
val clientStreamingInvocations = atomic(0)
26+
val bidiStreamingInvocations = atomic(0)
27+
28+
override suspend fun unary(n: Int): Int {
29+
unaryInvocations.incrementAndGet()
30+
return n
31+
}
32+
33+
override fun serverStreaming(num: Int): Flow<Int> {
34+
serverStreamingInvocations.incrementAndGet()
35+
return (1..num).asFlow()
36+
}
37+
38+
override suspend fun clientStreaming(n: Flow<Int>): Int {
39+
clientStreamingInvocations.incrementAndGet()
40+
return n.toList().sum()
41+
}
42+
43+
override fun bidiStreaming(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> {
44+
bidiStreamingInvocations.incrementAndGet()
45+
return flow1.zip(flow2) { a, b -> a + b }
46+
}
47+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.krpc.test.lincheck
6+
7+
import ch.qos.logback.classic.Logger
8+
import kotlinx.coroutines.ExecutorCoroutineDispatcher
9+
import kotlinx.coroutines.asCoroutineDispatcher
10+
import kotlinx.coroutines.launch
11+
import kotlinx.coroutines.runBlocking
12+
import kotlinx.rpc.krpc.test.BaseServiceTest
13+
import kotlinx.rpc.krpc.test.TestLogAppender
14+
import org.jetbrains.lincheck.Lincheck
15+
import org.jetbrains.lincheck.datastructures.CTestConfiguration
16+
import org.slf4j.LoggerFactory
17+
import java.util.concurrent.Executors
18+
import kotlin.test.Ignore
19+
import kotlin.test.Test
20+
import kotlin.test.assertEquals
21+
import kotlin.test.assertTrue
22+
import kotlin.test.fail
23+
24+
class LincheckTest : BaseLincheckTest() {
25+
@Test
26+
@Ignore("Has some lincheck issues, waiting for a fix from the team")
27+
fun simpleConcurrentRequests() = runTest { testLog ->
28+
launch {
29+
assertEquals(1, service.unary(1))
30+
}
31+
32+
launch {
33+
assertEquals(2, service.unary(2))
34+
}
35+
36+
assertTrue(testLog.warnings.isEmpty())
37+
assertTrue(testLog.errors.isEmpty())
38+
}
39+
}
40+
41+
abstract class BaseLincheckTest() : BaseServiceTest() {
42+
fun createDispatcher(nThreads: Int): ExecutorCoroutineDispatcher = Executors
43+
.newFixedThreadPool(nThreads)
44+
.asCoroutineDispatcher()
45+
46+
protected fun runTest(
47+
shouldFail: Boolean = false,
48+
invocations: Int = CTestConfiguration.DEFAULT_INVOCATIONS,
49+
nThreads: Int = Runtime.getRuntime().availableProcessors(),
50+
perCallBufferSize: Int = 100,
51+
block: suspend Env.(TestLogAppender) -> Unit,
52+
) {
53+
val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger
54+
val testAppender = root.getAppender("TEST") as TestLogAppender
55+
testAppender.events.clear()
56+
57+
val result = runCatching {
58+
Lincheck.runConcurrentTest(invocations) {
59+
createDispatcher(nThreads).use { dispatcher ->
60+
runBlocking(dispatcher) {
61+
runServiceTest(coroutineContext, perCallBufferSize) {
62+
block(testAppender)
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
testAppender.events.clear()
70+
71+
if (result.isFailure != shouldFail) {
72+
val exceptionOrNull = result.exceptionOrNull()
73+
val message = if (shouldFail) {
74+
"Should've failed but succeeded"
75+
} else {
76+
"Should've succeeded but failed"
77+
}
78+
79+
fail(message, exceptionOrNull)
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)