Skip to content

Commit 1e7d5c6

Browse files
committed
More tests for stream cancelling, simple fixes for other staff
1 parent b5f3b24 commit 1e7d5c6

File tree

6 files changed

+101
-17
lines changed

6 files changed

+101
-17
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import kotlinx.coroutines.SupervisorJob
1616
import kotlinx.coroutines.cancel
1717
import kotlinx.coroutines.cancelAndJoin
1818
import kotlinx.coroutines.channels.Channel
19+
import kotlinx.coroutines.channels.ClosedSendChannelException
1920
import kotlinx.coroutines.coroutineScope
2021
import kotlinx.coroutines.currentCoroutineContext
2122
import kotlinx.coroutines.ensureActive
@@ -476,7 +477,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
476477
connectionId = outgoingStream.connectionId,
477478
serviceId = outgoingStream.serviceId,
478479
)
479-
sender.sendMessage(message)
480+
sendException(message)
480481

481482
// stop the flow and its coroutine, other flows are not affected
482483
throw e
@@ -490,7 +491,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
490491
connectionId = outgoingStream.connectionId,
491492
serviceId = outgoingStream.serviceId,
492493
)
493-
sender.sendMessage(message)
494+
sendException(message)
494495

495496
throw cause
496497
}
@@ -506,6 +507,14 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
506507
sender.sendMessage(message)
507508
}
508509

510+
private suspend fun sendException(message: KrpcMessage) {
511+
try {
512+
sender.sendMessage(message)
513+
} catch (_: ClosedSendChannelException) {
514+
// ignore, we are already cancelled and have a cause
515+
}
516+
}
517+
509518
private suspend fun collectAndSendOutgoingStream(
510519
serialFormat: SerialFormat,
511520
flow: Flow<*>,
@@ -543,7 +552,11 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
543552
}
544553
}
545554

546-
sender.sendMessage(message)
555+
try {
556+
sender.sendMessage(message)
557+
} catch (e: ClosedSendChannelException) {
558+
throw CancellationException("Request cancelled", e)
559+
}
547560
}
548561
}
549562

krpc/krpc-logging/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/logging/RpcInternalCommonLogger.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface RpcInternalCommonLogger {
4646
}
4747

4848
public fun <T : Any> logger(kClass: KClass<T>): RpcInternalCommonLogger {
49-
return logger(kClass.qualifiedName ?: kClass.toString())
49+
return logger(kClass.simpleName ?: kClass.toString())
5050
}
5151

5252
public inline fun <reified T : Any> logger(): RpcInternalCommonLogger {

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public abstract class KrpcServer(
135135

136136
final override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass<Service>) {
137137
connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName)
138-
rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)
138+
rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)?.close()
139139
}
140140

141141
private fun <@Rpc Service : Any> createNewServiceInstance(
@@ -159,6 +159,11 @@ public abstract class KrpcServer(
159159
cancelledByClient = true
160160

161161
internalScope.cancel("Server cancelled by client")
162+
163+
rpcServices.values.forEach { service ->
164+
service.close()
165+
}
166+
162167
rpcServices.clear()
163168
}
164169

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.rpc.krpc.server.internal
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.ClosedSendChannelException
89
import kotlinx.coroutines.flow.Flow
910
import kotlinx.rpc.annotations.Rpc
1011
import kotlinx.rpc.descriptor.RpcInvokator
@@ -200,7 +201,11 @@ internal class KrpcServerService<@Rpc T : Any>(
200201
serviceId = callData.serviceId,
201202
)
202203

203-
connector.sendMessage(exceptionMessage)
204+
try {
205+
connector.sendMessage(exceptionMessage)
206+
} catch (_: ClosedSendChannelException) {
207+
// ignore, the client probably already disconnected
208+
}
204209

205210
closeReceiving(callId, "Server request failed", failure, fromJob = true)
206211
} else {
@@ -324,7 +329,12 @@ internal class KrpcServerService<@Rpc T : Any>(
324329
}
325330
}
326331

