Skip to content

Commit e18b96b

Browse files
committed
Updated timeouts and added more logs
1 parent 1296b75 commit e18b96b

File tree

4 files changed

+32
-5
lines changed

4 files changed

+32
-5
lines changed

krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcSendHandlerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ internal abstract class KrpcSendHandlerBaseTest {
111111
}
112112

113113
protected fun runTest(
114-
timeout: Duration = 10.seconds,
114+
timeout: Duration = 30.seconds,
115115
body: suspend TestScope.(Channel<KrpcTransportMessage>, KrpcSendHandler) -> Unit,
116116
) = runTestWithCoroutinesProbes(timeout = timeout) {
117117
val channel = Channel<KrpcTransportMessage>(

tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,23 +124,37 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
124124
}
125125

126126
@TestFactory
127-
fun clientStreamCancellation() = matrixTest { service, impl ->
127+
fun clientStreamCancellation() = matrixTest(
128+
exclude = listOf(
129+
Versions.v0_9.client,
130+
Versions.v0_9.server,
131+
Versions.v0_8.client,
132+
Versions.v0_8.server,
133+
),
134+
) { service, impl ->
128135
val job = launch {
136+
println("[clientStreamCancellation] launching")
129137
service.clientStreamCancellation(flow {
130138
emit(1)
139+
println("[clientStreamCancellation] emit 1")
131140
impl.fence.await()
141+
println("[clientStreamCancellation] after fence")
132142
})
143+
println("[clientStreamCancellation] after service call")
133144
}
134145

135146
impl.entered.await()
147+
println("[clientStreamCancellation] entered")
136148
job.cancelAndJoin()
149+
println("[clientStreamCancellation] cancelled")
137150
impl.cancelled.await(1)
151+
println("[clientStreamCancellation] awaited cancellation")
138152

139153
assertNoErrorsInLogs()
140154
}
141155

142156
@TestFactory
143-
fun fastProducer() = matrixTest(timeout = 60.seconds) { service, impl ->
157+
fun fastProducer() = matrixTest(timeout = 240.seconds) { service, impl ->
144158
val root = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME) as ch.qos.logback.classic.Logger
145159

146160
val async = async {

tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/service/TestService.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,20 @@ class TestServiceImpl : TestService, CompatServiceImpl {
7878
override suspend fun clientStreamCancellation(n: Flow<Int>) {
7979
try {
8080
n.collect {
81+
println("[clientStreamCancellation] collected $it")
8182
if (it != 0) {
8283
entered.complete(Unit)
8384
}
8485
}
8586
} catch (e: CancellationException) {
87+
println("[clientStreamCancellation] cancelled on server")
8688
cancelled.increment()
8789
throw e
90+
} catch (e: Throwable) {
91+
println("[clientStreamCancellation] caught $e")
92+
throw e
93+
} finally {
94+
println("[clientStreamCancellation] finally")
8895
}
8996
}
9097

tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/WaitCounter.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,19 @@ class WaitCounter {
2222
fun increment() {
2323
lock.withLock {
2424
val current = counter.incrementAndGet()
25-
waiters[current]?.forEach { it.resume(Unit) }
25+
(0..current).forEach {
26+
waiters[it]?.forEach { continuation ->
27+
continuation.resume(Unit)
28+
}
29+
30+
waiters.remove(it)
31+
}
2632
}
2733
}
2834

2935
suspend fun await(value: Int) = suspendCancellableCoroutine {
3036
lock.withLock {
31-
if (counter.value == value) {
37+
if (counter.value >= value) {
3238
it.resume(Unit)
3339
} else {
3440
waiters[value] = waiters[value].orEmpty() + it

0 commit comments

Comments
 (0)