Skip to content

Commit d8a8e04

Browse files
committed
Adapt core implementation to the latest change in transport API and improve code-organization a bit
1 parent 9b6a006 commit d8a8e04

31 files changed

+386
-417
lines changed

rsocket-core/api/rsocket-core.api

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,9 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
222222
}
223223

224224
public final class io/rsocket/kotlin/core/RSocketServer {
225+
public final fun acceptConnection (Lio/rsocket/kotlin/ConnectionAcceptor;Lio/rsocket/kotlin/transport/RSocketConnection;)V
225226
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
226227
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
227-
public final fun createHandler (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketConnectionHandler;
228228
public final fun startServer (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
229229
}
230230

@@ -751,31 +751,24 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
751751
}
752752

753753
public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : kotlinx/coroutines/CoroutineScope {
754-
public abstract fun connectClient (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;)Lkotlinx/coroutines/Job;
754+
public abstract fun connectClient (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
755755
}
756756

757-
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection {
758-
}
759-
760-
public abstract interface class io/rsocket/kotlin/transport/RSocketConnectionHandler {
761-
public abstract fun handleConnection (Lio/rsocket/kotlin/transport/RSocketConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
757+
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection : kotlinx/coroutines/CoroutineScope {
762758
}
763759

764760
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection : io/rsocket/kotlin/transport/RSocketConnection {
765761
public abstract fun acceptStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
766762
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
767763
}
768764

769-
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : java/lang/AutoCloseable {
770-
public abstract fun close ()V
771-
public abstract fun isClosedForSend ()Z
765+
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : kotlinx/coroutines/CoroutineScope {
772766
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
773767
public abstract fun sendFrame (Lkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
774768
public abstract fun setSendPriority (I)V
775769
}
776770

777771
public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
778-
public abstract fun isClosedForSend ()Z
779772
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
780773
public abstract fun sendFrame (ILkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
781774
}
@@ -784,7 +777,7 @@ public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstanc
784777
}
785778

786779
public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : kotlinx/coroutines/CoroutineScope {
787-
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
780+
public abstract fun startServer (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
788781
}
789782

790783
public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentContext.kt

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,23 +20,16 @@ import io.rsocket.kotlin.frame.*
2020
import io.rsocket.kotlin.frame.io.*
2121
import io.rsocket.kotlin.keepalive.*
2222
import io.rsocket.kotlin.payload.*
23-
import io.rsocket.kotlin.transport.*
2423
import kotlinx.io.*
2524

2625
// send/receive setup, resume, resume ok, lease, error
27-
@RSocketTransportApi
2826
internal abstract class ConnectionEstablishmentContext(
29-
private val frameCodec: FrameCodec,
27+
protected val frameCodec: FrameCodec,
3028
) {
31-
protected abstract suspend fun receiveFrameRaw(): Buffer?
32-
protected abstract suspend fun sendFrame(frame: Buffer)
33-
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
29+
protected abstract suspend fun receiveConnectionFrameRaw(): Buffer?
30+
protected abstract suspend fun sendConnectionFrameRaw(frame: Buffer)
3431

35-
// only setup|lease|resume|resume_ok|error frames
36-
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
37-
expectedStreamId = 0,
38-
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
39-
)
32+
protected suspend fun sendFrameConnectionFrame(frame: Frame): Unit = sendConnectionFrameRaw(frameCodec.encodeFrame(frame))
4033

4134
suspend fun sendSetup(
4235
version: Version,
@@ -45,5 +38,11 @@ internal abstract class ConnectionEstablishmentContext(
4538
resumeToken: Buffer?,
4639
payloadMimeType: PayloadMimeType,
4740
payload: Payload,
48-
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
41+
): Unit = sendFrameConnectionFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
42+
43+
// only setup|lease|resume|resume_ok|error frames
44+
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
45+
expectedStreamId = 0,
46+
frame = receiveConnectionFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
47+
)
4948
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentHandler.kt

Lines changed: 0 additions & 107 deletions
This file was deleted.

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,20 +18,15 @@ package io.rsocket.kotlin.connection
1818

1919
import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.frame.*
21-
import io.rsocket.kotlin.keepalive.*
2221
import io.rsocket.kotlin.operation.*
23-
import io.rsocket.kotlin.transport.*
2422
import kotlinx.coroutines.*
2523
import kotlinx.io.*
26-
import kotlin.coroutines.*
2724

28-
@RSocketTransportApi
2925
internal class ConnectionInbound(
30-
// requestContext
31-
override val coroutineContext: CoroutineContext,
26+
private val requestsScope: CoroutineScope,
3227
private val responder: RSocket,
3328
private val keepAliveHandler: KeepAliveHandler,
34-
) : CoroutineScope {
29+
) {
3530
fun handleFrame(frame: Frame): Unit = when (frame) {
3631
is MetadataPushFrame -> receiveMetadataPush(frame.metadata)
3732
is KeepAliveFrame -> receiveKeepAlive(frame.respond, frame.data, frame.lastPosition)
@@ -42,9 +37,9 @@ internal class ConnectionInbound(
4237
}
4338

4439
private fun receiveMetadataPush(metadata: Buffer) {
45-
launch {
40+
requestsScope.launch {
4641
responder.metadataPush(metadata)
47-
}.invokeOnCompletion { metadata.close() }
42+
}.invokeOnCompletion { metadata.clear() }
4843
}
4944

5045
@Suppress("UNUSED_PARAMETER") // will be used later
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2015-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.connection
18+
19+
import io.rsocket.kotlin.*
20+
import io.rsocket.kotlin.core.*
21+
import io.rsocket.kotlin.frame.*
22+
import io.rsocket.kotlin.internal.io.*
23+
import io.rsocket.kotlin.transport.*
24+
import kotlinx.coroutines.*
25+
26+
@RSocketTransportApi
27+
internal abstract class ConnectionInitializer(
28+
private val isClient: Boolean,
29+
private val frameCodec: FrameCodec,
30+
private val connectionAcceptor: ConnectionAcceptor,
31+
private val interceptors: Interceptors,
32+
) {
33+
protected abstract suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig
34+
35+
private suspend fun wrapConnection(
36+
connection: RSocketConnection,
37+
requestsScope: CoroutineScope,
38+
): ConnectionOutbound = when (connection) {
39+
is RSocketMultiplexedConnection -> {
40+
val initialStream = when {
41+
isClient -> connection.createStream()
42+
else -> connection.acceptStream() ?: error("Initial stream should be received")
43+
}
44+
initialStream.setSendPriority(0)
45+
MultiplexedConnection(isClient, frameCodec, connection, initialStream, requestsScope)
46+
}
47+
48+
is RSocketSequentialConnection -> {
49+
SequentialConnection(isClient, frameCodec, connection, requestsScope)
50+
}
51+
}
52+
53+
private suspend fun initialize(connection: RSocketConnection): RSocket {
54+
val requestsScope = CoroutineScope(connection.coroutineContext.supervisorContext())
55+
val outbound = wrapConnection(connection, requestsScope)
56+
val connectionJob = connection.launch(start = CoroutineStart.ATOMIC) {
57+
try {
58+
awaitCancellation()
59+
} catch (cause: Throwable) {
60+
if (connection.isActive) {
61+
nonCancellable {
62+
outbound.sendError(RSocketError.ConnectionError(cause.message ?: "Connection failed"))
63+
}
64+
connection.cancel("Connection failed", cause)
65+
}
66+
throw cause
67+
}
68+
}
69+
val connectionScope = CoroutineScope(connection.coroutineContext + connectionJob)
70+
try {
71+
val connectionConfig = establishConnection(outbound)
72+
try {
73+
val requester = interceptors.wrapRequester(
74+
RequesterRSocket(requestsScope, outbound)
75+
)
76+
val responder = interceptors.wrapResponder(
77+
with(interceptors.wrapAcceptor(connectionAcceptor)) {
78+
ConnectionAcceptorContext(connectionConfig, requester).accept()
79+
}
80+
)
81+
82+
// link completing of requester, connection and requestHandler
83+
requester.coroutineContext.job.invokeOnCompletion {
84+
connectionJob.cancel("Requester cancelled", it)
85+
}
86+
responder.coroutineContext.job.invokeOnCompletion {
87+
connectionJob.cancel("Responder cancelled", it)
88+
}
89+
connectionJob.invokeOnCompletion { cause ->
90+
// the responder is not linked to `coroutineContext`
91+
responder.cancel("Connection closed", cause)
92+
}
93+
94+
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, outbound, connectionScope)
95+
connectionScope.launch {
96+
outbound.handleConnection(ConnectionInbound(requestsScope, responder, keepAliveHandler))
97+
}
98+
return requester
99+
} catch (cause: Throwable) {
100+
connectionConfig.setupPayload.close()
101+
throw cause
102+
}
103+
} catch (cause: Throwable) {
104+
nonCancellable {
105+
outbound.sendError(
106+
when (cause) {
107+
is RSocketError -> cause
108+
else -> RSocketError.ConnectionError(cause.message ?: "Connection establishment failed")
109+
}
110+
)
111+
}
112+
throw cause
113+
}
114+
}
115+
116+
private fun asyncInitializer(connection: RSocketConnection): Deferred<RSocket> = connection.async {
117+
try {
118+
initialize(connection)
119+
} catch (cause: Throwable) {
120+
connection.cancel("Connection initialization failed", cause)
121+
throw cause
122+
}
123+
}
124+
125+
suspend fun runInitializer(connection: RSocketConnection): RSocket {
126+
val result = asyncInitializer(connection)
127+
try {
128+
result.join()
129+
} catch (cause: Throwable) {
130+
connection.cancel("Connection initialization cancelled", cause)
131+
throw cause
132+
}
133+
return result.await()
134+
}
135+
136+
fun launchInitializer(connection: RSocketConnection): Job = asyncInitializer(connection)
137+
}

0 commit comments

Comments
 (0)