Skip to content

Commit 40e6955

Browse files
committed
minor netty tcp cleanup
1 parent 6d8f55b commit 40e6955

File tree

4 files changed

+59
-84
lines changed

4 files changed

+59
-84
lines changed

rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private class NettyTcpClientTransportBuilderImpl : NettyTcpClientTransportBuilde
9191
}
9292

9393
return NettyTcpClientTransportImpl(
94-
coroutineContext = context.supervisorContext(),
94+
coroutineContext = context.supervisorContext() + Dispatchers.Default,
9595
bootstrap = bootstrap,
9696
sslContext = sslContext,
9797
).also {
@@ -121,8 +121,16 @@ private class NettyTcpClientTargetImpl(
121121
bootstrap: Bootstrap,
122122
sslContext: SslContext?,
123123
remoteAddress: SocketAddress,
124-
) : RSocketClientTarget, NettyTcpConnectionChannelInitializer(coroutineContext, sslContext) {
125-
private val bootstrap = bootstrap.clone().handler(this).remoteAddress(remoteAddress)
124+
) : RSocketClientTarget {
125+
private val bootstrap = bootstrap.clone()
126+
.handler(
127+
NettyTcpConnectionInitializer(
128+
parentContext = coroutineContext,
129+
sslContext = sslContext,
130+
onConnection = null
131+
)
132+
)
133+
.remoteAddress(remoteAddress)
126134

127135
@RSocketTransportApi
128136
override suspend fun connectClient(): RSocketConnection {

rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpConnection.kt

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package io.rsocket.kotlin.transport.netty.tcp
1919
import io.netty.buffer.*
2020
import io.netty.channel.*
2121
import io.netty.channel.socket.*
22+
import io.netty.handler.codec.*
23+
import io.netty.handler.ssl.*
2224
import io.netty.util.*
2325
import io.rsocket.kotlin.internal.io.*
2426
import io.rsocket.kotlin.transport.*
@@ -44,7 +46,7 @@ internal class NettyTcpConnection(
4446
init {
4547
@OptIn(DelicateCoroutinesApi::class)
4648
launch(start = CoroutineStart.ATOMIC) {
47-
val outboundJob = launch {
49+
val outboundJob = launch(start = CoroutineStart.ATOMIC) {
4850
nonCancellable {
4951
try {
5052
while (true) {
@@ -86,8 +88,8 @@ internal class NettyTcpConnection(
8688
cancel("exceptionCaught", cause)
8789
}
8890

89-
override fun userEventTriggered(ctx: ChannelHandlerContext?, evt: Any?) {
90-
if (evt is ChannelInputShutdownEvent) inbound.close()
91+
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any?) {
92+
if (evt === ChannelInputShutdownEvent.INSTANCE) inbound.close()
9193
super.userEventTriggered(ctx, evt)
9294
}
9395

@@ -110,3 +112,43 @@ internal class NettyTcpConnection(
110112
val ATTRIBUTE: AttributeKey<RSocketConnection> = AttributeKey.newInstance<RSocketConnection>("rsocket-tcp-connection")
111113
}
112114
}
115+
116+
@OptIn(RSocketTransportApi::class)
117+
internal class NettyTcpConnectionInitializer(
118+
private val parentContext: CoroutineContext,
119+
private val sslContext: SslContext?,
120+
private val onConnection: ((RSocketConnection) -> Unit)?,
121+
) : ChannelInitializer<DuplexChannel>() {
122+
override fun initChannel(channel: DuplexChannel) {
123+
channel.config().isAutoRead = false
124+
125+
val connection = NettyTcpConnection(parentContext, channel)
126+
channel.attr(NettyTcpConnection.ATTRIBUTE).set(connection)
127+
128+
if (sslContext != null) {
129+
channel.pipeline().addLast("ssl", sslContext.newHandler(channel.alloc()))
130+
}
131+
channel.pipeline().addLast(
132+
"rsocket-length-encoder",
133+
LengthFieldPrepender(
134+
/* lengthFieldLength = */ 3
135+
)
136+
)
137+
channel.pipeline().addLast(
138+
"rsocket-length-decoder",
139+
LengthFieldBasedFrameDecoder(
140+
/* maxFrameLength = */ Int.MAX_VALUE,
141+
/* lengthFieldOffset = */ 0,
142+
/* lengthFieldLength = */ 3,
143+
/* lengthAdjustment = */ 0,
144+
/* initialBytesToStrip = */ 3
145+
)
146+
)
147+
channel.pipeline().addLast(
148+
"rsocket-connection",
149+
connection
150+
)
151+
152+
onConnection?.invoke(connection)
153+
}
154+
}

rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpConnectionChannelInitializer.kt

Lines changed: 0 additions & 62 deletions
This file was deleted.

rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport.kt

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import io.netty.bootstrap.*
2020
import io.netty.channel.*
2121
import io.netty.channel.ChannelFactory
2222
import io.netty.channel.nio.*
23-
import io.netty.channel.socket.*
2423
import io.netty.channel.socket.nio.*
2524
import io.netty.handler.ssl.*
2625
import io.rsocket.kotlin.internal.io.*
@@ -111,7 +110,7 @@ private class NettyTcpServerTransportBuilderImpl : NettyTcpServerTransportBuilde
111110
}
112111

113112
return NettyTcpServerTransportImpl(
114-
coroutineContext = context.supervisorContext(),
113+
coroutineContext = context.supervisorContext() + Dispatchers.Default,
115114
bootstrap = bootstrap,
116115
sslContext = sslContext,
117116
).also {
@@ -149,8 +148,8 @@ private class NettyTcpServerTargetImpl(
149148

150149
val instanceContext = coroutineContext.childContext()
151150
val channel = try {
152-
val handler = NettyTcpConnectionServerInitializer(
153-
coroutineContext = instanceContext.supervisorContext(),
151+
val handler = NettyTcpConnectionInitializer(
152+
parentContext = instanceContext.supervisorContext(),
154153
sslContext = sslContext,
155154
onConnection = onConnection,
156155
)
@@ -187,15 +186,3 @@ private class NettyTcpServerInstanceImpl(
187186
}
188187
}
189188
}
190-
191-
@RSocketTransportApi
192-
private class NettyTcpConnectionServerInitializer(
193-
coroutineContext: CoroutineContext,
194-
sslContext: SslContext?,
195-
private val onConnection: (RSocketConnection) -> Unit,
196-
) : NettyTcpConnectionChannelInitializer(coroutineContext, sslContext) {
197-
override fun initChannel(ch: DuplexChannel) {
198-
super.initChannel(ch)
199-
onConnection(ch.attr(NettyTcpConnection.ATTRIBUTE).get())
200-
}
201-
}

0 commit comments

Comments
 (0)