Skip to content

Commit c09ed4c

Browse files
committed
Various small fixes
1 parent 125f9ea commit c09ed4c

File tree

6 files changed

+63
-20
lines changed

6 files changed

+63
-20
lines changed

docs/pages/kotlinx-rpc/topics/0-8-0.topic

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,9 @@
262262
class MyClient(
263263
config: KrpcConfig,
264264
transport: KrpcTransport,
265-
) : InitializedKrpcClient(transport, config)
265+
) : InitializedKrpcClient(config, transport)
266266
</code-block>
267267
</compare>
268-
<note>
269-
Notice that the parameter order is reversed in new <code>InitializedKrpcClient</code>.
270-
</note>
271268
</li>
272269
</list>
273270
</chapter>

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ import kotlin.properties.Delegates
4545
* See [KrpcClient.initializeTransport].
4646
*/
4747
public abstract class InitializedKrpcClient(
48-
private val transport: KrpcTransport,
4948
private val config: KrpcConfig.Client,
49+
private val transport: KrpcTransport,
5050
): KrpcClient() {
5151
final override suspend fun initializeTransport(): KrpcTransport {
5252
return transport
@@ -80,6 +80,20 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
8080
*/
8181
protected abstract fun initializeConfig(): KrpcConfig.Client
8282

83+
/**
84+
* Close this client, removing all the services and stopping accepting messages.
85+
*/
86+
public fun close(message: String? = null) {
87+
internalScope.cancel(message ?: "Client closed")
88+
}
89+
90+
/**
91+
* Waits until the client is closed.
92+
*/
93+
public suspend fun awaitCompletion() {
94+
internalScope.coroutineContext.job.join()
95+
}
96+
8397
/*
8498
* #####################################################################
8599
* # #

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ public abstract class KrpcServer(
3737
transport: KrpcTransport,
3838
) : RpcServer, KrpcEndpoint {
3939

40+
/**
41+
* Close this server, removing all the services and stopping accepting messages.
42+
*/
43+
public fun close(message: String? = null) {
44+
internalScope.cancel(message ?: "Server closed")
45+
}
46+
47+
/**
48+
* Waits until the server is closed.
49+
*/
50+
public suspend fun awaitCompletion() {
51+
internalScope.coroutineContext.job.join()
52+
}
53+
4054
/*
4155
* #####################################################################
4256
* # #
@@ -119,6 +133,7 @@ public abstract class KrpcServer(
119133

120134
override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass<Service>) {
121135
connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName)
136+
rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)
122137
}
123138

124139
private fun <@Rpc Service : Any> createNewServiceInstance(

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ interface CancellationService {
2121

2222
suspend fun outgoingStream(stream: Flow<Int>)
2323

24+
suspend fun outgoingStreamAsync(stream: Flow<Int>)
25+
2426
suspend fun outgoingStreamWithDelayedResponse(stream: Flow<Int>)
2527

2628
suspend fun outgoingStreamWithException(stream: Flow<Int>)
@@ -57,6 +59,14 @@ class CancellationServiceImpl : CancellationService {
5759
consume(stream)
5860
}
5961

62+
@OptIn(DelicateCoroutinesApi::class)
63+
override suspend fun outgoingStreamAsync(stream: Flow<Int>) {
64+
GlobalScope.launch {
65+
consume(stream)
66+
}
67+
firstIncomingConsumed.await()
68+
}
69+
6070
override suspend fun outgoingStreamWithDelayedResponse(stream: Flow<Int>) {
6171
consume(stream)
6272

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class CancellationTest {
7676
}
7777

7878
unskippableDelay(150) // wait for requests to reach server
79-
client.internalScope.cancel()
79+
client.close()
8080
firstRequestJob.join()
8181
secondRequestJob.join()
8282

@@ -85,8 +85,8 @@ class CancellationTest {
8585

8686
assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed")
8787

88-
client.internalScope.join()
89-
server.internalScope.join()
88+
client.awaitCompletion()
89+
server.awaitCompletion()
9090

9191
checkAlive(clientAlive = false, serverAlive = false)
9292
stopAllAndJoin()
@@ -105,7 +105,7 @@ class CancellationTest {
105105
}
106106

107107
unskippableDelay(150) // wait for requests to reach server
108-
server.internalScope.cancel()
108+
server.close()
109109
firstRequestJob.join()
110110
secondRequestJob.join()
111111

@@ -114,8 +114,8 @@ class CancellationTest {
114114

115115
assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed")
116116

117-
client.internalScope.join()
118-
server.internalScope.join()
117+
client.awaitCompletion()
118+
server.awaitCompletion()
119119

120120
checkAlive(clientAlive = false, serverAlive = false)
121121
stopAllAndJoin()
@@ -141,6 +141,18 @@ class CancellationTest {
141141
stopAllAndJoin()
142142
}
143143

144+
@Test
145+
fun testOutgoingFlowLifetime() = runCancellationTest {
146+
val fence = CompletableDeferred<Unit>()
147+
148+
service.outgoingStreamAsync(resumableFlow(fence))
149+
150+
serverInstance().consumedAll.await()
151+
assertContentEquals(listOf(0), serverInstance().consumedIncomingValues)
152+
153+
stopAllAndJoin()
154+
}
155+
144156
@Test
145157
fun testStreamIncoming() = runCancellationTest {
146158
var first: Int = -1
@@ -220,8 +232,8 @@ class CancellationTest {
220232

221233
serverInstance().firstIncomingConsumed.await()
222234

223-
client.internalScope.cancel("Test request cancelled")
224-
client.internalScope.join()
235+
client.close("Test request cancelled")
236+
client.awaitCompletion()
225237

226238
serverInstance().consumedAll.await()
227239

@@ -239,7 +251,7 @@ class CancellationTest {
239251
caught = it
240252
}.collect {
241253
if (it == 0) {
242-
client.internalScope.cancel()
254+
client.close()
243255
} else {
244256
fail("Expected the request to fail with cancellation of the client")
245257
}
@@ -254,7 +266,7 @@ class CancellationTest {
254266
fun testCancelledClientCancelsRequest() = runCancellationTest {
255267
launch {
256268
serverInstance().firstIncomingConsumed.await()
257-
client.internalScope.cancel("Cancelled by test")
269+
client.close("Cancelled by test")
258270
}
259271

260272
try {
@@ -347,8 +359,5 @@ class CancellationTest {
347359
private val CoroutineScope.isCompleted get() = coroutineContext.job.isCompleted
348360
private val CoroutineScope.isCancelled get() = coroutineContext.job.isCancelled
349361

350-
@Suppress("SuspendFunctionOnCoroutineScope")
351-
private suspend fun CoroutineScope.join() = apply { coroutineContext.job.join() }
352-
353362
private suspend fun CancellationToolkit.stopAllAndJoin() = transport.coroutineContext.job.cancelAndJoin()
354363
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResul
2626
return runTest(timeout = 15.seconds) {
2727
debugCoroutines()
2828
CancellationToolkit(this).apply {
29-
client.initializeTransport()
30-
3129
body()
3230
}
3331
}

0 commit comments

Comments
 (0)