Skip to content

Commit d22358d

Browse files
committed
Added better debug logs, removed multiple client streams from several tests, fixed exception ping-pong
1 parent 6f01c0c commit d22358d

File tree

16 files changed

+112
-70
lines changed

16 files changed

+112
-70
lines changed

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,14 @@ abstract class KrpcTransportTestBase {
333333
@Test
334334
fun RPC_should_be_able_to_receive_100_000_ints_in_reasonable_time() = runTest(timeout = JS_EXTENDED_TIMEOUT) {
335335
val n = 100_000
336-
assertEquals(client.getNInts(n).last(), n)
336+
var counter = 0
337+
val last = client.getNInts(n).onEach {
338+
counter++
339+
if (counter % 1000 == 0) {
340+
println("Iteration: $counter")
341+
}
342+
}.last()
343+
assertEquals(n, last)
337344
}
338345

339346
@Test

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,17 @@ abstract class BackPressureTestBase {
9999
perCallBufferSize: Int,
100100
timeout: Duration = 10.seconds,
101101
) = runTest(perCallBufferSize, timeout) { service, impl ->
102+
var counter = 0
102103
val flowList = async {
103104
service.serverStream(1000).map {
104105
if (it == 0) {
105106
impl.entered.complete(Unit)
106107
impl.fence.await()
107108
}
109+
counter++
110+
if (counter % 10 == 0) {
111+
println("Iteration: $counter")
112+
}
108113
}.toList()
109114
}
110115

@@ -127,12 +132,17 @@ abstract class BackPressureTestBase {
127132
perCallBufferSize: Int,
128133
timeout: Duration = 10.seconds,
129134
) = runTest(perCallBufferSize, timeout) { service, impl ->
135+
var counter = 0
130136
val flowList = async {
131137
service.clientStream(flow {
132138
repeat(1000) {
133139
impl.clientStreamCounter.incrementAndGet()
134140
emit(it)
135141
}
142+
counter++
143+
if (counter % 10 == 0) {
144+
println("Iteration: $counter")
145+
}
136146
})
137147
}
138148

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ package kotlinx.rpc.krpc.test
77
import kotlinx.coroutines.cancelAndJoin
88
import kotlinx.coroutines.currentCoroutineContext
99
import kotlinx.coroutines.job
10-
import kotlinx.coroutines.test.runTest
1110
import kotlinx.coroutines.withContext
1211
import kotlinx.rpc.krpc.rpcClientConfig
1312
import kotlinx.rpc.krpc.rpcServerConfig
1413
import kotlinx.rpc.krpc.serialization.json.json
14+
import kotlinx.rpc.test.runTestWithCoroutinesProbes
1515
import kotlinx.rpc.withService
1616
import kotlin.coroutines.CoroutineContext
1717
import kotlin.test.Test
1818
import kotlin.test.assertEquals
19+
import kotlin.time.Duration.Companion.seconds
1920

2021
class CoroutineContextPropagationTest {
2122
private val rpcServerConfig = rpcServerConfig {
@@ -38,7 +39,7 @@ class CoroutineContextPropagationTest {
3839
}
3940

4041
@Test
41-
fun test() = runTest {
42+
fun test() = runTestWithCoroutinesProbes(timeout = 60.seconds) {
4243
var actualContext: CoroutineElement? = null
4344
val transport = LocalTransport(CoroutineElement("transport"))
4445
val server = KrpcTestServer(rpcServerConfig, transport.server)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/TransportTest.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,20 @@ class TransportTest {
6262
serialization {
6363
json()
6464
}
65+
66+
connector {
67+
waitTimeout = Duration.INFINITE
68+
}
6569
}
6670

6771
private val serverConfig = rpcServerConfig {
6872
serialization {
6973
json()
7074
}
75+
76+
connector {
77+
waitTimeout = Duration.INFINITE
78+
}
7179
}
7280

7381
private fun clientOf(localTransport: LocalTransport): RpcClient {

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/BaseServiceTest.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ abstract class BaseServiceTest {
1717
class Env(
1818
val service: TestService,
1919
val impl: TestServiceImpl,
20-
val client: KrpcTestClient,
21-
val server: KrpcTestServer,
2220
val transport: LocalTransport,
2321
testScope: CoroutineScope,
2422
) : CoroutineScope by testScope
@@ -57,7 +55,7 @@ abstract class BaseServiceTest {
5755
val impl = TestServiceImpl()
5856
server.registerService<TestService> { impl }
5957

60-
val env = Env(service, impl, client, server, transport, this)
58+
val env = Env(service, impl, transport, this)
6159
try {
6260
body(env)
6361
} finally {

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/TestService.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@ import kotlinx.atomicfu.atomic
88
import kotlinx.coroutines.flow.Flow
99
import kotlinx.coroutines.flow.asFlow
1010
import kotlinx.coroutines.flow.toList
11-
import kotlinx.coroutines.flow.zip
1211
import kotlinx.rpc.annotations.Rpc
1312

1413
@Rpc
1514
interface TestService {
1615
suspend fun unary(n: Int): Int
1716
fun serverStreaming(num: Int): Flow<Int>
1817
suspend fun clientStreaming(n: Flow<Int>): Int
19-
fun bidiStreaming(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int>
18+
fun bidiStreaming(flow: Flow<Int>): Flow<Int>
2019
}
2120

2221
class TestServiceImpl : TestService {
@@ -40,8 +39,8 @@ class TestServiceImpl : TestService {
4039
return n.toList().sum()
4140
}
4241

43-
override fun bidiStreaming(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int> {
42+
override fun bidiStreaming(flow: Flow<Int>): Flow<Int> {
4443
bidiStreamingInvocations.incrementAndGet()
45-
return flow1.zip(flow2) { a, b -> a + b }
44+
return flow
4645
}
4746
}

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/stress/StressTest.kt

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,18 @@
44

55
package kotlinx.rpc.krpc.test.stress
66

7+
import kotlinx.atomicfu.atomic
8+
import kotlinx.coroutines.Dispatchers
9+
import kotlinx.coroutines.cancelAndJoin
10+
import kotlinx.coroutines.delay
711
import kotlinx.coroutines.flow.asFlow
812
import kotlinx.coroutines.flow.toList
913
import kotlinx.coroutines.joinAll
1014
import kotlinx.coroutines.launch
15+
import kotlinx.coroutines.withContext
1116
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
1217
import kotlinx.rpc.krpc.test.BaseServiceTest
18+
import kotlinx.rpc.test.runTestWithCoroutinesProbes
1319
import kotlin.test.Test
1420
import kotlin.test.assertEquals
1521
import kotlin.time.Duration
@@ -18,54 +24,54 @@ import kotlin.time.Duration.Companion.seconds
1824
class StressTest : BaseStressTest() {
1925
// ~30 sec, 300_000 messages
2026
@Test
21-
fun `unary, buffer 100, launches 3_000 x 100`() = testUnary(100, 240.seconds, 3_000, 100)
27+
fun `unary, buffer 100, launches 3_000 x 100`() = testUnary(100, 120.seconds, 3_000, 100)
2228

2329
// ~10 sec, 100_000 messages
2430
@Test
25-
fun `unary, buffer 1, launches 10_000 x 10`() = testUnary(1, 240.seconds, 10_000, 10)
31+
fun `unary, buffer 1, launches 10_000 x 10`() = testUnary(1, 120.seconds, 10_000, 10)
2632

2733
// ~51 min, 500_000 messages
2834
@Test
29-
fun `unary, 1000 buffer, launches 50_000 x 10`() = testUnary(1000, 360.seconds, 50_000, 10)
35+
fun `unary, 1000 buffer, launches 50_000 x 10`() = testUnary(1000, 180.seconds, 50_000, 10)
3036

3137

3238
// ~15 sec, 4_000_000 messages
3339
@Test
34-
fun `server streaming, buffer 30 buf, launches 200 x 10`() = testServerStreaming(30, 240.seconds, 200, 10)
40+
fun `server streaming, buffer 30 buf, launches 200 x 10`() = testServerStreaming(30, 120.seconds, 200, 10)
3541

3642
// ~19 sec, 4_000_000 messages
3743
@Test
38-
fun `server streaming, buffer 1, launches 200 x 10`() = testServerStreaming(1, 240.seconds, 200, 10)
44+
fun `server streaming, buffer 1, launches 200 x 10`() = testServerStreaming(1, 120.seconds, 200, 10)
3945

4046
// ~14 sec, 4_000_000 messages
4147
@Test
42-
fun `server streaming, buffer 2000, launches 200 x 10`() = testServerStreaming(2000, 360.seconds, 200, 10)
48+
fun `server streaming, buffer 2000, launches 200 x 10`() = testServerStreaming(2000, 180.seconds, 200, 10)
4349

4450

4551
// ~15 sec, 4_000_000 messages
4652
@Test
47-
fun `client streaming, buffer 30, launches 200 x 10`() = testClientStreaming(30, 240.seconds, 200, 10)
53+
fun `client streaming, buffer 30, launches 200 x 10`() = testClientStreaming(30, 120.seconds, 200, 10)
4854

4955
// ~19 sec, 4_000_000 messages
5056
@Test
51-
fun `client streaming, buffer 1, launches 200 x 10`() = testClientStreaming(1, 240.seconds, 200, 10)
57+
fun `client streaming, buffer 1, launches 200 x 10`() = testClientStreaming(1, 120.seconds, 200, 10)
5258

5359
// ~15 sec, 4_000_000 messages
5460
@Test
55-
fun `client streaming, buffer 2000, launches 200 x 10`() = testClientStreaming(2000, 360.seconds, 200, 10)
61+
fun `client streaming, buffer 2000, launches 200 x 10`() = testClientStreaming(2000, 180.seconds, 200, 10)
5662

5763

58-
// ~30 sec, 4_500_000 messages
64+
// ~23 sec, 4_500_000 messages
5965
@Test
60-
fun `bidi streaming, buffer 30, launches 150 x 10`() = testBidiStreaming(30, 240.seconds, 150, 10)
66+
fun `bidi streaming, buffer 30, launches 150 x 10`() = testBidiStreaming(30, 120.seconds, 150, 10)
6167

62-
// ~33 sec, 4_500_000 messages
68+
// ~24 sec, 4_500_000 messages
6369
@Test
64-
fun `bidi streaming, buffer 1, launches 150 x 10`() = testBidiStreaming(1, 240.seconds, 150, 10)
70+
fun `bidi streaming, buffer 1, launches 150 x 10`() = testBidiStreaming(1, 120.seconds, 150, 10)
6571

66-
// ~29 sec, 4_500_000 messages
72+
// ~20 sec, 4_500_000 messages
6773
@Test
68-
fun `bidi streaming, buffer 2000, launches 150 x 10`() = testBidiStreaming(2000, 240.seconds, 150, 10)
74+
fun `bidi streaming, buffer 2000, launches 150 x 10`() = testBidiStreaming(2000, 120.seconds, 150, 10)
6975
}
7076

7177

@@ -76,20 +82,19 @@ abstract class BaseStressTest : BaseServiceTest() {
7682
timeout: Duration,
7783
launches: Int,
7884
iterationsPerLaunch: Int,
79-
) = runTest(perCallBufferSize, timeout) {
85+
) = runTest(perCallBufferSize, timeout) { counter ->
8086
List(launches) { id ->
8187
val i = id + 1
8288
launch {
8389
repeat(iterationsPerLaunch) { iter ->
8490
val j = iter + 1
8591
assertEquals(
86-
expected = (1 + i * j) * (i * j),
87-
actual = service.bidiStreaming(
88-
(1..i * j).asFlow(),
89-
(1..i * j).asFlow(),
90-
).toList().sum(),
92+
expected = (1 + i * j) * (i * j) / 2,
93+
actual = service.bidiStreaming((1..i * j).asFlow()).toList().sum(),
9194
)
95+
counter.total.incrementAndGet()
9296
}
97+
counter.launches.incrementAndGet()
9398
}
9499
}.joinAll()
95100

@@ -102,7 +107,7 @@ abstract class BaseStressTest : BaseServiceTest() {
102107
timeout: Duration,
103108
launches: Int,
104109
iterationsPerLaunch: Int,
105-
) = runTest(perCallBufferSize, timeout) {
110+
) = runTest(perCallBufferSize, timeout) { counter ->
106111
List(launches) { id ->
107112
val i = id + 1
108113
launch {
@@ -112,7 +117,9 @@ abstract class BaseStressTest : BaseServiceTest() {
112117
expected = (1 + i * j) * (i * j) / 2,
113118
actual = service.clientStreaming((1..i * j).asFlow()),
114119
)
120+
counter.total.incrementAndGet()
115121
}
122+
counter.launches.incrementAndGet()
116123
}
117124
}.joinAll()
118125

@@ -125,7 +132,7 @@ abstract class BaseStressTest : BaseServiceTest() {
125132
timeout: Duration,
126133
launches: Int,
127134
iterationsPerLaunch: Int,
128-
) = runTest(perCallBufferSize, timeout) {
135+
) = runTest(perCallBufferSize, timeout) { counter ->
129136
List(launches) { id ->
130137
val i = id + 1
131138
launch {
@@ -136,7 +143,9 @@ abstract class BaseStressTest : BaseServiceTest() {
136143
actual = service.serverStreaming(i * j).toList().sum(),
137144
message = "i=$i, j=$j",
138145
)
146+
counter.total.incrementAndGet()
139147
}
148+
counter.launches.incrementAndGet()
140149
}
141150
}.joinAll()
142151

@@ -149,24 +158,45 @@ abstract class BaseStressTest : BaseServiceTest() {
149158
timeout: Duration,
150159
launches: Int,
151160
iterationsPerLaunch: Int,
152-
) = runTest(perCallBufferSize, timeout) {
161+
) = runTest(perCallBufferSize, timeout) { counter ->
153162
List(launches) { id ->
154163
launch {
155164
repeat(iterationsPerLaunch) { iter ->
156165
assertEquals(id * iter, service.unary(id * iter))
166+
counter.total.incrementAndGet()
157167
}
168+
counter.launches.incrementAndGet()
158169
}
159170
}.joinAll()
160171

161172
assertEquals(launches * iterationsPerLaunch, impl.unaryInvocations.value)
162173
}
163174

175+
class Counter {
176+
val launches = atomic(0)
177+
val total = atomic(0)
178+
}
179+
164180
private fun runTest(
165181
perCallBufferSize: Int = 100,
166182
timeout: Duration = 120.seconds,
167-
body: suspend Env.() -> Unit,
168-
) = kotlinx.coroutines.test.runTest(timeout = timeout) {
183+
body: suspend Env.(Counter) -> Unit,
184+
) = runTestWithCoroutinesProbes(timeout = timeout) {
169185
RpcInternalDumpLoggerContainer.set(null)
170-
runServiceTest(coroutineContext, perCallBufferSize, body)
186+
runServiceTest(coroutineContext, perCallBufferSize) {
187+
val counter = Counter()
188+
val counterJob = launch {
189+
while (true) {
190+
withContext(Dispatchers.Default) {
191+
delay(5.seconds)
192+
println("Launches: ${counter.launches.value}, total: ${counter.total.value}")
193+
}
194+
}
195+
}
196+
197+
body(this, counter)
198+
199+
counterJob.cancelAndJoin()
200+
}
171201
}
172202
}

tests/krpc-protocol-compatibility-tests/src/main/kotlin/kotlinx/rpc/krpc/test/compat/TestApi.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ interface CompatService {
2222
suspend fun unary(n: Int): Int
2323
fun serverStreaming(num: Int): Flow<Int>
2424
suspend fun clientStreaming(n: Flow<Int>): Int
25-
fun bidiStreaming(flow1: Flow<Int>, flow2: Flow<Int>): Flow<Int>
25+
fun bidiStreaming(flow: Flow<Int>): Flow<Int>
2626

2727
suspend fun requestCancellation()
2828
fun serverStreamCancellation(): Flow<Int>

tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
6262
@TestFactory
6363
fun bidiStreamCalls() = matrixTest { service, _ ->
6464
assertEquals(
65-
2,
66-
service.bidiStreaming((1..1).asFlow(), (1..1).asFlow()).toList().sum()
65+
1,
66+
service.bidiStreaming((1..1).asFlow()).toList().sum()
6767
)
6868

6969
List(100) {
7070
launch {
7171
assertEquals(
72-
(it + 1) * (it + 2),
73-
service.bidiStreaming((1..it + 1).asFlow(), (1..it + 1).asFlow()).toList().sum(),
72+
(it + 1) * (it + 2) / 2,
73+
service.bidiStreaming((1..it + 1).asFlow()).toList().sum(),
7474
)
7575
}
7676
}.joinAll()

tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.ExperimentalCoroutinesApi
1010
import kotlinx.coroutines.debug.DebugProbes
1111
import kotlinx.coroutines.test.TestScope
12+
import kotlinx.rpc.test.runTestWithCoroutinesProbes
1213
import org.junit.jupiter.api.DynamicTest
1314
import org.slf4j.LoggerFactory
1415
import java.net.URLClassLoader
@@ -81,7 +82,7 @@ abstract class KrpcProtocolCompatibilityTestsBase {
8182
): Stream<DynamicTest> {
8283
return prepareStarters(exclude).map {
8384
DynamicTest.dynamicTest("$role ${it.version}") {
84-
kotlinx.coroutines.test.runTest(timeout = timeout) {
85+
runTestWithCoroutinesProbes(timeout = timeout) {
8586
DebugProbes.withDebugProbes {
8687
val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger
8788
val testAppender = root.getAppender("TEST") as TestLogAppender

0 commit comments

Comments
 (0)