Skip to content

Commit 6d263e7

Browse files
committed
improve connection logic
1 parent 6fd2e34 commit 6d263e7

File tree

15 files changed

+206
-260
lines changed

15 files changed

+206
-260
lines changed

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentContext.kt

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,23 +20,16 @@ import io.rsocket.kotlin.frame.*
2020
import io.rsocket.kotlin.frame.io.*
2121
import io.rsocket.kotlin.keepalive.*
2222
import io.rsocket.kotlin.payload.*
23-
import io.rsocket.kotlin.transport.*
2423
import kotlinx.io.*
2524

2625
// send/receive setup, resume, resume ok, lease, error
27-
@RSocketTransportApi
2826
internal abstract class ConnectionEstablishmentContext(
29-
private val frameCodec: FrameCodec,
27+
protected val frameCodec: FrameCodec,
3028
) {
31-
protected abstract suspend fun receiveFrameRaw(): Buffer?
32-
protected abstract suspend fun sendFrame(frame: Buffer)
33-
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
29+
protected abstract suspend fun receiveConnectionFrameRaw(): Buffer?
30+
protected abstract suspend fun sendConnectionFrameRaw(frame: Buffer)
3431

35-
// only setup|lease|resume|resume_ok|error frames
36-
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
37-
expectedStreamId = 0,
38-
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
39-
)
32+
protected suspend fun sendFrameConnectionFrame(frame: Frame): Unit = sendConnectionFrameRaw(frameCodec.encodeFrame(frame))
4033

4134
suspend fun sendSetup(
4235
version: Version,
@@ -45,5 +38,11 @@ internal abstract class ConnectionEstablishmentContext(
4538
resumeToken: Buffer?,
4639
payloadMimeType: PayloadMimeType,
4740
payload: Payload,
48-
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
41+
): Unit = sendFrameConnectionFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
42+
43+
// only setup|lease|resume|resume_ok|error frames
44+
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
45+
expectedStreamId = 0,
46+
frame = receiveConnectionFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
47+
)
4948
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionInbound.kt

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,20 +18,15 @@ package io.rsocket.kotlin.connection
1818

1919
import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.frame.*
21-
import io.rsocket.kotlin.keepalive.*
2221
import io.rsocket.kotlin.operation.*
23-
import io.rsocket.kotlin.transport.*
2422
import kotlinx.coroutines.*
2523
import kotlinx.io.*
26-
import kotlin.coroutines.*
2724

