|
41 | 41 | import com.rabbitmq.perf.metrics.MetricsFormatterFactory; |
42 | 42 | import com.rabbitmq.perf.metrics.MetricsFormatterFactory.Context; |
43 | 43 | import io.micrometer.core.instrument.composite.CompositeMeterRegistry; |
| 44 | +import io.netty.bootstrap.Bootstrap; |
44 | 45 | import io.netty.channel.EventLoopGroup; |
45 | 46 | import io.netty.channel.IoHandlerFactory; |
46 | 47 | import io.netty.channel.MultiThreadIoEventLoopGroup; |
47 | 48 | import io.netty.channel.epoll.EpollIoHandler; |
| 49 | +import io.netty.channel.epoll.EpollSocketChannel; |
48 | 50 | import io.netty.channel.nio.NioIoHandler; |
49 | 51 | import java.io.*; |
50 | 52 | import java.math.BigDecimal; |
|
59 | 61 | import java.util.concurrent.*; |
60 | 62 | import java.util.concurrent.atomic.AtomicBoolean; |
61 | 63 | import java.util.function.BooleanSupplier; |
| 64 | +import java.util.function.Consumer; |
62 | 65 | import java.util.function.Function; |
63 | 66 | import java.util.stream.Collectors; |
64 | 67 | import javax.net.ssl.SSLContext; |
@@ -180,8 +183,8 @@ && useDefaultSslContext(cmd, System.getProperties())) { |
180 | 183 | factory.useBlockingIo(); |
181 | 184 | } |
182 | 185 |
|
183 | | - factory = configureNioIfRequested(cmd, factory, shutdownService); |
184 | | - factory = configureNettyIfRequested(cmd, factory, shutdownService); |
| 186 | + configureNioIfRequested(cmd, factory, shutdownService); |
| 187 | + configureNettyIfRequested(cmd, factory, shutdownService); |
185 | 188 |
|
186 | 189 | String oauth2TokenEndpoint = strArg(cmd, "o2uri", null); |
187 | 190 | if (oauth2TokenEndpoint != null) { |
@@ -793,7 +796,7 @@ private static PrintWriter openCsvFileForWriting( |
793 | 796 | } |
794 | 797 |
|
795 | 798 | @SuppressWarnings("deprecation") |
796 | | - private static ConnectionFactory configureNioIfRequested( |
| 799 | + private static void configureNioIfRequested( |
797 | 800 | CommandLineProxy cmd, ConnectionFactory factory, ShutdownService shutdownService) { |
798 | 801 | int nbThreads = Utils.intArg(cmd, "niot", -1); |
799 | 802 | int executorSize = Utils.intArg(cmd, "niotp", -1); |
@@ -822,7 +825,6 @@ private static ConnectionFactory configureNioIfRequested( |
822 | 825 | factory.useNio(); |
823 | 826 | factory.setNioParams(nioParams); |
824 | 827 | } |
825 | | - return factory; |
826 | 828 | } |
827 | 829 |
|
828 | 830 | protected static int[] getNioNbThreadsAndExecutorSize( |
@@ -853,21 +855,22 @@ private static com.rabbitmq.client.impl.nio.NioParams nioParams(ConnectionFactor |
853 | 855 | return cf.getNioParams(); |
854 | 856 | } |
855 | 857 |
|
856 | | - private static ConnectionFactory configureNettyIfRequested( |
| 858 | + private static void configureNettyIfRequested( |
857 | 859 | CommandLineProxy cmd, ConnectionFactory factory, ShutdownService shutdownService) { |
858 | 860 | if (netty(cmd)) { |
859 | 861 | int nbThreads = Utils.intArg(cmd, "ntyt", -1); |
860 | 862 | boolean epoll = hasOption(cmd, "ntyep"); |
861 | 863 | IoHandlerFactory ioHandlerFactory = |
862 | 864 | epoll ? EpollIoHandler.newFactory() : NioIoHandler.newFactory(); |
| 865 | + Consumer<Bootstrap> bootstrapCustomizer = |
| 866 | + epoll ? b -> b.channel(EpollSocketChannel.class) : b -> {}; |
863 | 867 | EventLoopGroup eventLoopGroup = |
864 | 868 | nbThreads > 0 |
865 | 869 | ? new MultiThreadIoEventLoopGroup(nbThreads, ioHandlerFactory) |
866 | 870 | : new MultiThreadIoEventLoopGroup(ioHandlerFactory); |
867 | 871 | shutdownService.wrap(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)); |
868 | | - factory.netty().eventLoopGroup(eventLoopGroup); |
| 872 | + factory.netty().bootstrapCustomizer(bootstrapCustomizer).eventLoopGroup(eventLoopGroup); |
869 | 873 | } |
870 | | - return factory; |
871 | 874 | } |
872 | 875 |
|
873 | 876 | private static boolean netty(CommandLineProxy cmd) { |
|
0 commit comments