Skip to content

Commit 137eecc

Browse files
committed
Add --netty-kqueue option
1 parent 53a9258 commit 137eecc

File tree

4 files changed

+106
-7
lines changed

4 files changed

+106
-7
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@
176176
<version>${netty.version}</version>
177177
<classifier>linux-x86_64</classifier>
178178
</dependency>
179+
<dependency>
180+
<groupId>io.netty</groupId>
181+
<artifactId>netty-transport-native-kqueue</artifactId>
182+
<version>${netty.version}</version>
183+
<classifier>osx-aarch_64</classifier>
184+
</dependency>
179185

180186
<dependency>
181187
<groupId>org.junit.jupiter</groupId>

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static com.rabbitmq.client.ConnectionFactory.computeDefaultTlsProtocol;
1919
import static com.rabbitmq.perf.OptionsUtils.forEach;
20+
import static com.rabbitmq.perf.Tuples.pair;
2021
import static com.rabbitmq.perf.Utils.strArg;
2122
import static java.lang.String.format;
2223
import static java.util.Arrays.asList;
@@ -33,6 +34,7 @@
3334
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder;
3435
import com.rabbitmq.client.impl.DefaultExceptionHandler;
3536
import com.rabbitmq.perf.Metrics.ConfigurationContext;
37+
import com.rabbitmq.perf.Tuples.Pair;
3638
import com.rabbitmq.perf.Utils.GsonOAuth2ClientCredentialsGrantCredentialsProvider;
3739
import com.rabbitmq.perf.metrics.CompositeMetricsFormatter;
3840
import com.rabbitmq.perf.metrics.CsvMetricsFormatter;
@@ -42,12 +44,18 @@
4244
import com.rabbitmq.perf.metrics.MetricsFormatterFactory.Context;
4345
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
4446
import io.netty.bootstrap.Bootstrap;
47+
import io.netty.channel.Channel;
4548
import io.netty.channel.EventLoopGroup;
4649
import io.netty.channel.IoHandlerFactory;
4750
import io.netty.channel.MultiThreadIoEventLoopGroup;
51+
import io.netty.channel.epoll.Epoll;
4852
import io.netty.channel.epoll.EpollIoHandler;
4953
import io.netty.channel.epoll.EpollSocketChannel;
54+
import io.netty.channel.kqueue.KQueue;
55+
import io.netty.channel.kqueue.KQueueIoHandler;
56+
import io.netty.channel.kqueue.KQueueSocketChannel;
5057
import io.netty.channel.nio.NioIoHandler;
58+
import io.netty.channel.socket.nio.NioSocketChannel;
5159
import java.io.*;
5260
import java.math.BigDecimal;
5361
import java.nio.charset.Charset;
@@ -63,6 +71,7 @@
6371
import java.util.function.BooleanSupplier;
6472
import java.util.function.Consumer;
6573
import java.util.function.Function;
74+
import java.util.function.Supplier;
6675
import java.util.stream.Collectors;
6776
import javax.net.ssl.SSLContext;
6877
import javax.net.ssl.TrustManager;
@@ -860,10 +869,10 @@ private static void configureNettyIfRequested(
860869
if (netty(cmd)) {
861870
int nbThreads = Utils.intArg(cmd, "ntyt", -1);
862871
boolean epoll = hasOption(cmd, "ntyep");
863-
IoHandlerFactory ioHandlerFactory =
864-
epoll ? EpollIoHandler.newFactory() : NioIoHandler.newFactory();
865-
Consumer<Bootstrap> bootstrapCustomizer =
866-
epoll ? b -> b.channel(EpollSocketChannel.class) : b -> {};
872+
boolean kqueue = hasOption(cmd, "ntykq");
873+
Pair<IoHandlerFactory, Class<? extends Channel>> io = nettyIo(epoll, kqueue);
874+
IoHandlerFactory ioHandlerFactory = io.v1();
875+
Consumer<Bootstrap> bootstrapCustomizer = b -> b.channel(io.v2());
867876
EventLoopGroup eventLoopGroup =
868877
nbThreads > 0
869878
? new MultiThreadIoEventLoopGroup(nbThreads, ioHandlerFactory)
@@ -873,11 +882,32 @@ private static void configureNettyIfRequested(
873882
}
874883
}
875884

885+
private static Pair<IoHandlerFactory, Class<? extends Channel>> nettyIo(
886+
boolean epoll, boolean kqueue) {
887+
Supplier<Pair<IoHandlerFactory, Class<? extends Channel>>> result =
888+
() -> pair(NioIoHandler.newFactory(), NioSocketChannel.class);
889+
if (epoll) {
890+
if (Epoll.isAvailable()) {
891+
result = () -> pair(EpollIoHandler.newFactory(), EpollSocketChannel.class);
892+
} else {
893+
LOGGER.warn("epoll not available, using Java NIO instead");
894+
}
895+
} else if (kqueue) {
896+
if (KQueue.isAvailable()) {
897+
result = () -> pair(KQueueIoHandler.newFactory(), KQueueSocketChannel.class);
898+
} else {
899+
LOGGER.warn("kqueue not available, using Java NIO instead");
900+
}
901+
}
902+
return result.get();
903+
}
904+
876905
private static boolean netty(CommandLineProxy cmd) {
877906
boolean netty = hasOption(cmd, "nty");
878907
int nbThreads = Utils.intArg(cmd, "ntyt", -1);
879908
boolean epoll = hasOption(cmd, "ntyep");
880-
return netty || nbThreads > 0 || epoll;
909+
boolean kqueue = hasOption(cmd, "ntykq");
910+
return netty || nbThreads > 0 || epoll || kqueue;
881911
}
882912

883913
static MulticastSet.CompletionHandler getCompletionHandler(
@@ -1460,7 +1490,12 @@ static Options getOptions() {
14601490
"netty-epoll",
14611491
false,
14621492
"use Netty's native epoll transport (Linux x86-64 only)"));
1463-
1493+
options.addOption(
1494+
new Option(
1495+
"ntykq",
1496+
"netty-kqueue",
1497+
false,
1498+
"use Netty's native kqueue transport (macOS aarch64 only)"));
14641499
return options;
14651500
}
14661501

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Java client library, is triple-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
6+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
7+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
8+
// please see LICENSE-APACHE2.
9+
//
10+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
11+
// either express or implied. See the LICENSE file for specific language governing
12+
// rights and limitations of this software.
13+
//
14+
// If you have any questions regarding licensing, please contact us at
15+
16+
package com.rabbitmq.perf;
17+
18+
final class Tuples {
19+
20+
private Tuples() {}
21+
22+
public static <A, B> Pair<A, B> pair(A v1, B v2) {
23+
return new Pair<>(v1, v2);
24+
}
25+
26+
public static class Pair<A, B> {
27+
28+
private final A v1;
29+
private final B v2;
30+
31+
private Pair(A v1, B v2) {
32+
this.v1 = v1;
33+
this.v2 = v2;
34+
}
35+
36+
public A v1() {
37+
return this.v1;
38+
}
39+
40+
public B v2() {
41+
return this.v2;
42+
}
43+
}
44+
}

