Skip to content

Commit db65a50

Browse files
committed
Made more rigorous wait counter for tests
1 parent a9d7b63 commit db65a50

File tree

12 files changed

+109
-97
lines changed

12 files changed

+109
-97
lines changed

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/BackPressureTest.kt

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package kotlinx.rpc.krpc.test
66

7-
import kotlinx.atomicfu.atomic
87
import kotlinx.coroutines.CompletableDeferred
98
import kotlinx.coroutines.async
109
import kotlinx.coroutines.cancelAndJoin
@@ -14,13 +13,13 @@ import kotlinx.coroutines.flow.map
1413
import kotlinx.coroutines.flow.toList
1514
import kotlinx.coroutines.job
1615
import kotlinx.coroutines.test.TestScope
17-
import kotlinx.coroutines.yield
1816
import kotlinx.rpc.annotations.Rpc
1917
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
2018
import kotlinx.rpc.krpc.rpcClientConfig
2119
import kotlinx.rpc.krpc.rpcServerConfig
2220
import kotlinx.rpc.krpc.serialization.json.json
2321
import kotlinx.rpc.registerService
22+
import kotlinx.rpc.test.WaitCounter
2423
import kotlinx.rpc.test.runTestWithCoroutinesProbes
2524
import kotlinx.rpc.withService
2625
import kotlin.test.Test
@@ -38,26 +37,20 @@ interface BackPressure {
3837
}
3938

