Skip to content

Commit 4752325

Browse files
committed
minor netty quic cleanup
1 parent 40e6955 commit 4752325

File tree

6 files changed

+47
-307
lines changed

6 files changed

+47
-307
lines changed

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/DebugLoggingHandler.kt

Lines changed: 0 additions & 235 deletions
This file was deleted.

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicAttributes.kt

Lines changed: 0 additions & 37 deletions
This file was deleted.

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,20 @@ private class NettyQuicClientTargetImpl(
149149
coroutineContext.ensureActive()
150150

151151
val channel = QuicChannel.newBootstrap(
152-
bootstrap.clone()
153-
.attr(ATTRIBUTE_TRANSPORT_CONTEXT, this@NettyQuicClientTargetImpl.coroutineContext)
154-
.bind().awaitChannel()
152+
bootstrap.bind().awaitChannel()
155153
)
156154
.apply(quicBootstrap)
157-
.handler(NettyQuicConnectionInitializer)
155+
.handler(
156+
NettyQuicConnectionInitializer(
157+
parentContext = coroutineContext,
158+
onConnection = null
159+
)
160+
)
158161
.streamHandler(NettyQuicStreamInitializer)
159162
.remoteAddress(remoteAddress)
160163
.connect()
161164
.awaitFuture()
162165

163-
return channel.attr(ATTRIBUTE_CONNECTION).get()
166+
return channel.attr(NettyQuicConnection.ATTRIBUTE).get()
164167
}
165168
}

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnection.kt

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package io.rsocket.kotlin.transport.netty.quic
1818

1919
import io.netty.channel.*
2020
import io.netty.incubator.codec.quic.*
21+
import io.netty.util.*
2122
import io.rsocket.kotlin.internal.io.*
2223
import io.rsocket.kotlin.transport.*
2324
import io.rsocket.kotlin.transport.netty.internal.*
@@ -31,10 +32,11 @@ internal class NettyQuicConnection(
3132
private val channel: QuicChannel,
3233
private val isClient: Boolean,
3334
) : RSocketMultiplexedConnection, ChannelInboundHandlerAdapter() {
34-
private val inboundStreams = Channel<RSocketMultiplexedConnection.Stream>(Channel.UNLIMITED)
3535
override val coroutineContext: CoroutineContext = parentContext.childContext() + channel.eventLoop().asCoroutineDispatcher()
3636
private val streamsContext = coroutineContext.supervisorContext()
3737

38+
private val inboundStreams = Channel<RSocketMultiplexedConnection.Stream>(Channel.UNLIMITED)
39+
3840
init {
3941
@OptIn(DelicateCoroutinesApi::class)
4042
launch(start = CoroutineStart.ATOMIC) {
@@ -55,7 +57,7 @@ internal class NettyQuicConnection(
5557

5658
fun initStreamChannel(streamChannel: QuicStreamChannel) {
5759
val stream = NettyQuicStream(streamsContext, streamChannel)
58-
streamChannel.attr(ATTRIBUTE_STREAM).set(stream)
60+
streamChannel.attr(NettyQuicStream.ATTRIBUTE).set(stream)
5961
streamChannel.pipeline().addLast("rsocket-quic-stream", stream)
6062

6163
if (streamChannel.isLocalCreated) return
@@ -74,40 +76,36 @@ internal class NettyQuicConnection(
7476

7577
override suspend fun createStream(): RSocketMultiplexedConnection.Stream {
7678
val streamChannel = channel.createStream(QuicStreamType.BIDIRECTIONAL, NettyQuicStreamInitializer).awaitFuture()
77-
return streamChannel.attr(ATTRIBUTE_STREAM).get()
79+
return streamChannel.attr(NettyQuicStream.ATTRIBUTE).get()
7880
}
7981

8082
override suspend fun acceptStream(): RSocketMultiplexedConnection.Stream? {
8183
return inboundStreams.receiveCatching().getOrNull()
8284
}
85+
86+
companion object {
87+
val ATTRIBUTE: AttributeKey<NettyQuicConnection> = AttributeKey.newInstance<NettyQuicConnection>("rsocket-quic-connection")
88+
}
8389
}
8490

8591
@RSocketTransportApi
86-
internal object NettyQuicConnectionInitializer : ChannelInitializer<QuicChannel>() {
92+
internal class NettyQuicConnectionInitializer(
93+
private val parentContext: CoroutineContext,
94+
private val onConnection: ((RSocketConnection) -> Unit)?,
95+
) : ChannelInitializer<QuicChannel>() {
8796
override fun initChannel(channel: QuicChannel) {
88-
val acceptor = channel.parent().attr(ATTRIBUTE_CONNECTION_ACCEPTOR).get()
89-
val isClient = acceptor == null
90-
// val name = if (isClient) {
91-
// "CLIENT"
92-
// } else {
93-
// "SERVER"
94-
// }
95-
// channel.pipeline().addLast(DebugLoggingHandler("CONNECTION-$name"))
96-
9797
val connection = NettyQuicConnection(
98-
parentContext = channel.parent().attr(ATTRIBUTE_TRANSPORT_CONTEXT).get(),
98+
parentContext = parentContext,
9999
channel = channel,
100-
isClient = isClient
100+
isClient = onConnection == null
101101
)
102-
channel.attr(ATTRIBUTE_CONNECTION).set(connection)
102+
channel.attr(NettyQuicConnection.ATTRIBUTE).set(connection)
103103

104-
//addLast(LoggingHandler(if (isClient) "CLIENT" else "SERVER"))
105104
channel.pipeline().addLast(
106105
"rsocket-connection",
107106
connection
108107
)
109108

110-
// initialize is run only for server
111-
acceptor?.invoke(connection)
109+
onConnection?.invoke(connection)
112110
}
113111
}

0 commit comments

Comments
 (0)