11/*
2- * Copyright 2015-2024 the original author or authors.
2+ * Copyright 2015-2025 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
@@ -22,174 +22,159 @@ import io.rsocket.kotlin.transport.internal.*
2222import kotlinx.coroutines.*
2323import kotlinx.coroutines.channels.*
2424import kotlinx.io.*
25+ import kotlin.coroutines.*
2526
2627internal sealed class LocalServerConnector {
2728 @RSocketTransportApi
28- abstract fun connect (
29- clientScope : CoroutineScope ,
30- clientHandler : RSocketConnectionHandler ,
31- serverScope : CoroutineScope ,
32- serverHandler : RSocketConnectionHandler ,
33- ): Job
34-
35- internal class Sequential (
36- private val prioritizationQueueBuffersCapacity : Int ,
37- ) : LocalServerConnector() {
38-
39- @RSocketTransportApi
40- override fun connect (
41- clientScope : CoroutineScope ,
42- clientHandler : RSocketConnectionHandler ,
43- serverScope : CoroutineScope ,
44- serverHandler : RSocketConnectionHandler ,
45- ): Job {
46- val clientToServer = PrioritizationFrameQueue (prioritizationQueueBuffersCapacity)
47- val serverToClient = PrioritizationFrameQueue (prioritizationQueueBuffersCapacity)
48-
49- launchLocalConnection(serverScope, serverToClient, clientToServer, serverHandler)
50- return launchLocalConnection(clientScope, clientToServer, serverToClient, clientHandler)
51- }
29+ abstract suspend fun connect (
30+ clientContext : CoroutineContext ,
31+ serverContext : CoroutineContext ,
32+ onConnection : (RSocketConnection ) -> Unit ,
33+ ): RSocketConnection
5234
35+ object Sequential : LocalServerConnector() {
5336 @RSocketTransportApi
54- private fun launchLocalConnection (
55- scope : CoroutineScope ,
56- outbound : PrioritizationFrameQueue ,
57- inbound : PrioritizationFrameQueue ,
58- handler : RSocketConnectionHandler ,
59- ): Job = scope.launch {
60- handler.handleConnection(Connection (outbound, inbound))
61- }.onCompletion {
62- outbound.close()
63- inbound.cancel()
37+ override suspend fun connect (
38+ clientContext : CoroutineContext ,
39+ serverContext : CoroutineContext ,
40+ onConnection : (RSocketConnection ) -> Unit ,
41+ ): RSocketConnection {
42+ val frames = Frames ()
43+ onConnection(Connection (serverContext.childContext(), frames.clientToServer, frames.serverToClient))
44+ return Connection (clientContext.childContext(), frames.serverToClient, frames.clientToServer)
6445 }
6546
6647 @RSocketTransportApi
6748 private class Connection (
68- private val outbound : PrioritizationFrameQueue ,
69- private val inbound : PrioritizationFrameQueue ,
49+ override val coroutineContext : CoroutineContext ,
50+ private val incomingFrames : ReceiveChannel <Buffer >,
51+ private val outgoingFrames : SendChannel <Buffer >,
7052 ) : RSocketSequentialConnection {
71- override val isClosedForSend: Boolean get() = outbound.isClosedForSend
53+ private val outboundQueue = PrioritizationFrameQueue ()
54+
55+ init {
56+ @OptIn(DelicateCoroutinesApi ::class )
57+ launch(start = CoroutineStart .ATOMIC ) {
58+ launch {
59+ nonCancellable {
60+ while (true ) outgoingFrames.send(outboundQueue.dequeueFrame() ? : break )
61+ }
62+ }.invokeOnCompletion {
63+ outboundQueue.cancel()
64+ outgoingFrames.close()
65+ }
66+ try {
67+ awaitCancellation()
68+ } finally {
69+ outboundQueue.close()
70+ incomingFrames.cancel()
71+ }
72+ }
73+ }
7274
7375 override suspend fun sendFrame (streamId : Int , frame : Buffer ) {
74- return outbound .enqueueFrame(streamId, frame)
76+ return outboundQueue .enqueueFrame(streamId, frame)
7577 }
7678
7779 override suspend fun receiveFrame (): Buffer ? {
78- return inbound.dequeueFrame ()
80+ return incomingFrames.receiveCatching().getOrNull ()
7981 }
8082 }
8183 }
8284
83- // TODO: better parameters naming
84- class Multiplexed (
85- private val streamsQueueCapacity : Int ,
86- private val streamBufferCapacity : Int ,
87- ) : LocalServerConnector() {
85+ object Multiplexed : LocalServerConnector() {
8886 @RSocketTransportApi
89- override fun connect (
90- clientScope : CoroutineScope ,
91- clientHandler : RSocketConnectionHandler ,
92- serverScope : CoroutineScope ,
93- serverHandler : RSocketConnectionHandler ,
94- ): Job {
95- val streams = Streams (streamsQueueCapacity)
96-
97- launchLocalConnection(serverScope, streams.serverToClient, streams.clientToServer, serverHandler)
98- return launchLocalConnection(clientScope, streams.clientToServer, streams.serverToClient, clientHandler)
99- }
100-
101- @RSocketTransportApi
102- private fun launchLocalConnection (
103- scope : CoroutineScope ,
104- outbound : SendChannel <Frames >,
105- inbound : ReceiveChannel <Frames >,
106- handler : RSocketConnectionHandler ,
107- ): Job = scope.launch {
108- handler.handleConnection(Connection (SupervisorJob (coroutineContext.job), outbound, inbound, streamBufferCapacity))
109- }.onCompletion {
110- outbound.close()
111- inbound.cancel()
87+ override suspend fun connect (
88+ clientContext : CoroutineContext ,
89+ serverContext : CoroutineContext ,
90+ onConnection : (RSocketConnection ) -> Unit ,
91+ ): RSocketConnection {
92+ val streams = Streams ()
93+ onConnection(Connection (serverContext.childContext(), streams.clientToServer, streams.serverToClient))
94+ return Connection (clientContext.childContext(), streams.serverToClient, streams.clientToServer)
11295 }
11396
11497 @RSocketTransportApi
11598 private class Connection (
116- private val streamsJob : Job ,
117- private val outbound : SendChannel <Frames >,
118- private val inbound : ReceiveChannel <Frames >,
119- private val streamBufferCapacity : Int ,
99+ override val coroutineContext : CoroutineContext ,
100+ private val incomingStreams : ReceiveChannel <Frames >,
101+ private val outgoingStreams : SendChannel <Frames >,
120102 ) : RSocketMultiplexedConnection {
121- override suspend fun createStream (): RSocketMultiplexedConnection .Stream {
122- val frames = Frames (streamBufferCapacity)
103+ private val streamsContext = coroutineContext.supervisorContext()
123104
124- outbound.send(frames)
105+ init {
106+ coroutineContext.job.invokeOnCompletion {
107+ outgoingStreams.close()
108+ incomingStreams.cancel()
109+ }
110+ }
125111
112+ override suspend fun createStream (): RSocketMultiplexedConnection .Stream {
113+ val frames = Frames ()
114+ outgoingStreams.send(frames)
126115 return Stream (
127- parentJob = streamsJob ,
128- outbound = frames.clientToServer,
129- inbound = frames.serverToClient
116+ coroutineContext = streamsContext.childContext() ,
117+ incoming = frames.clientToServer,
118+ outgoing = frames.serverToClient
130119 )
131120 }
132121
133122 override suspend fun acceptStream (): RSocketMultiplexedConnection .Stream ? {
134- val frames = inbound.receiveCatching().getOrNull() ? : return null
135-
123+ val frames = incomingStreams.receiveCatching().getOrNull() ? : return null
136124 return Stream (
137- parentJob = streamsJob ,
138- outbound = frames.serverToClient,
139- inbound = frames.clientToServer
125+ coroutineContext = streamsContext.childContext() ,
126+ incoming = frames.serverToClient,
127+ outgoing = frames.clientToServer
140128 )
141129 }
142130 }
143131
144132 @RSocketTransportApi
145133 private class Stream (
146- parentJob : Job ,
147- private val outbound : SendChannel <Buffer >,
148- private val inbound : ReceiveChannel <Buffer >,
134+ override val coroutineContext : CoroutineContext ,
135+ private val incoming : ReceiveChannel <Buffer >,
136+ private val outgoing : SendChannel <Buffer >,
149137 ) : RSocketMultiplexedConnection.Stream {
150- private val streamJob = Job (parentJob).onCompletion {
151- outbound.close()
152- inbound.cancel()
138+ init {
139+ coroutineContext.job.invokeOnCompletion {
140+ outgoing.close()
141+ incoming.cancel()
142+ }
153143 }
154144
155- override fun close ( ) {
156- streamJob.complete()
145+ override fun setSendPriority ( priority : Int ) {
146+ // no-op
157147 }
158148
159- @OptIn(DelicateCoroutinesApi ::class )
160- override val isClosedForSend: Boolean get() = outbound.isClosedForSend
161-
162- override fun setSendPriority (priority : Int ) {}
163-
164149 override suspend fun sendFrame (frame : Buffer ) {
165- return outbound .send(frame)
150+ return outgoing .send(frame)
166151 }
167152
168153 override suspend fun receiveFrame (): Buffer ? {
169- return inbound .receiveCatching().getOrNull()
154+ return incoming .receiveCatching().getOrNull()
170155 }
171156 }
157+ }
158+ }
172159
173- private class Streams ( bufferCapacity : Int ) : AutoCloseable {
174- val clientToServer = channelForCloseable<Frames >(bufferCapacity )
175- val serverToClient = channelForCloseable<Frames >(bufferCapacity )
160+ private class Streams : AutoCloseable {
161+ val clientToServer = channelForCloseable<Frames >(Channel . BUFFERED )
162+ val serverToClient = channelForCloseable<Frames >(Channel . BUFFERED )
176163
177- // only for undelivered element case
178- override fun close () {
179- clientToServer.cancel()
180- serverToClient.cancel()
181- }
182- }
164+ // only for undelivered element case
165+ override fun close () {
166+ clientToServer.cancel()
167+ serverToClient.cancel()
168+ }
169+ }
183170
184- private class Frames ( bufferCapacity : Int ) : AutoCloseable {
185- val clientToServer = bufferChannel(bufferCapacity )
186- val serverToClient = bufferChannel(bufferCapacity )
171+ private class Frames : AutoCloseable {
172+ val clientToServer = bufferChannel(Channel . BUFFERED )
173+ val serverToClient = bufferChannel(Channel . BUFFERED )
187174
188- // only for undelivered element case
189- override fun close () {
190- clientToServer.cancel()
191- serverToClient.cancel()
192- }
193- }
175+ // only for undelivered element case
176+ override fun close () {
177+ clientToServer.cancel()
178+ serverToClient.cancel()
194179 }
195180}
0 commit comments