327-
suspend fun closeReceiving(
332+
fun close() {
333+
requestMap.entries.forEach { (callId, request) -> request.cancelAndClose(callId) }
334+
requestMap.clear()
335+
}
336+
337+
fun closeReceiving(
328338
callId: String,
329339
message: String? = null,
330340
cause: Throwable? = null,
@@ -377,7 +387,7 @@ internal class KrpcServerService<@Rpc T : Any>(
377387
}
378388

379389
internal class RpcRequest(val handlerJob: Job, val streamContext: ServerStreamContext) {
380-
suspend fun cancelAndClose(
390+
fun cancelAndClose(
381391
callId: String,
382392
message: String? = null,
383393
cause: Throwable? = null,
@@ -389,8 +399,6 @@ internal class RpcRequest(val handlerJob: Job, val streamContext: ServerStreamCo
389399
message != null -> handlerJob.cancel(message)
390400
else -> handlerJob.cancel()
391401
}
392-
393-
handlerJob.join()
394402
}
395403

396404
streamContext.removeCall(callId, cause)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class CancellationServiceImpl : CancellationService {
6363
}
6464

6565
override fun incomingStream(): Flow<Int> {
66+
waitCounter.increment()
6667
return resumableFlow(fence)
6768
}
6869

@@ -95,7 +96,13 @@ class CancellationServiceImpl : CancellationService {
9596
}
9697

9798
override suspend fun outgoingStream(stream: Flow<Int>) {
98-
consume(stream)
99+
try {
100+
waitCounter.increment()
101+
consume(stream)
102+
} catch (e: CancellationException) {
103+
cancellationsCounter.increment()
104+
throw e
105+
}
99106
}
100107

101108
@OptIn(DelicateCoroutinesApi::class)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ class CancellationTest {
150150
assertEquals(1, serverInstance().cancellationsCounter.value, "Expected 1 request to be cancelled")
151151
}
152152

153+
@OptIn(ExperimentalCoroutinesApi::class)
153154
@Test
154155
fun testCancelClient() = runCancellationTest {
155156
val firstRequestJob = launch {
@@ -162,23 +163,48 @@ class CancellationTest {
162163
secondService.longRequest()
163164
}
164165

165-
serverInstance().waitCounter.await(2)
166+
val clientFlowJob = launch {
167+
service.outgoingStream(flow {
168+
emit(0)
169+
serverInstance().fence.await()
170+
emit(1)
171+
})
172+
}
173+
174+
val serverFlowJob = launch {
175+
try {
176+
secondService.incomingStream().toList()
177+
fail("Expected cancellation of the client to cancel the server flow")
178+
} catch (e : CancellationException) {
179+
serverInstance().cancellationsCounter.increment()
180+
throw e
181+
}
182+
}
183+
184+
serverInstance().waitCounter.await(4)
166185
client.close()
167186
client.awaitCompletion()
168187
server.awaitCompletion()
169188
firstRequestJob.join()
170189
secondRequestJob.join()
171-
serverInstance().cancellationsCounter.await(2)
190+
clientFlowJob.join()
191+
192+
serverInstance().fence.complete(Unit)
193+
serverFlowJob.join()
194+
195+
serverInstance().cancellationsCounter.await(4)
172196

173197
assertTrue(firstRequestJob.isCancelled, "Expected firstRequestJob to be cancelled")
174198
assertTrue(secondRequestJob.isCancelled, "Expected secondRequestJob to be cancelled")
199+
assertTrue(clientFlowJob.isCancelled, "Expected clientFlowJob to be cancelled")
200+
assertTrue(serverFlowJob.isCancelled, "Expected serverFlowJob to be cancelled")
175201

176202
assertEquals(0, serverInstances.sumOf { it.successCounter.value }, "Expected no requests to succeed")
177203

178204
checkAlive(clientAlive = false, serverAlive = false)
179205
stopAllAndJoin()
180206

181-
assertEquals(2, serverInstance().cancellationsCounter.value, "Expected 2 requests to be cancelled")
207+
assertEquals(4, serverInstance().cancellationsCounter.value, "Expected 4 requests to be cancelled")
182208
}
183209

184210
@Test
@@ -193,23 +219,48 @@ class CancellationTest {
193219
secondService.longRequest()
194220
}
195221

196-
serverInstance().waitCounter.await(2) // wait for requests to reach server
222+
val clientFlowJob = launch {
223+
service.outgoingStream(flow {
224+
emit(0)
225+
serverInstance().fence.await()
226+
emit(1)
227+
})
228+
}
229+
230+
val serverFlowJob = launch {
231+
try {
232+
secondService.incomingStream().toList()
233+
fail("Expected cancellation of the client to cancel the server flow")
234+
} catch (e : CancellationException) {
235+
serverInstance().cancellationsCounter.increment()
236+
throw e
237+
}
238+
}
239+
240+
serverInstance().waitCounter.await(4) // wait for requests to reach server
197241
server.close()
198242
server.awaitCompletion()
199243
client.awaitCompletion()
200244
firstRequestJob.join()
201245
secondRequestJob.join()
202-
serverInstance().cancellationsCounter.await(2)
246+
clientFlowJob.join()
247+
248+
serverInstance().fence.complete(Unit)
249+
serverFlowJob.join()
250+
251+
serverInstance().cancellationsCounter.await(4)
203252

204253
assertTrue(firstRequestJob.isCancelled, "Expected firstRequestJob to be cancelled")
205254
assertTrue(secondRequestJob.isCancelled, "Expected secondRequestJob to be cancelled")
255+
assertTrue(clientFlowJob.isCancelled, "Expected clientFlowJob to be cancelled")
256+
assertTrue(serverFlowJob.isCancelled, "Expected serverFlowJob to be cancelled")
206257

207258
assertEquals(0, serverInstances.sumOf { it.successCounter.value }, "Expected no requests to succeed")
208259

209260
checkAlive(clientAlive = false, serverAlive = false)
210261
stopAllAndJoin()
211262

212-
assertEquals(2, serverInstance().cancellationsCounter.value, "Expected 2 requests to be cancelled")
263+
assertEquals(4, serverInstance().cancellationsCounter.value, "Expected 4 requests to be cancelled")
213264
}
214265

215266
@Test

0 commit comments

Comments
 (0)