Skip to content

Commit fae2150

Browse files
committed
Add support for Netty
This commit adds support for the Netty-based frame handler that was added in RabbitMQ AMQP 091 Java client 5.27.0. New options: * --netty: to activate Netty with the defaults * --netty-threads: to set the number of threads used in the event loop * --netty-epoll: to use the native epoll support (Linux x86-64) * --netty-kqueue: to use the native kqueue support (macOS aarch64) This change also deprecates the options related to Java NIO. References rabbitmq/rabbitmq-java-client#1663 Fixes #856
1 parent 86c2a83 commit fae2150

File tree

16 files changed

+733
-108
lines changed

16 files changed

+733
-108
lines changed

.github/workflows/test-alphas.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ jobs:
1919
name: Test against ${{ matrix.rabbitmq-image }}
2020
steps:
2121
- uses: actions/checkout@v4
22+
- name: Checkout tls-gen
23+
uses: actions/checkout@v4
24+
with:
25+
repository: rabbitmq/tls-gen
26+
path: './tls-gen'
2227
- name: Set up JDK
2328
uses: actions/setup-java@v4
2429
with:

.github/workflows/test-pr.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ jobs:
1212

1313
steps:
1414
- uses: actions/checkout@v4
15+
- name: Checkout tls-gen
16+
uses: actions/checkout@v4
17+
with:
18+
repository: rabbitmq/tls-gen
19+
path: './tls-gen'
1520
- name: Set up JDK
1621
uses: actions/setup-java@v4
1722
with:

.github/workflows/test-supported-java-versions.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ jobs:
1818
name: Test against Java ${{ matrix.distribution }} ${{ matrix.version }}
1919
steps:
2020
- uses: actions/checkout@v4
21+
- name: Checkout tls-gen
22+
uses: actions/checkout@v4
23+
with:
24+
repository: rabbitmq/tls-gen
25+
path: './tls-gen'
2126
- name: Set up JDK
2227
uses: actions/setup-java@v4
2328
with:

.github/workflows/test.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ jobs:
1212

1313
steps:
1414
- uses: actions/checkout@v4
15+
- name: Checkout tls-gen
16+
uses: actions/checkout@v4
17+
with:
18+
repository: rabbitmq/tls-gen
19+
path: './tls-gen'
1520
- name: Set up JDK
1621
uses: actions/setup-java@v4
1722
with:

ci/start-broker.sh

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,32 @@ wait_for_message() {
1010
done
1111
}
1212

