Skip to content

Commit d36ae98

Browse files
committed
Tests:
- Move stressActing into a separate test suite - and logging for fastProducer - relax iterations count for fastProducer and stressActing
1 parent 059aaf9 commit d36ae98

File tree

5 files changed

+148
-98
lines changed

5 files changed

+148
-98
lines changed

krpc/krpc-core/build.gradle.kts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,10 @@ kotlin {
5151
tasks.withType<KotlinJvmTest> {
5252
// lincheck agent
5353
jvmArgs("-XX:+EnableDynamicAgentLoading")
54+
55+
if (project.hasProperty("stressTests") && project.property("stressTests") == "true") {
56+
include("kotlinx/rpc/krpc/stress/**")
57+
} else {
58+
exclude("kotlinx/rpc/krpc/stress/**")
59+
}
5460
}

krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.kt

Lines changed: 0 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -107,101 +107,6 @@ internal class KrpcReceiveHandlerTest : KrpcReceiveHandlerBaseTest() {
107107
val message2 = (channel.receive() as KrpcCallMessage.CallException).cause.deserialize().message.orEmpty()
108108
assertTrue(message2.contains("1 messages were unprocessed"), message2)
109109
}
110-
111-
@OptIn(ExperimentalCoroutinesApi::class)
112-
@Test
113-
fun stressActing() {
114-
val actorJob = Job()
115-
val collected = mutableListOf<KrpcMessage>()
116-
val bufferSize = stressBufferSize
117-
118-
runActingTest(
119-
callTimeOut = 10.seconds,
120-
bufferSize = bufferSize,
121-
callHandler = { collected.add(it) },
122-
timeout = 360.seconds,
123-
) { acting ->
124-
val sendChannel = Channel<KrpcTransportMessage>(Channel.UNLIMITED)
125-
val sender = KrpcSendHandler(sendChannel)
126-
sender.updateWindowSize(bufferSize)
127-
128-
val windowJob = launch {
129-
while (true) {
130-
val window = when (val message = channel.receive()) {
131-
is KrpcCallMessage.CallException -> fail(
132-
"Unexpected call exception",
133-
message.cause.deserialize()
134-
)
135-
136-
is KrpcGenericMessage -> decodeWindow(message)
137-
else -> fail("Unexpected message: $message")
138-
}
139-
140-
sender.updateWindowSize((window as WindowResult.Success).update)
141-
}
142-
}
143-
144-
val senderJob = launch {
145-
while (true) {
146-
val message = sendChannel.receive() as KrpcTransportMessage.StringMessage
147-
148-
acting.handle(message.value.asCallMessage("1")) {
149-
fail(
150-
"Unexpected onMessageFailure call, " +
151-
"window: ${sender.window}, collected: ${collected.size}\"",
152-
it
153-
)
154-
}.onFailure {
155-
fail(
156-
"Unexpected onFailure call, " +
157-
"window: ${sender.window}, collected: ${collected.size}"
158-
)
159-
}.onClosed {
160-
fail(
161-
"Unexpected onClosed call, " +
162-
"window: ${sender.window}, collected: ${collected.size}\"",
163-
it
164-
)
165-
}
166-
}
167-
}
168-
169-
val counter = Counter()
170-
val printJob = launch {
171-
while (true) {
172-
withContext(Dispatchers.Default) {
173-
delay(5.seconds)
174-
}
175-
println(
176-
"Collected: ${collected.size}, " +
177-
"launches: ${counter.launches.value}, " +
178-
"total: ${counter.total.value}"
179-
)
180-
}
181-
}
182-
183-
val iterations = stressIterations
184-
List(iterations) {
185-
launch {
186-
repeat(100) {
187-
sender.sendMessage(KrpcTransportMessage.StringMessage("Hello"))
188-
counter.total.incrementAndGet()
189-
}
190-
counter.launches.incrementAndGet()
191-
}
192-
}.joinAll()
193-
194-
while (!buffer.channel.isEmpty && sender.window != bufferSize) {
195-
yield()
196-
}
197-
198-
assertEquals(iterations * 100, collected.size)
199-
actorJob.cancelAndJoin()
200-
senderJob.cancelAndJoin()
201-
windowJob.cancelAndJoin()
202-
printJob.cancelAndJoin()
203-
}
204-
}
205110
}
206111

