Skip to content

Commit fb3ec27

Browse files
committed
revert some changes
1 parent 0cbf25f commit fb3ec27

File tree

7 files changed

+36
-30
lines changed

7 files changed

+36
-30
lines changed

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/Connection.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ internal abstract class Connection2(
3838

3939
// connection establishment part
4040

41-
abstract suspend fun establishConnection(handler: ConnectionEstablishmentHandler<*>): ConnectionConfig
41+
abstract suspend fun establishConnection(handler: ConnectionEstablishmentHandler): ConnectionConfig
4242

4343
// setup completed, start handling requests
4444
abstract suspend fun handleConnection(inbound: ConnectionInbound)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/ConnectionEstablishmentHandler.kt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import kotlinx.coroutines.*
2626
import kotlin.coroutines.*
2727

2828
@RSocketTransportApi
29-
internal abstract class ConnectionEstablishmentHandler<T>(
29+
internal abstract class ConnectionEstablishmentHandler(
3030
private val isClient: Boolean,
3131
private val frameCodec: FrameCodec,
3232
private val connectionAcceptor: ConnectionAcceptor,
3333
private val interceptors: Interceptors,
34-
) : RSocketConnectionInitializer<T> {
34+
private val requesterDeferred: CompletableDeferred<RSocket>?,
35+
) : RSocketConnectionInitializer<Unit> {
3536
abstract suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig
36-
protected abstract fun transform(requester: RSocket): T
3737

3838
private suspend fun wrapConnection(
3939
connection: RSocketConnection,
@@ -54,8 +54,8 @@ internal abstract class ConnectionEstablishmentHandler<T>(
5454
}
5555

5656
@Suppress("SuspendFunctionOnCoroutineScope")
57-
private suspend fun CoroutineScope.handleConnection(connection: Connection2): T {
58-
return try {
57+
private suspend fun CoroutineScope.handleConnection(connection: Connection2) {
58+
try {
5959
val connectionConfig = connection.establishConnection(this@ConnectionEstablishmentHandler)
6060
try {
6161
val requester = interceptors.wrapRequester(connection)
@@ -77,14 +77,12 @@ internal abstract class ConnectionEstablishmentHandler<T>(
7777
responder.cancel("Connection closed", cause)
7878
}
7979

80-
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, connection, this)
81-
connection.launch {
82-
connection.handleConnection(
83-
ConnectionInbound(connection.coroutineContext, responder, keepAliveHandler)
84-
)
85-
}
80+
requesterDeferred?.complete(requester)
8681

87-
transform(requester)
82+
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, connection, this)
83+
connection.handleConnection(
84+
ConnectionInbound(connection.coroutineContext, responder, keepAliveHandler)
85+
)
8886
} catch (cause: Throwable) {
8987
connectionConfig.setupPayload.close()
9088
throw cause
@@ -103,7 +101,7 @@ internal abstract class ConnectionEstablishmentHandler<T>(
103101
}
104102
}
105103

106-
override suspend fun RSocketConnection.initialize(): T {
107-
return handleConnection(wrapConnection(this, coroutineContext.supervisorContext()))
104+
override suspend fun RSocketConnection.initialize() {
105+
handleConnection(wrapConnection(this, coroutineContext.supervisorContext()))
108106
}
109107
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/MultiplexedConnection.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ internal class MultiplexedConnection(
4040
storage.clear()
4141
}
4242

43-
override suspend fun establishConnection(handler: ConnectionEstablishmentHandler<*>): ConnectionConfig {
43+
override suspend fun establishConnection(handler: ConnectionEstablishmentHandler): ConnectionConfig {
4444
return handler.establishConnection(EstablishmentContext())
4545
}
4646

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/OldConnection.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ internal class OldConnection(
3939
init {
4040
@OptIn(DelicateCoroutinesApi::class)
4141
launch(start = CoroutineStart.ATOMIC) {
42-
val outboundJob = launch {
42+
launch {
4343
nonCancellable {
4444
while (true) {
4545
connection.send(outboundQueue.dequeueFrame() ?: break)
@@ -52,10 +52,7 @@ internal class OldConnection(
5252
try {
5353
awaitCancellation()
5454
} finally {
55-
nonCancellable {
56-
outboundQueue.close()
57-
outboundJob.join()
58-
}
55+
outboundQueue.close()
5956
}
6057
}
6158
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/SequentialConnection.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ internal class SequentialConnection(
3838
storage.clear().forEach { it.close() }
3939
}
4040

41-
override suspend fun establishConnection(handler: ConnectionEstablishmentHandler<*>): ConnectionConfig {
41+
override suspend fun establishConnection(handler: ConnectionEstablishmentHandler): ConnectionConfig {
4242
return handler.establishConnection(EstablishmentContext())
4343
}
4444

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.connection.*
2121
import io.rsocket.kotlin.frame.*
2222
import io.rsocket.kotlin.frame.io.*
23+
import io.rsocket.kotlin.internal.io.*
2324
import io.rsocket.kotlin.logging.*
2425
import io.rsocket.kotlin.transport.*
26+
import kotlinx.coroutines.*
2527
import kotlin.coroutines.*
2628

2729
@OptIn(RSocketTransportApi::class, RSocketLoggingApi::class)
@@ -58,16 +60,27 @@ public class RSocketConnector internal constructor(
5860
}
5961

6062
private suspend fun connectOnce(transport: RSocketClientTarget): RSocket {
61-
return SetupConnection().runInitializer(transport.connectClient().logging(frameLogger))
63+
val requesterDeferred = CompletableDeferred<RSocket>()
64+
val connectJob = SetupConnection(requesterDeferred).launchInitializer(
65+
transport.connectClient().logging(frameLogger)
66+
).onCompletion {
67+
if (it != null) requesterDeferred.completeExceptionally(it)
68+
}
69+
return try {
70+
requesterDeferred.await()
71+
} catch (cause: Throwable) {
72+
connectJob.cancel("RSocketConnector.connect was cancelled", cause)
73+
throw cause
74+
}
6275
}
6376

64-
private inner class SetupConnection : ConnectionEstablishmentHandler<RSocket>(
77+
private inner class SetupConnection(requesterDeferred: CompletableDeferred<RSocket>) : ConnectionEstablishmentHandler(
6578
isClient = true,
6679
frameCodec = FrameCodec(maxFragmentSize),
6780
connectionAcceptor = acceptor,
68-
interceptors = interceptors
81+
interceptors = interceptors,
82+
requesterDeferred = requesterDeferred
6983
) {
70-
override fun transform(requester: RSocket): RSocket = requester
7184
override suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig {
7285
val connectionConfig = connectionConfigProvider()
7386
try {
@@ -86,6 +99,5 @@ public class RSocketConnector internal constructor(
8699
}
87100
return connectionConfig
88101
}
89-
90102
}
91103
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,13 @@ public class RSocketServer internal constructor(
6363
AcceptConnection(acceptor).launchInitializer(connection.logging(frameLogger))
6464
}
6565

66-
private inner class AcceptConnection(acceptor: ConnectionAcceptor) : ConnectionEstablishmentHandler<Unit>(
66+
private inner class AcceptConnection(acceptor: ConnectionAcceptor) : ConnectionEstablishmentHandler(
6767
isClient = false,
6868
frameCodec = FrameCodec(maxFragmentSize),
6969
connectionAcceptor = acceptor,
7070
interceptors = interceptors,
71+
requesterDeferred = null
7172
) {
72-
override fun transform(requester: RSocket) = Unit
73-
7473
override suspend fun establishConnection(context: ConnectionEstablishmentContext): ConnectionConfig {
7574
val setupFrame = context.receiveFrame()
7675
return try {

0 commit comments

Comments
 (0)