28-
@RSocketTransportApi
2925
internal class ConnectionInbound(
30-
// requestContext
31-
override val coroutineContext: CoroutineContext,
26+
private val requestsScope: CoroutineScope,
3227
private val responder: RSocket,
3328
private val keepAliveHandler: KeepAliveHandler,
34-
) : CoroutineScope {
29+
) {
3530
fun handleFrame(frame: Frame): Unit = when (frame) {
3631
is MetadataPushFrame -> receiveMetadataPush(frame.metadata)
3732
is KeepAliveFrame -> receiveKeepAlive(frame.respond, frame.data, frame.lastPosition)
@@ -42,9 +37,9 @@ internal class ConnectionInbound(
4237
}
4338

4439
private fun receiveMetadataPush(metadata: Buffer) {
45-
launch {
40+
requestsScope.launch {
4641
responder.metadataPush(metadata)
47-
}.invokeOnCompletion { metadata.close() }
42+
}.invokeOnCompletion { metadata.clear() }
4843
}
4944

5045
@Suppress("UNUSED_PARAMETER") // will be used later
Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,45 +20,59 @@ import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.core.*
2121
import io.rsocket.kotlin.frame.*
2222
import io.rsocket.kotlin.internal.io.*
23-
import io.rsocket.kotlin.keepalive.*
2423
import io.rsocket.kotlin.transport.*
2524
import kotlinx.coroutines.*
26-
import kotlin.coroutines.*
2725

2826
@RSocketTransportApi
29-
internal abstract class ConnectionEstablishmentHandler(
27+
internal abstract class ConnectionInitializer(
3028
private val isClient: Boolean,
3129
private val frameCodec: FrameCodec,
3230
private val connectionAcceptor: ConnectionAcceptor,
3331
private val interceptors: Interceptors,
34-
private val requesterDeferred: CompletableDeferred<RSocket>?,
35-
) : RSocketConnectionInitializer<Unit> {
36-
abstract suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig
32+
) {
33+
protected abstract suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig
3734

3835
private suspend fun wrapConnection(
3936
connection: RSocketConnection,
40-
requestContext: CoroutineContext,
41-
): Connection2 = when (connection) {
37+
requestsScope: CoroutineScope,
38+
): ConnectionOutbound = when (connection) {
4239
is RSocketMultiplexedConnection -> {
4340
val initialStream = when {
4441
isClient -> connection.createStream()
4542
else -> connection.acceptStream() ?: error("Initial stream should be received")
4643
}
4744
initialStream.setSendPriority(0)
48-
MultiplexedConnection(isClient, frameCodec, requestContext, connection, initialStream)
45+
MultiplexedConnection(isClient, frameCodec, connection, initialStream, requestsScope)
4946
}
5047

5148
is RSocketSequentialConnection -> {
52-
SequentialConnection(isClient, frameCodec, requestContext, connection)
49+
SequentialConnection(isClient, frameCodec, connection, requestsScope)
5350
}
5451
}
5552

56-
@Suppress("SuspendFunctionOnCoroutineScope")
57-
private suspend fun CoroutineScope.handleConnection(connection: Connection2) {
53+
private suspend fun initialize(connection: RSocketConnection): RSocket {
54+
val requestsScope = CoroutineScope(connection.coroutineContext.supervisorContext())
55+
val outbound = wrapConnection(connection, requestsScope)
56+
val connectionJob = connection.launch(start = CoroutineStart.ATOMIC) {
57+
try {
58+
awaitCancellation()
59+
} catch (cause: Throwable) {
60+
if (connection.isActive) {
61+
nonCancellable {
62+
outbound.sendError(RSocketError.ConnectionError(cause.message ?: "Connection failed"))
63+
}
64+
connection.cancel("Connection failed", cause)
65+
}
66+
throw cause
67+
}
68+
}
69+
val connectionScope = CoroutineScope(connection.coroutineContext + connectionJob)
5870
try {
59-
val connectionConfig = connection.establishConnection(this@ConnectionEstablishmentHandler)
71+
val connectionConfig = establishConnection(outbound)
6072
try {
61-
val requester = interceptors.wrapRequester(connection)
73+
val requester = interceptors.wrapRequester(
74+
RequesterRSocket(requestsScope, outbound)
75+
)
6276
val responder = interceptors.wrapResponder(
6377
with(interceptors.wrapAcceptor(connectionAcceptor)) {
6478
ConnectionAcceptorContext(connectionConfig, requester).accept()
@@ -67,49 +81,57 @@ internal abstract class ConnectionEstablishmentHandler(
6781

6882
// link completing of requester, connection and requestHandler
6983
requester.coroutineContext.job.invokeOnCompletion {
70-
coroutineContext.job.cancel("Requester cancelled", it)
84+
connectionJob.cancel("Requester cancelled", it)
7185
}
7286
responder.coroutineContext.job.invokeOnCompletion {
73-
coroutineContext.job.cancel("Responder cancelled", it)
87+
connectionJob.cancel("Responder cancelled", it)
7488
}
75-
coroutineContext.job.invokeOnCompletion { cause ->
89+
connectionJob.invokeOnCompletion { cause ->
7690
// the responder is not linked to `coroutineContext`
7791
responder.cancel("Connection closed", cause)
7892
}
7993

80-
requesterDeferred?.complete(requester)
81-
82-
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, connection, this)
83-
connection.handleConnection(
84-
ConnectionInbound(connection.coroutineContext, responder, keepAliveHandler)
85-
)
94+
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, outbound, connectionScope)
95+
connectionScope.launch {
96+
outbound.handleConnection(ConnectionInbound(requestsScope, responder, keepAliveHandler))
97+
}
98+
return requester
8699
} catch (cause: Throwable) {
87100
connectionConfig.setupPayload.close()
88101
throw cause
89102
}
90103
} catch (cause: Throwable) {
91-
connection.close()
92104
nonCancellable {
93-
connection.sendError(
105+
outbound.sendError(
94106
when (cause) {
95107
is RSocketError -> cause
96-
else -> RSocketError.ConnectionError(cause.message ?: "Connection failed")
108+
else -> RSocketError.ConnectionError(cause.message ?: "Connection establishment failed")
97109
}
98110
)
99111
}
100112
throw cause
101113
}
102114
}
103115

104-
override suspend fun initialize(connection: RSocketConnection) {
116+
private fun asyncInitializer(connection: RSocketConnection): Deferred<RSocket> = connection.async {
105117
try {
106-
coroutineScope {
107-
handleConnection(wrapConnection(connection, coroutineContext.supervisorContext()))
108-
}
109-
connection.cancel()
118+
initialize(connection)
110119
} catch (cause: Throwable) {
111-
connection.cancel("Connection closed", cause)
120+
connection.cancel("Connection initialization failed", cause)
112121
throw cause
113122
}
114123
}
124+
125+
suspend fun runInitializer(connection: RSocketConnection): RSocket {
126+
val result = asyncInitializer(connection)
127+
try {
128+
result.join()
129+
} catch (cause: Throwable) {
130+
connection.cancel("Connection initialization cancelled", cause)
131+
throw cause
132+
}
133+
return result.await()
134+
}
135+
136+
fun launchInitializer(connection: RSocketConnection): Job = asyncInitializer(connection)
115137
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2015-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.connection
18+
19+
import io.rsocket.kotlin.frame.*
20+
import io.rsocket.kotlin.operation.*
21+
import io.rsocket.kotlin.payload.*
22+
import kotlinx.coroutines.*
23+
import kotlinx.io.*
24+
25+
internal abstract class ConnectionOutbound(
26+
frameCodec: FrameCodec,
27+
) : ConnectionEstablishmentContext(frameCodec) {
28+
suspend fun sendError(cause: Throwable) {
29+
sendFrameConnectionFrame(ErrorFrame(0, cause))
30+
}
31+
32+
suspend fun sendMetadataPush(metadata: Buffer) {
33+
sendFrameConnectionFrame(MetadataPushFrame(metadata))
34+
}
35+
36+
suspend fun sendKeepAlive(respond: Boolean, data: Buffer, lastPosition: Long) {
37+
sendFrameConnectionFrame(KeepAliveFrame(respond, lastPosition, data))
38+
}
39+
40+
abstract suspend fun handleConnection(inbound: ConnectionInbound)
41+
42+
abstract fun launchRequest(requestPayload: Payload, operation: RequesterOperation): Job
43+
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt renamed to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/KeepAliveHandler.kt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,21 +14,18 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.kotlin.keepalive
17+
package io.rsocket.kotlin.connection
1818

1919
import io.rsocket.kotlin.*
20-
import io.rsocket.kotlin.connection.*
21-
import io.rsocket.kotlin.frame.io.*
22-
import io.rsocket.kotlin.transport.*
20+
import io.rsocket.kotlin.keepalive.*
2321
import kotlinx.atomicfu.*
2422
import kotlinx.coroutines.*
2523
import kotlinx.io.*
2624
import kotlin.time.*
2725

28-
@RSocketTransportApi
2926
internal class KeepAliveHandler(
3027
private val keepAlive: KeepAlive,
31-
private val connection2: Connection2,
28+
private val outbound: ConnectionOutbound,
3229
private val connectionScope: CoroutineScope,
3330
) {
3431
private val initial = TimeSource.Monotonic.markNow()
@@ -44,7 +41,9 @@ internal class KeepAliveHandler(
4441
if (currentDelayMillis() - lastMark.value >= keepAlive.maxLifetimeMillis)
4542
throw RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms")
4643

47-
connection2.sendKeepAlive(true, EmptyBuffer, 0)
44+
outbound.sendKeepAlive(true, Buffer().apply {
45+
writeString(currentDelayMillis().toString())
46+
}, 0)
4847
}
4948
}
5049
}
@@ -53,7 +52,7 @@ internal class KeepAliveHandler(
5352
lastMark.value = currentDelayMillis()
5453
// in most cases it will be possible to not suspend at all
5554
if (respond) connectionScope.launch(start = CoroutineStart.UNDISPATCHED) {
56-
connection2.sendKeepAlive(false, data, 0)
55+
outbound.sendKeepAlive(false, data, 0)
5756
}
5857
}
5958
}

0 commit comments

Comments
 (0)