207112
internal abstract class KrpcReceiveHandlerBaseTest {
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.rpc.krpc.stress
6+
7+
import kotlinx.coroutines.Dispatchers
8+
import kotlinx.coroutines.ExperimentalCoroutinesApi
9+
import kotlinx.coroutines.Job
10+
import kotlinx.coroutines.cancelAndJoin
11+
import kotlinx.coroutines.channels.Channel
12+
import kotlinx.coroutines.delay
13+
import kotlinx.coroutines.joinAll
14+
import kotlinx.coroutines.launch
15+
import kotlinx.coroutines.withContext
16+
import kotlinx.coroutines.yield
17+
import kotlinx.rpc.krpc.KrpcReceiveHandlerBaseTest
18+
import kotlinx.rpc.krpc.KrpcTransportMessage
19+
import kotlinx.rpc.krpc.internal.KrpcCallMessage
20+
import kotlinx.rpc.krpc.internal.KrpcGenericMessage
21+
import kotlinx.rpc.krpc.internal.KrpcMessage
22+
import kotlinx.rpc.krpc.internal.KrpcSendHandler
23+
import kotlinx.rpc.krpc.internal.WindowResult
24+
import kotlinx.rpc.krpc.internal.decodeWindow
25+
import kotlinx.rpc.krpc.internal.deserialize
26+
import kotlinx.rpc.krpc.internal.onClosed
27+
import kotlinx.rpc.krpc.internal.onFailure
28+
import kotlinx.rpc.krpc.stressBufferSize
29+
import kotlinx.rpc.krpc.stressIterations
30+
import kotlin.test.Test
31+
import kotlin.test.assertEquals
32+
import kotlin.test.fail
33+
import kotlin.time.Duration.Companion.minutes
34+
import kotlin.time.Duration.Companion.seconds
35+
36+
internal class KrpcReceiveHandlerStressTest : KrpcReceiveHandlerBaseTest() {
37+
@OptIn(ExperimentalCoroutinesApi::class)
38+
@Test
39+
fun stressActing() {
40+
val actorJob = Job()
41+
val collected = mutableListOf<KrpcMessage>()
42+
val bufferSize = stressBufferSize
43+
44+
runActingTest(
45+
callTimeOut = 10.seconds,
46+
bufferSize = bufferSize,
47+
callHandler = { collected.add(it) },
48+
timeout = 10.minutes,
49+
) { acting ->
50+
val sendChannel = Channel<KrpcTransportMessage>(Channel.UNLIMITED)
51+
val sender = KrpcSendHandler(sendChannel)
52+
sender.updateWindowSize(bufferSize)
53+
54+
val windowJob = launch {
55+
while (true) {
56+
val window = when (val message = channel.receive()) {
57+
is KrpcCallMessage.CallException -> fail(
58+
"Unexpected call exception",
59+
message.cause.deserialize()
60+
)
61+
62+
is KrpcGenericMessage -> decodeWindow(message)
63+
else -> fail("Unexpected message: $message")
64+
}
65+
66+
sender.updateWindowSize((window as WindowResult.Success).update)
67+
}
68+
}
69+
70+
val senderJob = launch {
71+
while (true) {
72+
val message = sendChannel.receive() as KrpcTransportMessage.StringMessage
73+
74+
acting.handle(message.value.asCallMessage("1")) {
75+
fail(
76+
"Unexpected onMessageFailure call, " +
77+
"window: ${sender.window}, collected: ${collected.size}\"",
78+
it
79+
)
80+
}.onFailure {
81+
fail(
82+
"Unexpected onFailure call, " +
83+
"window: ${sender.window}, collected: ${collected.size}"
84+
)
85+
}.onClosed {
86+
fail(
87+
"Unexpected onClosed call, " +
88+
"window: ${sender.window}, collected: ${collected.size}\"",
89+
it
90+
)
91+
}
92+
}
93+
}
94+
95+
val counter = Counter()
96+
val printJob = launch {
97+
while (true) {
98+
withContext(Dispatchers.Default) {
99+
delay(5.seconds)
100+
}
101+
println(
102+
"Collected: ${collected.size}, " +
103+
"launches: ${counter.launches.value}, " +
104+
"total: ${counter.total.value}"
105+
)
106+
}
107+
}
108+
109+
val iterations = stressIterations
110+
List(iterations) {
111+
launch {
112+
repeat(100) {
113+
sender.sendMessage(KrpcTransportMessage.StringMessage("Hello"))
114+
counter.total.incrementAndGet()
115+
}
116+
counter.launches.incrementAndGet()
117+
}
118+
}.joinAll()
119+
120+
while (!buffer.channel.isEmpty && sender.window != bufferSize) {
121+
yield()
122+
}
123+
124+
assertEquals(iterations * 100, collected.size)
125+
actorJob.cancelAndJoin()
126+
senderJob.cancelAndJoin()
127+
windowJob.cancelAndJoin()
128+
printJob.cancelAndJoin()
129+
}
130+
}
131+
}

krpc/krpc-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.jvm.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44

55
package kotlinx.rpc.krpc
66

7-
internal actual val stressIterations: Int = 10_000
8-
internal actual val stressBufferSize: Int = 500
7+
internal actual val stressIterations: Int = 8_000
8+
internal actual val stressBufferSize: Int = 1000

tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import kotlinx.coroutines.flow.toList
1313
import kotlinx.coroutines.joinAll
1414
import kotlinx.coroutines.launch
1515
import org.junit.jupiter.api.TestFactory
16+
import org.slf4j.Logger
17+
import org.slf4j.LoggerFactory
1618
import kotlin.test.assertEquals
1719
import kotlin.time.Duration.Companion.seconds
1820

@@ -139,17 +141,23 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
139141

140142
@TestFactory
141143
fun fastProducer() = matrixTest(timeout = 60.seconds) { service, impl ->
144+
val root = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME) as ch.qos.logback.classic.Logger
145+
142146
val async = async {
143147
service.fastServerProduce(1000).map {
144148
// long produce
145149
impl.entered.complete(Unit)
146150
impl.fence.await()
151+
root.info("Consumed $it")
147152
it * it
148153
}.toList()
149154
}
150155

151156
impl.entered.await()
152-
repeat(10_000) {
157+
repeat(5_000) {
158+
if (it % 10 == 0) {
159+
root.info("Parallel iteration #$it")
160+
}
153161
assertEquals(1, service.unary(1))
154162
assertEquals(55, service.serverStreaming(10).toList().sum())
155163
}

0 commit comments

Comments
 (0)