4039
class BackPressureImpl : BackPressure {
41-
val plainCounter = atomic(0)
42-
val serverStreamCounter = atomic(0)
43-
val clientStreamCounter = atomic(0)
40+
val plainCounter = WaitCounter()
41+
val serverStreamCounter = WaitCounter()
42+
val clientStreamCounter = WaitCounter()
4443
val entered = CompletableDeferred<Unit>()
4544
val fence = CompletableDeferred<Unit>()
4645

47-
suspend fun awaitCounter(value: Int, counter: BackPressureImpl.() -> Int) {
48-
while (counter() != value) {
49-
yield()
50-
}
51-
}
52-
5346
override suspend fun plain() {
54-
plainCounter.incrementAndGet()
47+
plainCounter.increment()
5548
}
5649

5750
override fun serverStream(num: Int): Flow<Int> {
5851
return flow {
5952
repeat(num) {
60-
serverStreamCounter.incrementAndGet()
53+
serverStreamCounter.increment()
6154
emit(it)
6255
}
6356
}
@@ -114,17 +107,17 @@ abstract class BackPressureTestBase {
114107
}
115108

116109
impl.entered.await()
117-
impl.awaitCounter(perCallBufferSize + 2) { serverStreamCounter.value }
110+
impl.serverStreamCounter.await(perCallBufferSize + 2)
118111

119112
repeat(1000) {
120113
service.plain()
121114
}
122115

123-
impl.awaitCounter(1000) { plainCounter.value }
116+
impl.plainCounter.await(1000)
124117

125118
assertEquals(perCallBufferSize + 2, impl.serverStreamCounter.value)
126119
impl.fence.complete(Unit)
127-
impl.awaitCounter(1000) { serverStreamCounter.value }
120+
impl.serverStreamCounter.await(1000)
128121
assertEquals(1000, flowList.await().size)
129122
}
130123

@@ -136,7 +129,7 @@ abstract class BackPressureTestBase {
136129
val flowList = async {
137130
service.clientStream(flow {
138131
repeat(1000) {
139-
impl.clientStreamCounter.incrementAndGet()
132+
impl.clientStreamCounter.increment()
140133
emit(it)
141134
counter++
142135
if (counter % 10 == 0) {
@@ -147,18 +140,18 @@ abstract class BackPressureTestBase {
147140
}
148141

149142
impl.entered.await()
150-
impl.awaitCounter(perCallBufferSize + 2) { clientStreamCounter.value }
143+
impl.clientStreamCounter.await(perCallBufferSize + 2)
151144

152145
repeat(1000) {
153146
service.plain()
154147
}
155148

156-
impl.awaitCounter(1000) { plainCounter.value }
149+
impl.plainCounter.await(1000)
157150

158151
assertEquals(0, impl.consumed.size)
159152
assertEquals(perCallBufferSize + 2, impl.clientStreamCounter.value)
160153
impl.fence.complete(Unit)
161-
impl.awaitCounter(1000) { clientStreamCounter.value }
154+
impl.clientStreamCounter.await(1000)
162155
flowList.await()
163156
assertEquals(1000, impl.consumed.size)
164157
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package kotlinx.rpc.krpc.test.cancellation
66

7-
import kotlinx.atomicfu.atomic
87
import kotlinx.coroutines.*
98
import kotlinx.coroutines.flow.*
109
import kotlinx.rpc.annotations.Rpc
10+
import kotlinx.rpc.test.WaitCounter
1111

1212
@Rpc
1313
interface CancellationService {
@@ -33,15 +33,9 @@ interface CancellationService {
3333
}
3434

3535
class CancellationServiceImpl : CancellationService {
36-
val waitCounter = atomic(0)
37-
val successCounter = atomic(0)
38-
val cancellationsCounter = atomic(0)
39-
40-
suspend fun awaitCounter(value: Int, counter: CancellationServiceImpl.() -> Int) {
41-
while (counter() != value) {
42-
yield()
43-
}
44-
}
36+
val waitCounter = WaitCounter()
37+
val successCounter = WaitCounter()
38+
val cancellationsCounter = WaitCounter()
4539

4640
val consumedIncomingValues = mutableListOf<Int>()
4741
val firstIncomingConsumed = CompletableDeferred<Int>()
@@ -51,11 +45,11 @@ class CancellationServiceImpl : CancellationService {
5145
override suspend fun longRequest() {
5246
try {
5347
firstIncomingConsumed.complete(0)
54-
waitCounter.incrementAndGet()
48+
waitCounter.increment()
5549
fence.await()
56-
successCounter.incrementAndGet()
50+
successCounter.increment()
5751
} catch (e: CancellationException) {
58-
cancellationsCounter.incrementAndGet()
52+
cancellationsCounter.increment()
5953
throw e
6054
}
6155
}
@@ -93,7 +87,7 @@ class CancellationServiceImpl : CancellationService {
9387
}
9488
}
9589
} catch (e: CancellationException) {
96-
cancellationsCounter.incrementAndGet()
90+
cancellationsCounter.increment()
9791
throw e
9892
}
9993
}
@@ -119,7 +113,7 @@ class CancellationServiceImpl : CancellationService {
119113

120114
fence.await()
121115
} catch (e: CancellationException) {
122-
cancellationsCounter.incrementAndGet()
116+
cancellationsCounter.increment()
123117
throw e
124118
}
125119
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ class CancellationTest {
3131
service.longRequest()
3232
}
3333

34-
serverInstance().awaitCounter(2) { waitCounter.value }
34+
serverInstance().waitCounter.await(2)
3535
cancellingRequestJob.cancelAndJoin()
36-
serverInstance().awaitCounter(1) { cancellationsCounter.value }
36+
serverInstance().cancellationsCounter.await(1)
3737
serverInstance().fence.complete(Unit)
3838
aliveRequestJob.join()
3939

@@ -162,20 +162,19 @@ class CancellationTest {
162162
secondService.longRequest()
163163
}
164164

165-
serverInstance().awaitCounter(2) { waitCounter.value }
165+
serverInstance().waitCounter.await(2)
166166
client.close()
167+
client.awaitCompletion()
168+
server.awaitCompletion()
167169
firstRequestJob.join()
168170
secondRequestJob.join()
169-
serverInstance().awaitCounter(2) { cancellationsCounter.value }
171+
serverInstance().cancellationsCounter.await(2)
170172

171173
assertTrue(firstRequestJob.isCancelled, "Expected firstRequestJob to be cancelled")
172174
assertTrue(secondRequestJob.isCancelled, "Expected secondRequestJob to be cancelled")
173175

174176
assertEquals(0, serverInstances.sumOf { it.successCounter.value }, "Expected no requests to succeed")
175177

176-
client.awaitCompletion()
177-
server.awaitCompletion()
178-
179178
checkAlive(clientAlive = false, serverAlive = false)
180179
stopAllAndJoin()
181180

@@ -194,20 +193,19 @@ class CancellationTest {
194193
secondService.longRequest()
195194
}
196195

197-
serverInstance().awaitCounter(2) { waitCounter.value } // wait for requests to reach server
196+
serverInstance().waitCounter.await(2) // wait for requests to reach server
198197
server.close()
198+
server.awaitCompletion()
199+
client.awaitCompletion()
199200
firstRequestJob.join()
200201
secondRequestJob.join()
201-
serverInstance().awaitCounter(2) { cancellationsCounter.value }
202+
serverInstance().cancellationsCounter.await(2)
202203

203204
assertTrue(firstRequestJob.isCancelled, "Expected firstRequestJob to be cancelled")
204205
assertTrue(secondRequestJob.isCancelled, "Expected secondRequestJob to be cancelled")
205206

206207
assertEquals(0, serverInstances.sumOf { it.successCounter.value }, "Expected no requests to succeed")
207208

208-
client.awaitCompletion()
209-
server.awaitCompletion()
210-
211209
checkAlive(clientAlive = false, serverAlive = false)
212210
stopAllAndJoin()
213211

@@ -284,7 +282,7 @@ class CancellationTest {
284282
// close by request cancel and not scope closure
285283
serverInstance().consumedAll.await()
286284

287-
serverInstance().awaitCounter(1) { cancellationsCounter.value }
285+
serverInstance().cancellationsCounter.await(1)
288286

289287
assertContentEquals(listOf(0), serverInstance().consumedIncomingValues)
290288

@@ -310,7 +308,7 @@ class CancellationTest {
310308
// close by request cancel and not scope closure
311309
serverInstance().consumedAll.await()
312310

313-
serverInstance().awaitCounter(1) { cancellationsCounter.value }
311+
serverInstance().cancellationsCounter.await(1)
314312

315313
val result = flow.toList()
316314

tests/krpc-protocol-compatibility-tests/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ fun DependencyHandlerScope.versioned(configuration: Configuration, version: Stri
5858
add(configuration.name, "org.jetbrains.kotlinx:kotlinx-rpc-krpc-server:$version")
5959
add(configuration.name, "org.jetbrains.kotlinx:kotlinx-rpc-krpc-serialization-json:$version")
6060
add(configuration.name, libs.atomicfu)
61+
add(configuration.name, projects.tests.testUtils)
6162
}
6263

6364
dependencies {
6465
api(libs.atomicfu)
66+
api(projects.tests.testUtils)
6567
implementation(libs.serialization.core)
6668
implementation(libs.coroutines.core)
6769
implementation(libs.kotlin.reflect)

tests/krpc-protocol-compatibility-tests/src/main/kotlin/kotlinx/rpc/krpc/test/compat/TestApi.kt

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package kotlinx.rpc.krpc.test.compat
77
import kotlinx.coroutines.CompletableDeferred
88
import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.flow.Flow
10-
import kotlinx.coroutines.yield
10+
import kotlinx.rpc.test.WaitCounter
1111

1212
interface CompatTransport : CoroutineScope {
1313
suspend fun send(message: String)
@@ -32,16 +32,10 @@ interface CompatService {
3232
}
3333

3434
interface CompatServiceImpl {
35-
val exitMethod: Int
36-
val cancelled: Int
35+
val exitMethod: WaitCounter
36+
val cancelled: WaitCounter
3737
val entered: CompletableDeferred<Unit>
3838
val fence: CompletableDeferred<Unit>
39-
40-
suspend fun awaitCounter(num: Int, counter: CompatServiceImpl.() -> Int) {
41-
while (counter() != num) {
42-
yield()
43-
}
44-
}
4539
}
4640

4741
interface Starter {

tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,18 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
8686

8787
impl.entered.await()
8888
job.cancelAndJoin()
89-
impl.awaitCounter(1) { cancelled }
90-
assertEquals(0, impl.exitMethod)
89+
impl.cancelled.await(1)
90+
assertEquals(0, impl.exitMethod.value)
9191

9292
val followup = launch {
9393
service.requestCancellation()
9494
}
9595
impl.fence.complete(Unit)
9696
followup.join()
97-
assertEquals(1, impl.exitMethod)
97+
assertEquals(1, impl.exitMethod.value)
9898

9999
assertNoErrorsInLogs()
100-
assertEquals(1, impl.cancelled)
100+
assertEquals(1, impl.cancelled.value)
101101
}
102102

103103
@TestFactory
@@ -108,8 +108,8 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
108108

109109
impl.entered.await()
110110
job.cancelAndJoin()
111-
impl.awaitCounter(1) { cancelled }
112-
assertEquals(0, impl.exitMethod)
111+
impl.cancelled.await(1)
112+
assertEquals(0, impl.exitMethod.value)
113113

114114
val followup = async {
115115
service.serverStreamCancellation().toList()
@@ -118,7 +118,7 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
118118
assertEquals(listOf(1, 2), followup.await())
119119

120120
assertNoErrorsInLogs()
121-
assertEquals(1, impl.cancelled)
121+
assertEquals(1, impl.cancelled.value)
122122
}
123123

124124
@TestFactory
@@ -132,7 +132,7 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
132132

133133
impl.entered.await()
134134
job.cancelAndJoin()
135-
impl.awaitCounter(1) { cancelled }
135+
impl.cancelled.await(1)
136136

137137
assertNoErrorsInLogs()
138138
}

0 commit comments

Comments
 (0)