Skip to content

Commit 6b98c0b

Browse files
authored
kRPC client initialization is not single shot (#385)
1 parent bc90d56 commit 6b98c0b

File tree

2 files changed

+60
-9
lines changed

2 files changed

+60
-9
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import kotlinx.coroutines.flow.Flow
1111
import kotlinx.coroutines.flow.FlowCollector
1212
import kotlinx.coroutines.flow.first
1313
import kotlinx.coroutines.flow.flow
14+
import kotlinx.coroutines.sync.Mutex
15+
import kotlinx.coroutines.sync.withLock
1416
import kotlinx.rpc.RpcCall
1517
import kotlinx.rpc.RpcClient
1618
import kotlinx.rpc.annotations.Rpc
@@ -47,7 +49,7 @@ import kotlin.properties.Delegates
4749
public abstract class InitializedKrpcClient(
4850
private val config: KrpcConfig.Client,
4951
private val transport: KrpcTransport,
50-
): KrpcClient() {
52+
) : KrpcClient() {
5153
final override suspend fun initializeTransport(): KrpcTransport {
5254
return transport
5355
}
@@ -179,18 +181,28 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
179181
// callId to serviceTypeString
180182
private val cancellingRequests = RpcInternalConcurrentHashMap<String, String>()
181183

184+
private val transportInitializationLock = Mutex()
185+
182186
/**
183187
* Starts the handshake process and awaits for completion.
184188
* If the handshake was completed before, nothing happens.
185189
*/
186190
private suspend fun initializeAndAwaitHandshakeCompletion() {
187-
transport = initializeTransport()
188-
isTransportReady = true
191+
if (!isTransportReady) {
192+
transportInitializationLock.withLock {
193+
if (isTransportReady) {
194+
return@withLock
195+
}
189196

190-
connector.subscribeToGenericMessages(::handleGenericMessage)
191-
connector.subscribeToProtocolMessages(::handleProtocolMessage)
197+
transport = initializeTransport()
198+
isTransportReady = true
192199

193-
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
200+
connector.subscribeToGenericMessages(::handleGenericMessage)
201+
connector.subscribeToProtocolMessages(::handleProtocolMessage)
202+
203+
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
204+
}
205+
}
194206

195207
serverSupportedPlugins.await()
196208
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/TransportTest.kt

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import kotlinx.coroutines.test.TestResult
1010
import kotlinx.coroutines.test.TestScope
1111
import kotlinx.rpc.*
1212
import kotlinx.rpc.annotations.Rpc
13+
import kotlinx.rpc.krpc.KrpcConfig
1314
import kotlinx.rpc.krpc.KrpcConfigBuilder
15+
import kotlinx.rpc.krpc.KrpcTransport
16+
import kotlinx.rpc.krpc.client.KrpcClient
17+
import kotlinx.rpc.krpc.internal.KrpcProtocolMessage
1418
import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger
1519
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
1620
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
@@ -76,21 +80,24 @@ class TransportTest {
7680
return KrpcTestServer(serverConfig, localTransport.server)
7781
}
7882

79-
private fun runTest(block: suspend TestScope.() -> Unit): TestResult =
83+
private fun runTest(block: suspend TestScope.(logs: List<String>) -> Unit): TestResult =
8084
kotlinx.coroutines.test.runTest(timeout = 20.seconds) {
8185
debugCoroutines()
8286

8387
val logger = RpcInternalCommonLogger.logger("TransportTest")
8488

89+
val logs = mutableListOf<String>()
8590
RpcInternalDumpLoggerContainer.set(object : RpcInternalDumpLogger {
8691
override val isEnabled: Boolean = true
8792

8893
override fun dump(vararg tags: String, message: () -> String) {
89-
logger.info { "${tags.joinToString(" ") { "[$it]" }} ${message()}" }
94+
val message = "${tags.joinToString(" ") { "[$it]" }} ${message()}"
95+
logs.add(message)
96+
logger.info { message }
9097
}
9198
})
9299

93-
block()
100+
block(logs)
94101

95102
RpcInternalDumpLoggerContainer.set(null)
96103
}
@@ -240,6 +247,38 @@ class TransportTest {
240247
transports.cancel()
241248
}
242249

250+
private val clientHandshake = ".*\\[Client] \\[Send] \\{\"type\":\"${KrpcProtocolMessage.Handshake.serializer().descriptor.serialName}\".*+".toRegex()
251+
252+
@Test
253+
fun transportInitializedOnlyOnce() = runTest { logs ->
254+
val localTransport = LocalTransport()
255+
var transportInitialized = 0
256+
var configInitialized = 0
257+
val client = object : KrpcClient() {
258+
override suspend fun initializeTransport(): KrpcTransport {
259+
transportInitialized++
260+
return localTransport.client
261+
}
262+
263+
override fun initializeConfig(): KrpcConfig.Client {
264+
configInitialized++
265+
return clientConfig
266+
}
267+
}
268+
269+
val server = serverOf(localTransport)
270+
271+
server.registerServiceAndReturn<Echo, _> { EchoImpl() }
272+
server.registerServiceAndReturn<Second, _> { SecondServer() }
273+
274+
client.withService<Echo>().apply { echo("foo"); echo("bar") }
275+
client.withService<Second>().apply{ second("bar"); second("baz") }
276+
277+
assertEquals(1, transportInitialized)
278+
assertEquals(1, configInitialized)
279+
assertEquals(1, logs.count { it.matches(clientHandshake) })
280+
}
281+
243282
private inline fun <@Rpc reified Service : Any, reified Impl : Service> RpcServer.registerServiceAndReturn(
244283
crossinline body: () -> Impl,
245284
): List<Impl> {

0 commit comments

Comments
 (0)