13+
make -C "${PWD}"/tls-gen/basic
14+
15+
mkdir -p rabbitmq-configuration/tls
16+
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
17+
chmod o+r rabbitmq-configuration/tls/*
18+
chmod g+r rabbitmq-configuration/tls/*
19+
20+
echo "loopback_users = none
21+
22+
listeners.ssl.default = 5671
23+
24+
ssl_options.cacertfile = /etc/rabbitmq/tls/ca_certificate.pem
25+
ssl_options.certfile = /etc/rabbitmq/tls/server_$(hostname)_certificate.pem
26+
ssl_options.keyfile = /etc/rabbitmq/tls/server_$(hostname)_key.pem
27+
ssl_options.verify = verify_peer
28+
ssl_options.fail_if_no_peer_cert = false
29+
ssl_options.depth = 1
30+
31+
auth_mechanisms.1 = PLAIN" >> rabbitmq-configuration/rabbitmq.conf
32+
1333
echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
1434

1535
docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"
1636
docker run -d --name rabbitmq \
1737
--network host \
38+
-v "${PWD}"/rabbitmq-configuration:/etc/rabbitmq \
1839
"${RABBITMQ_IMAGE}"
1940

2041
wait_for_message rabbitmq "completed with"

pom.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@
5555
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5656

5757
<spotless.check.skip>true</spotless.check.skip>
58-
<rabbitmq.version>5.26.0</rabbitmq.version>
58+
<rabbitmq.version>5.27.0-SNAPSHOT</rabbitmq.version>
5959
<slf4j.version>2.0.17</slf4j.version>
6060
<commons-cli.version>1.10.0</commons-cli.version>
6161
<metrics.version>4.2.33</metrics.version>
6262
<micrometer.version>1.15.2</micrometer.version>
6363
<jgroups.version>5.4.8.Final</jgroups.version>
6464
<jgroups-kubernetes.version>2.0.2.Final</jgroups-kubernetes.version>
65+
<netty.version>4.2.3.Final</netty.version>
6566
<gson.version>2.13.1</gson.version>
6667
<resilience4j.version>2.1.0</resilience4j.version>
6768
<logback.version>1.3.15</logback.version>
@@ -169,6 +170,18 @@
169170
<artifactId>jgroups-kubernetes</artifactId>
170171
<version>${jgroups-kubernetes.version}</version>
171172
</dependency>
173+
<dependency>
174+
<groupId>io.netty</groupId>
175+
<artifactId>netty-transport-native-epoll</artifactId>
176+
<version>${netty.version}</version>
177+
<classifier>linux-x86_64</classifier>
178+
</dependency>
179+
<dependency>
180+
<groupId>io.netty</groupId>
181+
<artifactId>netty-transport-native-kqueue</artifactId>
182+
<version>${netty.version}</version>
183+
<classifier>osx-aarch_64</classifier>
184+
</dependency>
172185

173186
<dependency>
174187
<groupId>org.junit.jupiter</groupId>

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.ByteArrayInputStream;
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
28+
import java.io.PrintStream;
2829
import java.time.Duration;
2930
import java.util.*;
3031
import java.util.concurrent.Callable;
@@ -98,6 +99,7 @@ public class Consumer extends AgentBase implements Runnable {
9899

99100
private final Runnable rateLimiterCallback;
100101
private final boolean rateLimitation;
102+
private final PrintStream out;
101103

102104
public Consumer(ConsumerParameters parameters) {
103105
super(
@@ -124,6 +126,7 @@ public Consumer(ConsumerParameters parameters) {
124126

125127
this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
126128
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
129+
this.out = parameters.getOut();
127130

128131
if (parameters.getConsumerLatenciesIndicator().isVariable()) {
129132
this.consumerLatency =
@@ -372,7 +375,7 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
372375

373376
@Override
374377
public void handleCancel(String consumerTag) {
375-
System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
378+
out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
376379
epochMessageCount.set(0);
377380
if (consumerTagBranchMap.containsKey(consumerTag)) {
378381
String qName = consumerTagBranchMap.get(consumerTag);
@@ -393,7 +396,7 @@ public void handleCancel(String consumerTag) {
393396
delay.toMillis(),
394397
TimeUnit.MILLISECONDS);
395398
} else {
396-
System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
399+
out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
397400
}
398401
}
399402
}

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.Channel;
1919
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2020
import com.rabbitmq.perf.metrics.PerformanceMetrics;
21+
import java.io.PrintStream;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.ExecutorService;
@@ -60,6 +61,16 @@ public class ConsumerParameters {
6061

6162
private int id;
6263
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
64+
private PrintStream out = System.out;
65+
66+
ConsumerParameters setOut(PrintStream out) {
67+
this.out = out;
68+
return this;
69+
}
70+
71+
PrintStream getOut() {
72+
return out;
73+
}
6374

6475
public Channel getChannel() {
6576
return channel;

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2525
import com.rabbitmq.perf.metrics.PerformanceMetrics;
2626
import java.io.IOException;
27+
import java.io.PrintStream;
2728
import java.time.Duration;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
@@ -137,6 +138,9 @@ public class MulticastParams {
137138

138139
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
139140

141+
private PrintStream out = System.out;
142+
private boolean netty = false;
143+
140144
public void setExchangeType(String exchangeType) {
141145
this.exchangeType = exchangeType;
142146
}
@@ -319,6 +323,10 @@ void setConsumerStartDelay(Duration csd) {
319323
this.consumerStartDelay = csd;
320324
}
321325

326+
void setOut(PrintStream out) {
327+
this.out = out;
328+
}
329+
322330
public int getConsumerCount() {
323331
return consumerCount;
324332
}
@@ -485,6 +493,10 @@ public Duration getConsumerStartDelay() {
485493
return consumerStartDelay;
486494
}
487495

496+
PrintStream getOut() {
497+
return out;
498+
}
499+
488500
public void setPolling(boolean polling) {
489501
this.polling = polling;
490502
}
@@ -655,7 +667,8 @@ public Consumer createConsumer(
655667
topologyRecordingScheduledExecutorService)
656668
.setStartListener(this.startListener)
657669
.setRateLimiterFactory(this.rateLimiterFactory)
658-
.setFunctionalLogger(this.functionalLogger));
670+
.setFunctionalLogger(this.functionalLogger)
671+
.setOut(this.out));
659672
this.topologyHandler.next();
660673
return consumer;
661674
}
@@ -761,6 +774,14 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
761774
this.producerSchedulerThreadCount = producerSchedulerThreadCount;
762775
}
763776

777+
void setNetty(boolean netty) {
778+
this.netty = netty;
779+
}
780+
781+
boolean netty() {
782+
return this.netty;
783+
}
784+
764785
/**
765786
* Contract to handle the creation and configuration of resources. E.g. creation of queues,
766787
* binding exchange to queues.

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.rabbitmq.perf.metrics.PerformanceMetrics;
2929
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
3030
import java.io.IOException;
31+
import java.io.PrintStream;
3132
import java.net.URISyntaxException;
3233
import java.security.KeyManagementException;
3334
import java.security.NoSuchAlgorithmException;
@@ -80,6 +81,7 @@ public class MulticastSet {
8081
private final ConnectionCreator connectionCreator;
8182
private final ExpectedMetrics expectedMetrics;
8283
private final InstanceSynchronization instanceSynchronization;
84+
private final PrintStream out;
8385

8486
public MulticastSet(
8587
PerformanceMetrics performanceMetrics,
@@ -167,6 +169,7 @@ public MulticastSet(
167169
this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation);
168170
this.expectedMetrics = expectedMetrics;
169171
this.instanceSynchronization = instanceSynchronization;
172+
this.out = params.getOut();
170173
}
171174

172175
protected static int nbThreadsForConsumer(MulticastParams params) {
@@ -233,10 +236,17 @@ public void run(boolean announceStartup)
233236
: params.getServersUpLimit(),
234237
uris,
235238
factory)) {
236-
ScheduledExecutorService heartbeatSenderExecutorService =
237-
this.threadingHandler.scheduledExecutorService(
238-
"perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads());
239-
factory.setHeartbeatExecutor(heartbeatSenderExecutorService);
239+
// heartbeat sender executor not necessary with Netty
240+
if (!params.netty()) {
241+
ScheduledExecutorService heartbeatSenderExecutorService =
242+
this.threadingHandler.scheduledExecutorService(
243+
"perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads());
244+
factory.setHeartbeatExecutor(heartbeatSenderExecutorService);
245+
if (heartbeatSenderExecutorService != null) {
246+
shutdownService.wrap(heartbeatSenderExecutorService::shutdownNow);
247+
}
248+
}
249+
240250
// use a single-threaded executor for the configuration connection
241251
// this way, a default one is not created and this one will shut down
242252
// when the run ends.
@@ -382,7 +392,7 @@ public void run(boolean announceStartup)
382392

383393
executeShutdownSequence.run();
384394
} else {
385-
System.out.println(
395+
out.println(
386396
"Could not connect to broker(s) in "
387397
+ params.getServersStartUpTimeout()
388398
+ " second(s), exiting.");
@@ -475,7 +485,7 @@ private void createConsumers(
475485
int consumerIndex = 0;
476486
for (int i = 0; i < consumerConnections.length; i++) {
477487
if (announceStartup) {
478-
System.out.println("id: " + testID + ", starting consumer #" + i);
488+
out.println("id: " + testID + ", starting consumer #" + i);
479489
}
480490
ExecutorService executorService = consumersExecutorsFactory.apply(i);
481491
factory.setSharedExecutor(executorService);
@@ -484,7 +494,7 @@ private void createConsumers(
484494
consumerConnections[i] = consumerConnection;
485495
for (int j = 0; j < params.getConsumerChannelCount(); j++) {
486496
if (announceStartup) {
487-
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
497+
out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
488498
}
489499
Consumer consumer =
490500
params.createConsumer(
@@ -507,13 +517,13 @@ private void createProducers(
507517
int producerIndex = 0;
508518
for (int i = 0; i < producerConnections.length; i++) {
509519
if (announceStartup) {
510-
System.out.println("id: " + testID + ", starting producer #" + i);
520+
out.println("id: " + testID + ", starting producer #" + i);
511521
}
512522
Connection producerConnection = createConnection(PRODUCER_THREAD_PREFIX + i);
513523
producerConnections[i] = producerConnection;
514524
for (int j = 0; j < params.getProducerChannelCount(); j++) {
515525
if (announceStartup) {
516-
System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
526+
out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
517527
}
518528
AgentState agentState = new AgentState();
519529
agentState.runnable =
@@ -538,7 +548,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
538548
runnable.run();
539549
LOGGER.debug("Consumer runnable started");
540550
if (params.getConsumerSlowStart()) {
541-
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
551+
out.println("Delaying start by 1 second because -S/--slow-start was requested");
542552
Thread.sleep(1000);
543553
}
544554
}
@@ -551,8 +561,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
551561
for (Runnable runnable : consumerRunnables) {
552562
runnable.run();
553563
if (params.getConsumerSlowStart()) {
554-
System.out.println(
555-
"Delaying start by 1 second because -S/--slow-start was requested");
564+
out.println("Delaying start by 1 second because -S/--slow-start was requested");
556565
try {
557566
Thread.sleep(1000);
558567
} catch (InterruptedException e) {

0 commit comments

Comments
 (0)