@@ -4,9 +4,7 @@ import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow
44import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket
55import dev.slne.surf.cloud.api.common.netty.packet.RespondingNettyPacket
66import dev.slne.surf.cloud.api.common.util.DefaultUncaughtExceptionHandlerWithName
7- import dev.slne.surf.cloud.api.common.util.mutableObjectListOf
87import dev.slne.surf.cloud.api.common.util.netty.suspend
9- import dev.slne.surf.cloud.api.common.util.synchronize
108import dev.slne.surf.cloud.api.common.util.threadFactory
119import dev.slne.surf.cloud.core.common.config.cloudConfig
1210import dev.slne.surf.cloud.core.common.coroutines.ConnectionManagementScope
@@ -29,13 +27,12 @@ import io.netty.handler.flush.FlushConsolidationHandler
2927import io.netty.handler.timeout.ReadTimeoutHandler
3028import kotlinx.coroutines.Dispatchers
3129import kotlinx.coroutines.launch
32- import kotlinx.coroutines.sync.Mutex
33- import kotlinx.coroutines.sync.withLock
3430import kotlinx.coroutines.withContext
3531import java.net.InetAddress
3632import java.net.InetSocketAddress
3733import java.net.SocketAddress
3834import java.util.concurrent.ConcurrentLinkedQueue
35+ import java.util.concurrent.CopyOnWriteArrayList
3936
4037class ServerConnectionListener (val server : NettyServerImpl ) {
4138
@@ -45,87 +42,78 @@ class ServerConnectionListener(val server: NettyServerImpl) {
4542 var running = false
4643 private set
4744
48- private val channels = mutableObjectListOf<ChannelFuture >().synchronize()
49- private val channelsMutex = Mutex ()
50-
51- val connections = mutableObjectListOf<ConnectionImpl >().synchronize()
52- val connectionsMutex = Mutex ()
53-
45+ private val channels = CopyOnWriteArrayList <ChannelFuture >()
46+ val connections = CopyOnWriteArrayList <ConnectionImpl >()
5447 private val pending = ConcurrentLinkedQueue <ConnectionImpl >()
5548
5649 init {
5750 running = true
5851 }
5952
60-
6153 suspend fun startTcpServerListener (address : InetAddress ? , port : Int ) {
6254 bind(InetSocketAddress (address, port))
6355 }
6456
6557 suspend fun bind (address : SocketAddress ) = withContext(Dispatchers .IO ) {
66- channelsMutex.withLock {
67- val channelClass: Class <out ServerChannel >
68- val eventloopgroup: EventLoopGroup
69-
70- if (Epoll .isAvailable() && cloudConfig.connectionConfig.nettyConfig.useEpoll) {
71- channelClass =
72- if (address is DomainSocketAddress ) EpollServerDomainSocketChannel ::class .java else EpollServerSocketChannel ::class .java
73- eventloopgroup = SERVER_EPOLL_EVENT_GROUP
74- log.atInfo().log(" Using epoll channel type" )
75- } else {
76- channelClass = NioServerSocketChannel ::class .java
77- eventloopgroup = SERVER_EVENT_GROUP
78- log.atInfo().log(" Using default channel type" )
79- }
58+ val channelClass: Class <out ServerChannel >
59+ val eventloopgroup: EventLoopGroup
60+
61+ if (Epoll .isAvailable() && cloudConfig.connectionConfig.nettyConfig.useEpoll) {
62+ channelClass =
63+ if (address is DomainSocketAddress ) EpollServerDomainSocketChannel ::class .java else EpollServerSocketChannel ::class .java
64+ eventloopgroup = SERVER_EPOLL_EVENT_GROUP
65+ log.atInfo().log(" Using epoll channel type" )
66+ } else {
67+ channelClass = NioServerSocketChannel ::class .java
68+ eventloopgroup = SERVER_EVENT_GROUP
69+ log.atInfo().log(" Using default channel type" )
70+ }
8071
81- channels.add(
82- ServerBootstrap ()
83- .channel(channelClass)
84- .group(eventloopgroup)
85- .localAddress(address)
86- .option(ChannelOption .AUTO_READ , false )
87- .childHandler(object : ChannelInitializer <Channel >() {
88- override fun initChannel (channel : Channel ) {
89- runCatching {
90- channel.config().setOption(ChannelOption .TCP_NODELAY , true )
91- }
92-
93- val pipeline = channel.pipeline()
94- .addFirst(FlushConsolidationHandler ())
95- .addLast(HandlerNames .TIMEOUT , ReadTimeoutHandler (30 ))
96-
97- ConnectionImpl .configureSerialization(
98- pipeline,
99- PacketFlow .SERVERBOUND ,
100- false
101- )
102-
103- val connection =
104- ConnectionImpl (PacketFlow .SERVERBOUND , EncryptionManager .instance)
105-
106- pending.add(connection)
107- connection.configurePacketHandler(channel, pipeline)
108-
109- val loggableAddress = connection.getLoggableAddress(logIps)
110- log.atInfo()
111- .log(" Configured packet handler for $loggableAddress " )
112-
113- connection.setListenerForServerboundHandshake(
114- ServerHandshakePacketListenerImpl (server, connection)
115- )
72+ channels.add(
73+ ServerBootstrap ()
74+ .channel(channelClass)
75+ .group(eventloopgroup)
76+ .localAddress(address)
77+ .option(ChannelOption .AUTO_READ , false )
78+ .childHandler(object : ChannelInitializer <Channel >() {
79+ override fun initChannel (channel : Channel ) {
80+ runCatching {
81+ channel.config().setOption(ChannelOption .TCP_NODELAY , true )
11682 }
117- })
118- .bind()
119- .suspend ()
120- )
121- }
83+
84+ val pipeline = channel.pipeline()
85+ .addFirst(FlushConsolidationHandler ())
86+ .addLast(HandlerNames .TIMEOUT , ReadTimeoutHandler (30 ))
87+
88+ ConnectionImpl .configureSerialization(
89+ pipeline,
90+ PacketFlow .SERVERBOUND ,
91+ false
92+ )
93+
94+ val connection =
95+ ConnectionImpl (PacketFlow .SERVERBOUND , EncryptionManager .instance)
96+
97+ pending.add(connection)
98+ connection.configurePacketHandler(channel, pipeline)
99+
100+ val loggableAddress = connection.getLoggableAddress(logIps)
101+ log.atInfo()
102+ .log(" Configured packet handler for $loggableAddress " )
103+
104+ connection.setListenerForServerboundHandshake(
105+ ServerHandshakePacketListenerImpl (server, connection)
106+ )
107+ }
108+ })
109+ .bind()
110+ .suspend ()
111+ )
122112 }
123113
124- suspend fun acceptConnections () {
125- channelsMutex.withLock {
126- for (future in channels) {
127- future.channel().config().isAutoRead = true
128- }
114+ fun acceptConnections () {
115+ for (future in channels) {
116+ future.channel().config().isAutoRead = true
129117 }
130118 }
131119
@@ -149,10 +137,8 @@ class ServerConnectionListener(val server: NettyServerImpl) {
149137 }
150138
151139
152- connectionsMutex.withLock {
153- for (connection in connections) {
154- disconnect(connection)
155- }
140+ for (connection in connections) {
141+ disconnect(connection)
156142 }
157143 connections.clear()
158144
@@ -161,13 +147,11 @@ class ServerConnectionListener(val server: NettyServerImpl) {
161147 }
162148 pending.clear()
163149
164- channelsMutex.withLock {
165- for (future in channels) {
166- try {
167- future.channel().close().suspend ()
168- } catch (e: InterruptedException ) {
169- log.atSevere().withCause(e).log(" Interrupted whilst closing channel" )
170- }
150+ for (future in channels) {
151+ try {
152+ future.channel().close().suspend ()
153+ } catch (e: InterruptedException ) {
154+ log.atSevere().withCause(e).log(" Interrupted whilst closing channel" )
171155 }
172156 }
173157
@@ -176,49 +160,45 @@ class ServerConnectionListener(val server: NettyServerImpl) {
176160 }
177161
178162 private suspend fun addPending () {
179- var connection: ConnectionImpl
180- while ((pending.poll().also { connection = it }) != null ) {
181- connectionsMutex.withLock {
182- connections.add(connection)
183- connection.isPending = false
184- }
163+ while (true ) {
164+ val connection = pending.poll() ? : break
165+ connections.add(connection)
166+ connection.isPending = false
185167 }
186168 }
187169
188170 suspend fun tick () {
189171 addPending()
190172
191- connectionsMutex.withLock {
192- val iterator = connections.iterator()
193- for (connection in iterator) {
194- if (connection.connecting) {
195- continue
196- }
197-
198- if (connection.connected) {
199- try {
200- connection.tick()
201- } catch (e: Exception ) {
202- log.atWarning()
203- .withCause(e)
204- .log(" Failed to handle packet for ${connection.getLoggableAddress(logIps)} " )
205-
206- ConnectionManagementScope .launch {
207- val details = DisconnectionDetails (DisconnectReason .INTERNAL_EXCEPTION , e.message)
208- connection.sendWithIndication(ClientboundDisconnectPacket (details))
209- connection.disconnect(details)
210- }
173+ for (connection in connections) {
174+ if (connection.connecting) {
175+ continue
176+ }
211177
212- connection.setReadOnly()
213- }
214- } else {
215- if (connection.preparing) {
216- continue
178+ if (connection.connected) {
179+ try {
180+ connection.tick()
181+ } catch (e: Exception ) {
182+ log.atWarning()
183+ .withCause(e)
184+ .log(" Failed to handle packet for ${connection.getLoggableAddress(logIps)} " )
185+
186+ ConnectionManagementScope .launch {
187+ val details =
188+ DisconnectionDetails (DisconnectReason .INTERNAL_EXCEPTION , e.message)
189+ connection.sendWithIndication(ClientboundDisconnectPacket (details))
190+ connection.disconnect(details)
217191 }
218192
219- iterator.remove()
220- connection.handleDisconnection()
193+ connection.setReadOnly()
221194 }
195+ } else {
196+ if (connection.preparing) {
197+ continue
198+ }
199+
200+ connections.remove(connection)
201+ connection.handleDisconnection()
222202 }
223203 }
224204 }
0 commit comments