File tree Expand file tree Collapse file tree 3 files changed +25
-5
lines changed
krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc
krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat
test-utils/src/commonMain/kotlin/kotlinx/rpc/test Expand file tree Collapse file tree 3 files changed +25
-5
lines changed Original file line number Diff line number Diff line change @@ -111,7 +111,7 @@ internal abstract class KrpcSendHandlerBaseTest {
111111 }
112112
113113 protected fun runTest (
114- timeout : Duration = 10 .seconds,
114+ timeout : Duration = 30 .seconds,
115115 body : suspend TestScope .(Channel <KrpcTransportMessage >, KrpcSendHandler ) -> Unit ,
116116 ) = runTestWithCoroutinesProbes(timeout = timeout) {
117117 val channel = Channel <KrpcTransportMessage >(
Original file line number Diff line number Diff line change @@ -124,23 +124,37 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
124124 }
125125
126126 @TestFactory
127- fun clientStreamCancellation () = matrixTest { service, impl ->
127+ fun clientStreamCancellation () = matrixTest(
128+ exclude = listOf (
129+ Versions .v0_9.client,
130+ Versions .v0_9.server,
131+ Versions .v0_8.client,
132+ Versions .v0_8.server,
133+ ),
134+ ) { service, impl ->
128135 val job = launch {
136+ println (" [clientStreamCancellation] launching" )
129137 service.clientStreamCancellation(flow {
130138 emit(1 )
139+ println (" [clientStreamCancellation] emit 1" )
131140 impl.fence.await()
141+ println (" [clientStreamCancellation] after fence" )
132142 })
143+ println (" [clientStreamCancellation] after service call" )
133144 }
134145
135146 impl.entered.await()
147+ println (" [clientStreamCancellation] entered" )
136148 job.cancelAndJoin()
149+ println (" [clientStreamCancellation] cancelled" )
137150 impl.cancelled.await(1 )
151+ println (" [clientStreamCancellation] awaited cancellation" )
138152
139153 assertNoErrorsInLogs()
140154 }
141155
142156 @TestFactory
143- fun fastProducer () = matrixTest(timeout = 60 .seconds) { service, impl ->
157+ fun fastProducer () = matrixTest(timeout = 240 .seconds) { service, impl ->
144158 val root = LoggerFactory .getLogger(Logger .ROOT_LOGGER_NAME ) as ch.qos.logback.classic.Logger
145159
146160 val async = async {
Original file line number Diff line number Diff line change @@ -22,13 +22,19 @@ class WaitCounter {
2222 fun increment () {
2323 lock.withLock {
2424 val current = counter.incrementAndGet()
25- waiters[current]?.forEach { it.resume(Unit ) }
25+ (0 .. current).forEach {
26+ waiters[it]?.forEach { continuation ->
27+ continuation.resume(Unit )
28+ }
29+
30+ waiters.remove(it)
31+ }
2632 }
2733 }
2834
2935 suspend fun await (value : Int ) = suspendCancellableCoroutine {
3036 lock.withLock {
31- if (counter.value = = value) {
37+ if (counter.value > = value) {
3238 it.resume(Unit )
3339 } else {
3440 waiters[value] = waiters[value].orEmpty() + it
You can’t perform that action at this time.
0 commit comments