Skip to content

Commit 329aee1

Browse files
committed
Do not create heartbeat executor when using Netty
1 parent 1517d3e commit 329aee1

File tree

4 files changed

+25
-8
lines changed

4 files changed

+25
-8
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public class MulticastParams {
139139
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
140140

141141
private PrintStream out = System.out;
142+
private boolean netty = false;
142143

143144
public void setExchangeType(String exchangeType) {
144145
this.exchangeType = exchangeType;
@@ -773,6 +774,14 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
773774
this.producerSchedulerThreadCount = producerSchedulerThreadCount;
774775
}
775776

777+
void setNetty(boolean netty) {
778+
this.netty = netty;
779+
}
780+
781+
boolean netty() {
782+
return this.netty;
783+
}
784+
776785
/**
777786
* Contract to handle the creation and configuration of resources. E.g. creation of queues,
778787
* binding exchange to queues.

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,15 @@ public void run(boolean announceStartup)
236236
: params.getServersUpLimit(),
237237
uris,
238238
factory)) {
239-
// TODO do not set a heartbeat executor if Netty is used
240-
ScheduledExecutorService heartbeatSenderExecutorService =
241-
this.threadingHandler.scheduledExecutorService(
242-
"perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads());
243-
factory.setHeartbeatExecutor(heartbeatSenderExecutorService);
239+
// heartbeat sender executor not necessary with Netty
240+
if (!params.netty()) {
241+
ScheduledExecutorService heartbeatSenderExecutorService =
242+
this.threadingHandler.scheduledExecutorService(
243+
"perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads());
244+
factory.setHeartbeatExecutor(heartbeatSenderExecutorService);
245+
shutdownService.wrap(heartbeatSenderExecutorService::shutdownNow);
246+
}
247+
244248
// use a single-threaded executor for the configuration connection
245249
// this way, a default one is not created and this one will shut down
246250
// when the run ends.

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ && useDefaultSslContext(cmd, System.getProperties())) {
174174
factory.setRequestedFrameMax(frameMax);
175175
factory.setRequestedHeartbeat(heartbeat);
176176

177-
if (!netty(cmd)) {
177+
boolean netty = netty(cmd);
178+
if (netty) {
178179
factory.useBlockingIo();
179180
}
180181

@@ -241,7 +242,7 @@ && useDefaultSslContext(cmd, System.getProperties())) {
241242
nioParams(factory).setSslEngineConfigurator(Utils.sslEngineConfigurator(cmd));
242243
}
243244

244-
if (netty(cmd)) {
245+
if (netty) {
245246
factory.netty().channelCustomizer(Utils.channelCustomizer(cmd));
246247
if (factory.isSSL()) {
247248
factory.netty().sslContext(Utils.nettySslContext(sslContext));
@@ -682,6 +683,8 @@ static MulticastParams multicastParams(
682683
functionalLogger = new DefaultFunctionalLogger(perfTestOptions.consoleOut, verboseFull);
683684
}
684685

686+
boolean netty = netty(cmd);
687+
685688
MulticastParams p = new MulticastParams();
686689
p.setAutoAck(autoAck);
687690
p.setAutoDelete(autoDelete);
@@ -748,6 +751,7 @@ static MulticastParams multicastParams(
748751
p.setRateLimiterFactory(rateLimiterFactory);
749752
p.setFunctionalLogger(functionalLogger);
750753
p.setOut(consoleOut);
754+
p.setNetty(netty);
751755
return p;
752756
}
753757

src/test/java/com/rabbitmq/perf/it/PertTestIT.java renamed to src/test/java/com/rabbitmq/perf/it/PerfTestIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.junit.jupiter.params.ParameterizedTest;
3737
import org.junit.jupiter.params.provider.EnumSource;
3838

39-
public class PertTestIT {
39+
public class PerfTestIT {
4040

4141
static ExecutorService executor = Executors.newSingleThreadExecutor();
4242
volatile ByteArrayOutputStream out, err;

0 commit comments

Comments
 (0)