Skip to content

Commit 7b9566f

Browse files
committed
Added cancellation tests and GC tests, checked exceptions
1 parent 7de8f5a commit 7b9566f

File tree

6 files changed

+166
-46
lines changed

6 files changed

+166
-46
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -340,65 +340,72 @@ public abstract class KrpcClient(
340340

341341
connector.sendMessage(request)
342342

343-
connector.subscribeToCallResponse(call.descriptor.fqName, callId) { message ->
344-
when (message) {
345-
is KrpcCallMessage.CallData -> {
346-
error("Unexpected message")
347-
}
348-
349-
is KrpcCallMessage.CallException -> {
350-
val cause = runCatching {
351-
message.cause.deserialize()
343+
try {
344+
connector.subscribeToCallResponse(call.descriptor.fqName, callId) { message ->
345+
when (message) {
346+
is KrpcCallMessage.CallData -> {
347+
error("Unexpected message")
352348
}
353349

354-
val result = if (cause.isFailure) {
355-
cause.exceptionOrNull()!!
356-
} else {
357-
cause.getOrNull()!!
350+
is KrpcCallMessage.CallException -> {
351+
val cause = runCatching {
352+
message.cause.deserialize()
353+
}
354+
355+
val result = if (cause.isFailure) {
356+
cause.exceptionOrNull()!!
357+
} else {
358+
cause.getOrNull()!!
359+
}
360+
361+
channel.close(result)
358362
}
359363

360-
channel.close(result)
361-
}
364+
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
365+
val value = runCatching {
366+
val serializerResult =
367+
nonSuspendingSerialFormat.serializersModule.rpcSerializerForType(callable.returnType)
362368

363-
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
364-
val value = runCatching {
365-
val serializerResult =
366-
nonSuspendingSerialFormat.serializersModule.rpcSerializerForType(callable.returnType)
369+
decodeMessageData(nonSuspendingSerialFormat, serializerResult, message)
370+
}
367371

368-
decodeMessageData(nonSuspendingSerialFormat, serializerResult, message)
372+
@Suppress("UNCHECKED_CAST")
373+
channel.send(value.getOrNull() as T)
369374
}
370375

371-
@Suppress("UNCHECKED_CAST")
372-
channel.send(value.getOrNull() as T)
373-
}
374-
375-
is KrpcCallMessage.StreamFinished -> {
376-
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
377-
channel.close()
378-
}
376+
is KrpcCallMessage.StreamFinished -> {
377+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
378+
channel.close()
379+
}
379380

380-
is KrpcCallMessage.StreamCancel -> {
381-
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
382-
val cause = message.cause.deserialize()
383-
channel.close(cause)
381+
is KrpcCallMessage.StreamCancel -> {
382+
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
383+
val cause = message.cause.deserialize()
384+
channel.close(cause)
385+
}
384386
}
385387
}
386-
}
387388

388-
try {
389-
while (true) {
390-
val element = channel.receiveCatching()
391-
if (element.isClosed) {
392-
val ex = element.exceptionOrNull() ?: break
393-
error(ex)
394-
}
389+
try {
390+
while (true) {
391+
val element = channel.receiveCatching()
392+
if (element.isClosed) {
393+
val ex = element.exceptionOrNull() ?: break
394+
error(ex)
395+
}
395396

396-
if (!element.isFailure) {
397-
emit(element.getOrThrow())
397+
if (!element.isFailure) {
398+
emit(element.getOrThrow())
399+
}
398400
}
401+
} catch (_: ClosedReceiveChannelException) {
402+
// ignore
399403
}
400-
} catch (_: ClosedReceiveChannelException) {
401-
// ignore
404+
} catch (e: CancellationException) {
405+
// sendCancellation is not suspending, so no need for NonCancellable
406+
sendCancellation(CancellationType.REQUEST, call.serviceId.toString(), callId)
407+
408+
throw e
402409
}
403410
}
404411
}

krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import java.time.LocalDateTime
1818
@Rpc
1919
interface KrpcTestService : RemoteService {
2020
fun nonSuspendFlow(): Flow<Int>
21+
fun nonSuspendFlowErrorOnEmit(): Flow<Int>
22+
fun nonSuspendFlowErrorOnReturn(): Flow<Int>
2123
suspend fun empty()
2224
suspend fun returnType(): String
2325
suspend fun simpleWithParams(name: String): String

krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ class KrpcTestServiceBackend(override val coroutineContext: CoroutineContext) :
3030
}
3131
}
3232

33+
override fun nonSuspendFlowErrorOnEmit(): Flow<Int> {
34+
return flow {
35+
error("nonSuspendFlowErrorOnEmit")
36+
}
37+
}
38+
39+
override fun nonSuspendFlowErrorOnReturn(): Flow<Int> {
40+
error("nonSuspendFlowErrorOnReturn")
41+
}
42+
3343
@Suppress("detekt.EmptyFunctionBlock")
3444
override suspend fun empty() {}
3545

krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,25 @@ abstract class KrpcTransportTestBase {
128128
}
129129
}
130130

131+
@Test
132+
fun nonSuspendErrorOnEmit() {
133+
runBlocking {
134+
val flow = client.nonSuspendFlowErrorOnReturn()
135+
assertFailsWith<IllegalStateException> {
136+
flow.toList()
137+
}
138+
}
139+
}
140+
141+
@Test
142+
fun nonSuspendErrorOnReturn() {
143+
runBlocking {
144+
assertFailsWith<IllegalStateException> {
145+
client.nonSuspendFlowErrorOnReturn().toList()
146+
}
147+
}
148+
}
149+
131150
@Test
132151
fun empty() {
133152
backend.cancel()

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.krpc.test.cancellation
@@ -41,6 +41,8 @@ interface CancellationService : RemoteService {
4141
suspend fun closedStreamScopeCallback()
4242

4343
suspend fun closedStreamScopeCallbackWithStream(): Flow<Int>
44+
45+
fun nonSuspendable(): Flow<Int>
4446
}
4547

4648
class CancellationServiceImpl(override val coroutineContext: CoroutineContext) : CancellationService {
@@ -169,6 +171,31 @@ class CancellationServiceImpl(override val coroutineContext: CoroutineContext) :
169171
}
170172
}
171173
}
174+
175+
var nonSuspendableSecond = false
176+
val nonSuspendableFinished = CompletableDeferred<Unit>()
177+
178+
override fun nonSuspendable(): Flow<Int> {
179+
return flow {
180+
try {
181+
repeat(2) {
182+
if (it == 1) {
183+
nonSuspendableSecond = true
184+
}
185+
186+
emit(it)
187+
188+
if (it == 0) {
189+
fence.await()
190+
}
191+
}
192+
} catch (e: CancellationException) {
193+
nonSuspendableFinished.complete(Unit)
194+
195+
throw e
196+
}
197+
}
198+
}
172199
}
173200

174201
fun resumableFlow(fence: Deferred<Unit>, onEmit: (Int) -> Unit = {}): Flow<Int> = flow {

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.rpc.krpc.test.cancellation
@@ -671,6 +671,61 @@ class CancellationTest {
671671
stopAllAndJoin()
672672
}
673673

674+
@Test
675+
fun testCancellingNonSuspendable() = runCancellationTest {
676+
val flow = service.nonSuspendable()
677+
val firstDone = CompletableDeferred<Unit>()
678+
val requestJob = launch {
679+
flow.collect {
680+
if (it == 0) {
681+
firstDone.complete(Unit)
682+
}
683+
}
684+
}
685+
686+
firstDone.await()
687+
requestJob.cancel("Cancelled by test")
688+
requestJob.join()
689+
serverInstance().nonSuspendableFinished.await()
690+
691+
assertEquals(false, serverInstance().nonSuspendableSecond)
692+
693+
checkAlive()
694+
stopAllAndJoin()
695+
}
696+
697+
@Test
698+
fun testGCNonSuspendable() = runCancellationTest {
699+
val firstDone = CompletableDeferred<Unit>()
700+
val latch = CompletableDeferred<Unit>()
701+
val requestJob = processFlowAndLeaveUnusedForGC(firstDone, latch)
702+
703+
firstDone.await()
704+
System.gc() // hint GC to collect the flow
705+
serverInstance().nonSuspendableFinished.await()
706+
707+
assertEquals(false, serverInstance().nonSuspendableSecond)
708+
latch.complete(Unit)
709+
requestJob.join()
710+
711+
checkAlive()
712+
stopAllAndJoin()
713+
}
714+
715+
private fun CancellationToolkit.processFlowAndLeaveUnusedForGC(
716+
firstDone: CompletableDeferred<Unit>,
717+
latch: CompletableDeferred<Unit>
718+
): Job {
719+
val flow = service.nonSuspendable()
720+
val requestJob = launch {
721+
flow.first()
722+
firstDone.complete(Unit)
723+
latch.await()
724+
}
725+
726+
return requestJob
727+
}
728+
674729
private fun CancellationToolkit.checkAlive(
675730
serviceAlive: Boolean = true,
676731
clientAlive: Boolean = true,

0 commit comments

Comments
 (0)