Skip to content

Commit 35c073a

Browse files
committed
Ensure publisher-only process ends when broker fails
[#156568899] Fixes #94
1 parent eaf3f8f commit 35c073a

File tree

4 files changed

+156
-8
lines changed

4 files changed

+156
-8
lines changed

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,12 @@ public void run(boolean announceStartup)
188188
if(count % params.getProducerChannelCount() == 0) {
189189
// this is the end of a group of threads on the same connection,
190190
// closing the connection
191-
producerConnections[count / params.getProducerChannelCount() - 1].close();
191+
try {
192+
producerConnections[count / params.getProducerChannelCount() - 1].close();
193+
} catch (Exception e) {
194+
// don't do anything, we need to close the other connections
195+
}
196+
192197
}
193198
count++;
194199
}
@@ -197,7 +202,11 @@ public void run(boolean announceStartup)
197202
// when using exclusive queues, some connections can have already been
198203
// closed because of connection re-using, so checking before closing.
199204
if (consumerConnection.isOpen()) {
200-
consumerConnection.close();
205+
try {
206+
consumerConnection.close();
207+
} catch (Exception e) {
208+
// don't do anything, we need to close the other connections
209+
}
201210
}
202211
}
203212

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,10 +270,16 @@ public void run() {
270270
ProducerState state = new ProducerState(this.rateLimit);
271271
state.setLastStatsTime(startTime);
272272
state.setMsgCount(0);
273-
while (keepGoing(state)) {
274-
delay(now, state);
275-
handlePublish(state);
276-
now = System.currentTimeMillis();
273+
try {
274+
while (keepGoing(state)) {
275+
delay(now, state);
276+
handlePublish(state);
277+
now = System.currentTimeMillis();
278+
}
279+
} catch (RuntimeException e) {
280+
// failing, we don't want to block the whole process, so counting down
281+
countDown();
282+
throw e;
277283
}
278284
if (state.getMsgCount() >= msgLimit) {
279285
countDown();
@@ -310,7 +316,13 @@ public int incrementMessageCount() {
310316
state.setLastStatsTime(System.currentTimeMillis());
311317
state.setMsgCount(0);
312318
}
313-
maybeHandlePublish(state);
319+
try {
320+
maybeHandlePublish(state);
321+
} catch (RuntimeException e) {
322+
// failing, we don't want to block the whole process, so counting down
323+
countDown();
324+
throw e;
325+
}
314326
};
315327
}
316328

src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import static org.mockito.Mockito.when;
7070
import static org.mockito.MockitoAnnotations.initMocks;
7171

72-
//@DisabledIfSystemProperty(named = "travis", matches = "true")
72+
7373
public class MessageCountTimeLimitAndPublishingIntervalRateTest {
7474

7575
@Mock
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.perf;
17+
18+
import com.rabbitmq.client.Channel;
19+
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.ConnectionFactory;
21+
import com.rabbitmq.client.impl.AMQImpl;
22+
import org.hamcrest.Matchers;
23+
import org.hamcrest.junit.MatcherAssert;
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.Arguments;
29+
import org.junit.jupiter.params.provider.MethodSource;
30+
31+
import java.util.concurrent.Callable;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.function.Consumer;
38+
import java.util.function.Supplier;
39+
import java.util.stream.Stream;
40+
41+
import static com.rabbitmq.perf.MockUtils.callback;
42+
import static com.rabbitmq.perf.MockUtils.connectionFactoryThatReturns;
43+
import static com.rabbitmq.perf.MockUtils.proxy;
44+
import static java.util.Collections.singletonList;
45+
import static org.hamcrest.Matchers.is;
46+
import static org.hamcrest.junit.MatcherAssert.assertThat;
47+
48+
/**
49+
*
50+
*/
51+
public class PublisherOnlyStopsCorrectlyTest {
52+
53+
MulticastParams params;
54+
55+
Stats stats = new Stats(0) {
56+
@Override
57+
protected void report(long now) {
58+
}
59+
};
60+
61+
ExecutorService executorService;
62+
63+
@BeforeEach public void init() {
64+
params = new MulticastParams();
65+
executorService = Executors.newSingleThreadExecutor();
66+
}
67+
68+
@AfterEach public void tearDown() {
69+
executorService.shutdownNow();
70+
}
71+
72+
static Stream<Arguments> publisherOnlyStopsWhenBrokerCrashesArguments() {
73+
return Stream.of(
74+
// number of messages before throwing exception, configurator, assertion message
75+
Arguments.of(10, (Consumer<MulticastParams>)(params) -> {}, "Sender should have failed and program should stop"),
76+
Arguments.of(2, (Consumer<MulticastParams>)(params) -> params.setPublishingInterval(1), "Sender should have failed and program should stop")
77+
);
78+
}
79+
80+
@ParameterizedTest
81+
@MethodSource("publisherOnlyStopsWhenBrokerCrashesArguments")
82+
public void publisherOnlyStopsWhenBrokerCrashes(
83+
int messageTotal, Consumer<MulticastParams> configurator, String message) throws Exception {
84+
params.setConsumerCount(0);
85+
params.setProducerCount(1);
86+
configurator.accept(params);
87+
88+
AtomicInteger publishedMessages = new AtomicInteger(0);
89+
Channel channel = proxy(Channel.class,
90+
callback("queueDeclare", (proxy, method, args) -> new AMQImpl.Queue.DeclareOk(args[0].toString(), 0, 0)),
91+
callback("basicPublish", (proxy, method, args) -> {
92+
if (publishedMessages.incrementAndGet() > messageTotal) {
93+
throw new RuntimeException("Expected exception, simulating broker crash");
94+
}
95+
return null;
96+
})
97+
);
98+
99+
Supplier<Connection> connectionSupplier = () -> proxy(Connection.class,
100+
callback("createChannel", (proxy, method, args) -> channel),
101+
callback("isOpen", (proxy, method, args) -> true)
102+
);
103+
104+
ConnectionFactory connectionFactory = connectionFactoryThatReturns(connectionSupplier);
105+
106+
MulticastSet set = getMulticastSet(connectionFactory);
107+
108+
CountDownLatch latch = new CountDownLatch(1);
109+
executorService.submit((Callable<Void>) () -> {
110+
set.run();
111+
latch.countDown();
112+
return null;
113+
});
114+
assertThat(message, latch.await(10, TimeUnit.SECONDS), is(true));
115+
116+
}
117+
118+
private MulticastSet getMulticastSet(ConnectionFactory connectionFactory) {
119+
MulticastSet set = new MulticastSet(
120+
stats, connectionFactory, params, singletonList("amqp://localhost"),
121+
PerfTest.getCompletionHandler(params)
122+
);
123+
set.setThreadingHandler(new MulticastSet.DefaultThreadingHandler());
124+
return set;
125+
}
126+
127+
}

0 commit comments

Comments
 (0)