File tree Expand file tree Collapse file tree 3 files changed +21
-5
lines changed
krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc
krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat
test-utils/src/commonMain/kotlin/kotlinx/rpc/test Expand file tree Collapse file tree 3 files changed +21
-5
lines changed Original file line number Diff line number Diff line change @@ -111,7 +111,7 @@ internal abstract class KrpcSendHandlerBaseTest {
111
111
}
112
112
113
113
protected fun runTest (
114
- timeout : Duration = 10 .seconds,
114
+ timeout : Duration = 30 .seconds,
115
115
body : suspend TestScope .(Channel <KrpcTransportMessage >, KrpcSendHandler ) -> Unit ,
116
116
) = runTestWithCoroutinesProbes(timeout = timeout) {
117
117
val channel = Channel <KrpcTransportMessage >(
Original file line number Diff line number Diff line change @@ -124,23 +124,37 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
124
124
}
125
125
126
126
@TestFactory
127
- fun clientStreamCancellation () = matrixTest { service, impl ->
127
+ fun clientStreamCancellation () = matrixTest(
128
+ exclude = listOf (
129
+ Versions .v0_9.client,
130
+ Versions .v0_9.server,
131
+ Versions .v0_8.client,
132
+ Versions .v0_8.server,
133
+ ),
134
+ ) { service, impl ->
128
135
val job = launch {
136
+ println (" [clientStreamCancellation] launching" )
129
137
service.clientStreamCancellation(flow {
130
138
emit(1 )
139
+ println (" [clientStreamCancellation] emit 1" )
131
140
impl.fence.await()
141
+ println (" [clientStreamCancellation] after fence" )
132
142
})
143
+ println (" [clientStreamCancellation] after service call" )
133
144
}
134
145
135
146
impl.entered.await()
147
+ println (" [clientStreamCancellation] entered" )
136
148
job.cancelAndJoin()
149
+ println (" [clientStreamCancellation] cancelled" )
137
150
impl.cancelled.await(1 )
151
+ println (" [clientStreamCancellation] awaited cancellation" )
138
152
139
153
assertNoErrorsInLogs()
140
154
}
141
155
142
156
@TestFactory
143
- fun fastProducer () = matrixTest(timeout = 60 .seconds) { service, impl ->
157
+ fun fastProducer () = matrixTest(timeout = 240 .seconds) { service, impl ->
144
158
val root = LoggerFactory .getLogger(Logger .ROOT_LOGGER_NAME ) as ch.qos.logback.classic.Logger
145
159
146
160
val async = async {
Original file line number Diff line number Diff line change @@ -22,13 +22,15 @@ class WaitCounter {
22
22
fun increment () {
23
23
lock.withLock {
24
24
val current = counter.incrementAndGet()
25
- waiters[current]?.forEach { it.resume(Unit ) }
25
+ (0 .. current).forEach {
26
+ waiters[it]?.forEach { continuation -> continuation.resume(Unit ) }
27
+ }
26
28
}
27
29
}
28
30
29
31
suspend fun await (value : Int ) = suspendCancellableCoroutine {
30
32
lock.withLock {
31
- if (counter.value = = value) {
33
+ if (counter.value > = value) {
32
34
it.resume(Unit )
33
35
} else {
34
36
waiters[value] = waiters[value].orEmpty() + it
You can’t perform that action at this time.
0 commit comments