src/test/java/com/rabbitmq/perf/it/PerfTestIT.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,16 @@ void nativeEpollWorksOnLinux() throws Exception {
118118
assertThat(consoleOutput()).contains("starting", "stopped", "avg");
119119
}
120120

121+
@Test
122+
@EnabledOnOs(OS.MAC)
123+
@EnabledIfSystemProperty(named = "os.arch", matches = "aarch64")
124+
void nativeKqueueWorksOnMacOs() throws Exception {
125+
int messageCount = 1000;
126+
run(builder().pmessages(messageCount).cmessages(messageCount).nettyThreads(1).nettyKqueue());
127+
waitRunEnds();
128+
assertThat(consoleOutput()).contains("starting", "stopped", "avg");
129+
}
130+
121131
private static void waitOneSecond() throws InterruptedException {
122132
wait(Duration.ofSeconds(1));
123133
}
@@ -178,10 +188,14 @@ ArgumentsBuilder useDefaultSslContext() {
178188
return argument("use-default-ssl-context", "true");
179189
}
180190

181-
public ArgumentsBuilder nettyEpoll() {
191+
ArgumentsBuilder nettyEpoll() {
182192
return argument("netty-epoll", "true");
183193
}
184194

195+
ArgumentsBuilder nettyKqueue() {
196+
return argument("netty-kqueue", "true");
197+
}
198+
185199
ArgumentsBuilder rate(int rate) {
186200
return intArgument("rate", rate);
187201
}

0 commit comments

Comments
 (0)