Skip to content

Commit 99cbfb6

Browse files
committed
Test recovery with Netty
1 parent 329aee1 commit 99cbfb6

File tree

2 files changed

+10
-25
lines changed

2 files changed

+10
-25
lines changed

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import com.rabbitmq.client.impl.CredentialsRefreshService;
3333
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder;
3434
import com.rabbitmq.client.impl.DefaultExceptionHandler;
35-
import com.rabbitmq.client.impl.nio.NioParams;
3635
import com.rabbitmq.perf.Metrics.ConfigurationContext;
3736
import com.rabbitmq.perf.Utils.GsonOAuth2ClientCredentialsGrantCredentialsProvider;
3837
import com.rabbitmq.perf.metrics.CompositeMetricsFormatter;
@@ -797,7 +796,8 @@ private static ConnectionFactory configureNioIfRequested(
797796
int nbThreads = Utils.intArg(cmd, "niot", -1);
798797
int executorSize = Utils.intArg(cmd, "niotp", -1);
799798
if (nbThreads > 0 || executorSize > 0) {
800-
NioParams nioParams = new NioParams();
799+
com.rabbitmq.client.impl.nio.NioParams nioParams =
800+
new com.rabbitmq.client.impl.nio.NioParams();
801801
int[] nbThreadsAndExecutorSize = getNioNbThreadsAndExecutorSize(nbThreads, executorSize);
802802
nioParams.setNbIoThreads(nbThreadsAndExecutorSize[0]);
803803
// FIXME we cannot limit the max size of the thread pool because of
@@ -847,7 +847,7 @@ protected static int[] getNioNbThreadsAndExecutorSize(
847847
}
848848

849849
@SuppressWarnings("deprecation")
850-
private static NioParams nioParams(ConnectionFactory cf) {
850+
private static com.rabbitmq.client.impl.nio.NioParams nioParams(ConnectionFactory cf) {
851851
return cf.getNioParams();
852852
}
853853

src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@
2929
import com.rabbitmq.client.Channel;
3030
import com.rabbitmq.client.Connection;
3131
import com.rabbitmq.client.ConnectionFactory;
32-
import com.rabbitmq.client.impl.nio.NioParams;
3332
import com.rabbitmq.perf.MulticastParams;
3433
import com.rabbitmq.perf.MulticastSet;
35-
import com.rabbitmq.perf.NamedThreadFactory;
3634
import com.rabbitmq.perf.PerformanceMetricsAdapter;
3735
import com.rabbitmq.perf.metrics.PerformanceMetrics;
3836
import java.io.IOException;
@@ -99,18 +97,17 @@ public Duration interval() {
9997
};
10098

10199
static Stream<Arguments> configurationArguments() {
102-
return Stream.of(blockingIoAndNio(multicastParamsConfigurers()));
100+
return Stream.of(blockingIoAndNetty(multicastParamsConfigurers()));
103101
}
104102

105-
static Arguments[] blockingIoAndNio(List<Consumer<MulticastParams>> multicastParamsConfigurers) {
103+
static Arguments[] blockingIoAndNetty(
104+
List<Consumer<MulticastParams>> multicastParamsConfigurers) {
106105
List<Arguments> arguments = new ArrayList<>();
107106
for (Consumer<MulticastParams> configurer : multicastParamsConfigurers) {
108107
arguments.add(
109108
Arguments.of(
110109
configurer, namedConsumer("blocking IO", (Consumer<ConnectionFactory>) cf -> {})));
111-
arguments.add(
112-
Arguments.of(
113-
configurer, namedConsumer("NIO", (Consumer<ConnectionFactory>) cf -> cf.useNio())));
110+
arguments.add(Arguments.of(configurer, namedConsumer("Netty", ConnectionFactory::useNetty)));
114111
}
115112

116113
return arguments.toArray(new Arguments[0]);
@@ -146,7 +143,7 @@ static Stream<Arguments> configurationArgumentsForSeveralUris() {
146143
namedConsumer("one server-named queue", empty()),
147144
namedConsumer("several queues", severalQueues()),
148145
namedConsumer("queue sequence", queueSequence()))
149-
.map(configurer -> Arguments.of(configurer));
146+
.map(Arguments::of);
150147
}
151148

152149
static Consumer<MulticastParams> empty() {
@@ -299,23 +296,11 @@ public void shouldRecoverWhenConnectionsAreKilledAndUsingPublishingInterval(
299296
}
300297

301298
@Test
302-
public void shouldRecoverWithNio(TestInfo info) throws Exception {
299+
public void shouldRecoverWithNetty(TestInfo info) throws Exception {
303300
params.setQueueNames(Arrays.asList("one", "two", "three"));
304301
params.setProducerCount(10);
305302
params.setConsumerCount(10);
306-
cf.useNio();
307-
cf.setNioParams(
308-
new NioParams()
309-
.setNbIoThreads(10)
310-
// see PerfTest#configureNioIfRequested
311-
.setNioExecutor(
312-
new ThreadPoolExecutor(
313-
10,
314-
params.getProducerCount() + params.getConsumerCount() + 5,
315-
30L,
316-
TimeUnit.SECONDS,
317-
new SynchronousQueue<>(),
318-
new NamedThreadFactory("perf-test-nio-"))));
303+
cf.useNetty();
319304
int producerConsumerCount = params.getProducerCount();
320305
MulticastSet set =
321306
new MulticastSet(performanceMetrics, cf, params, "", URIS, latchCompletionHandler(1, info));

0 commit comments

Comments
 (0)