1818package com .velocitypowered .proxy .network ;
1919
2020import com .google .common .base .Preconditions ;
21+ import com .google .common .collect .HashMultimap ;
22+ import com .google .common .collect .Multimap ;
2123import com .velocitypowered .api .event .proxy .ListenerBoundEvent ;
2224import com .velocitypowered .api .event .proxy .ListenerCloseEvent ;
2325import com .velocitypowered .api .network .ListenerType ;
2830import io .netty .bootstrap .Bootstrap ;
2931import io .netty .bootstrap .ServerBootstrap ;
3032import io .netty .channel .Channel ;
33+ import io .netty .channel .ChannelFuture ;
3134import io .netty .channel .ChannelFutureListener ;
3235import io .netty .channel .ChannelOption ;
3336import io .netty .channel .EventLoopGroup ;
3437import io .netty .channel .WriteBufferWaterMark ;
38+ import io .netty .channel .unix .UnixChannelOption ;
3539import io .netty .util .concurrent .GlobalEventExecutor ;
40+ import io .netty .util .concurrent .MultithreadEventExecutorGroup ;
3641import java .net .InetSocketAddress ;
3742import java .net .http .HttpClient ;
38- import java .util .HashMap ;
43+ import java .util .Collection ;
3944import java .util .Map ;
4045import org .apache .logging .log4j .LogManager ;
4146import org .apache .logging .log4j .Logger ;
@@ -49,7 +54,7 @@ public final class ConnectionManager {
4954 private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark (1 << 20 ,
5055 1 << 21 );
5156 private static final Logger LOGGER = LogManager .getLogger (ConnectionManager .class );
52- private final Map <InetSocketAddress , Endpoint > endpoints = new HashMap <> ();
57+ private final Multimap <InetSocketAddress , Endpoint > endpoints = HashMultimap . create ();
5358 private final TransportType transportType ;
5459 private final EventLoopGroup bossGroup ;
5560 private final EventLoopGroup workerGroup ;
@@ -93,7 +98,6 @@ public void logChannelInformation() {
9398 public void bind (final InetSocketAddress address ) {
9499 final ServerBootstrap bootstrap = new ServerBootstrap ()
95100 .channelFactory (this .transportType .serverSocketChannelFactory )
96- .group (this .bossGroup , this .workerGroup )
97101 .childOption (ChannelOption .WRITE_BUFFER_WATER_MARK , SERVER_WRITE_MARK )
98102 .childHandler (this .serverChannelInitializer .get ())
99103 .childOption (ChannelOption .TCP_NODELAY , true )
@@ -104,26 +108,50 @@ public void bind(final InetSocketAddress address) {
104108 bootstrap .option (ChannelOption .TCP_FASTOPEN , 3 );
105109 }
106110
107- bootstrap .bind ()
108- .addListener ((ChannelFutureListener ) future -> {
109- final Channel channel = future .channel ();
110- if (future .isSuccess ()) {
111- this .endpoints .put (address , new Endpoint (channel , ListenerType .MINECRAFT ));
112-
113- // Warn people with console access that HAProxy is in use, see PR: #1436
114- if (this .server .getConfiguration ().isProxyProtocol ()) {
115- LOGGER .warn ("Using HAProxy and listening on {}, please ensure this listener is adequately firewalled." , channel .localAddress ());
116- }
111+ if (server .getConfiguration ().isEnableReusePort ()) {
112+ // We don't need a boss group, since each worker will bind to the socket
113+ bootstrap .option (UnixChannelOption .SO_REUSEPORT , true )
114+ .group (this .workerGroup );
115+ } else {
116+ bootstrap .group (this .bossGroup , this .workerGroup );
117+ }
117118
118- LOGGER .info ("Listening on {}" , channel .localAddress ());
119+ final int binds = server .getConfiguration ().isEnableReusePort ()
120+ ? ((MultithreadEventExecutorGroup ) this .workerGroup ).executorCount () : 1 ;
119121
120- // Fire the proxy bound event after the socket is bound
121- server .getEventManager ().fireAndForget (
122- new ListenerBoundEvent (address , ListenerType .MINECRAFT ));
123- } else {
124- LOGGER .error ("Can't bind to {}" , address , future .cause ());
125- }
126- });
122+ for (int bind = 0 ; bind < binds ; bind ++) {
123+ // Wait for each bind to open. If we encounter any errors, don't try to bind again.
124+ int finalBind = bind ;
125+ ChannelFuture f = bootstrap .bind ()
126+ .addListener ((ChannelFutureListener ) future -> {
127+ final Channel channel = future .channel ();
128+ if (future .isSuccess ()) {
129+ this .endpoints .put (address , new Endpoint (channel , ListenerType .MINECRAFT ));
130+
131+ LOGGER .info ("Listening on {}" , channel .localAddress ());
132+
133+ if (finalBind == 0 ) {
134+ // Warn people with console access that HAProxy is in use, see PR: #1436
135+ if (this .server .getConfiguration ().isProxyProtocol ()) {
136+ LOGGER .warn (
137+ "Using HAProxy and listening on {}, please ensure this listener is adequately firewalled." ,
138+ channel .localAddress ());
139+ }
140+
141+ // Fire the proxy bound event after the socket is bound
142+ server .getEventManager ().fireAndForget (
143+ new ListenerBoundEvent (address , ListenerType .MINECRAFT ));
144+ }
145+ } else {
146+ LOGGER .error ("Can't bind to {}" , address , future .cause ());
147+ }
148+ });
149+ f .syncUninterruptibly ();
150+
151+ if (!f .isSuccess ()) {
152+ break ;
153+ }
154+ }
127155 }
128156
129157 /**
@@ -181,17 +209,20 @@ public Bootstrap createWorker(@Nullable EventLoopGroup group) {
181209 * @param oldBind the endpoint to close
182210 */
183211 public void close (InetSocketAddress oldBind ) {
184- Endpoint endpoint = endpoints .remove (oldBind );
212+ Collection <Endpoint > endpoints = this .endpoints .removeAll (oldBind );
213+ Preconditions .checkState (!endpoints .isEmpty (), "Endpoint was not registered" );
214+
215+ ListenerType type = endpoints .iterator ().next ().getType ();
185216
186217 // Fire proxy close event to notify plugins of socket close. We block since plugins
187218 // should have a chance to be notified before the server stops accepting connections.
188- server .getEventManager ().fire (new ListenerCloseEvent (oldBind , endpoint .getType ())).join ();
189-
190- Channel serverChannel = endpoint .getChannel ();
219+ server .getEventManager ().fire (new ListenerCloseEvent (oldBind , type )).join ();
191220
192- Preconditions .checkState (serverChannel != null , "Endpoint %s not registered" , oldBind );
193- LOGGER .info ("Closing endpoint {}" , serverChannel .localAddress ());
194- serverChannel .close ().syncUninterruptibly ();
221+ for (Endpoint endpoint : endpoints ) {
222+ Channel serverChannel = endpoint .getChannel ();
223+ LOGGER .info ("Closing endpoint {}" , serverChannel .localAddress ());
224+ serverChannel .close ().syncUninterruptibly ();
225+ }
195226 }
196227
197228 /**
@@ -200,24 +231,28 @@ public void close(InetSocketAddress oldBind) {
200231 * @param interrupt should closing forward interruptions
201232 */
202233 public void closeEndpoints (boolean interrupt ) {
203- for (final Map .Entry <InetSocketAddress , Endpoint > entry : this .endpoints .entrySet ()) {
234+ for (final Map .Entry <InetSocketAddress , Collection <Endpoint >> entry : this .endpoints .asMap ()
235+ .entrySet ()) {
204236 final InetSocketAddress address = entry .getKey ();
205- final Endpoint endpoint = entry .getValue ();
237+ final Collection <Endpoint > endpoints = entry .getValue ();
238+ ListenerType type = endpoints .iterator ().next ().getType ();
206239
207240 // Fire proxy close event to notify plugins of socket close. We block since plugins
208241 // should have a chance to be notified before the server stops accepting connections.
209- server .getEventManager ().fire (new ListenerCloseEvent (address , endpoint .getType ())).join ();
210-
211- LOGGER .info ("Closing endpoint {}" , address );
212- if (interrupt ) {
213- try {
214- endpoint .getChannel ().close ().sync ();
215- } catch (final InterruptedException e ) {
216- LOGGER .info ("Interrupted whilst closing endpoint" , e );
217- Thread .currentThread ().interrupt ();
242+ server .getEventManager ().fire (new ListenerCloseEvent (address , type )).join ();
243+
244+ for (Endpoint endpoint : endpoints ) {
245+ LOGGER .info ("Closing endpoint {}" , address );
246+ if (interrupt ) {
247+ try {
248+ endpoint .getChannel ().close ().sync ();
249+ } catch (final InterruptedException e ) {
250+ LOGGER .info ("Interrupted whilst closing endpoint" , e );
251+ Thread .currentThread ().interrupt ();
252+ }
253+ } else {
254+ endpoint .getChannel ().close ().syncUninterruptibly ();
218255 }
219- } else {
220- endpoint .getChannel ().close ().syncUninterruptibly ();
221256 }
222257 }
223258 this .endpoints .clear ();
0 commit comments