Skip to content

Commit aa83867

Browse files
committed
Handle connection and topology recovery
Use Java client connection recovery but uses specific topology recovery. Let each connection (configuration, producer, consumer) recovers the resources it needs. [#159130745] Fixes #106
1 parent 495c608 commit aa83867

File tree

14 files changed

+724
-206
lines changed

14 files changed

+724
-206
lines changed

src/main/java/com/rabbitmq/perf/AgentBase.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@
1717

1818
import com.rabbitmq.client.MissedHeartbeatException;
1919
import com.rabbitmq.client.ShutdownSignalException;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
import java.io.IOException;
22-
import java.util.function.BooleanSupplier;
2324
import java.util.function.Predicate;
2425

2526
/**
2627
*
2728
*/
2829
public abstract class AgentBase {
2930

31+
private static final Logger LOGGER = LoggerFactory.getLogger(AgentBase.class);
32+
3033
// FIXME this is the condition to start connection recovery
3134
// ensure it's the appropriate condition and get it from the Java client code
3235
static final Predicate<ShutdownSignalException> CONNECTION_RECOVERY_TRIGGERED =
@@ -54,25 +57,38 @@ protected boolean isConnectionRecoveryTriggered(ShutdownSignalException e) {
5457
return CONNECTION_RECOVERY_TRIGGERED.test(e);
5558
}
5659

57-
protected void handleShutdownSignalExceptionOnWrite(BooleanSupplier showStopper, ShutdownSignalException e) {
58-
if (shouldStop(showStopper, e)) {
60+
protected void handleShutdownSignalExceptionOnWrite(Recovery.RecoveryProcess recoveryProcess, ShutdownSignalException e) {
61+
if (LOGGER.isDebugEnabled()) {
62+
LOGGER.debug(
63+
"Handling write error, recovery process enabled? {}, condition to trigger connection recovery? {}",
64+
recoveryProcess.isEnabled(), isConnectionRecoveryTriggered(e)
65+
);
66+
}
67+
if (shouldStop(recoveryProcess, e)) {
5968
throw e;
6069
}
6170
}
6271

63-
protected boolean shouldStop(BooleanSupplier showStopper, ShutdownSignalException e) {
64-
return !showStopper.getAsBoolean() || !isConnectionRecoveryTriggered(e);
72+
protected boolean shouldStop(Recovery.RecoveryProcess recoveryProcess, ShutdownSignalException e) {
73+
if (recoveryProcess.isEnabled()) {
74+
// we stop only if the error isn't likely to trigger connection recovery
75+
return !isConnectionRecoveryTriggered(e);
76+
} else {
77+
return true;
78+
}
6579
}
6680

67-
protected void dealWithWriteOperation(WriteOperation writeOperation, BooleanSupplier showStopper) throws IOException {
81+
protected void dealWithWriteOperation(WriteOperation writeOperation, Recovery.RecoveryProcess recoveryProcess) throws IOException {
6882
try {
6983
writeOperation.call();
7084
} catch (ShutdownSignalException e) {
71-
handleShutdownSignalExceptionOnWrite(showStopper, e);
85+
handleShutdownSignalExceptionOnWrite(recoveryProcess, e);
7286
// connection recovery is in progress, we ignore the exception if this point is reached
7387
}
7488
}
7589

90+
public abstract void recover(TopologyRecording topologyRecording);
91+
7692
protected interface AgentState {
7793

7894
float getRateLimit();

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.rabbitmq.client.DefaultConsumer;
2121
import com.rabbitmq.client.Envelope;
2222
import com.rabbitmq.client.ShutdownSignalException;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325

2426
import java.io.ByteArrayInputStream;
2527
import java.io.DataInputStream;
@@ -31,11 +33,12 @@
3133
import java.util.concurrent.atomic.AtomicBoolean;
3234
import java.util.concurrent.atomic.AtomicInteger;
3335
import java.util.function.BiFunction;
34-
import java.util.function.BooleanSupplier;
3536

3637
public class Consumer extends AgentBase implements Runnable {
3738

38-
private ConsumerImpl q;
39+
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
40+
41+
private volatile ConsumerImpl q;
3942
private final Channel channel;
4043
private final String id;
4144
private final List<String> queueNames;
@@ -53,15 +56,15 @@ public class Consumer extends AgentBase implements Runnable {
5356

5457
private final ConsumerState state;
5558

56-
private final BooleanSupplier showStopper;
59+
private final Recovery.RecoveryProcess recoveryProcess;
5760

5861
public Consumer(Channel channel, String id,
5962
List<String> queueNames, int txSize, boolean autoAck,
6063
int multiAckEvery, Stats stats, float rateLimit, int msgLimit,
6164
int consumerLatencyInMicroSeconds,
6265
TimestampProvider timestampProvider,
6366
MulticastSet.CompletionHandler completionHandler,
64-
BooleanSupplier showStopper) {
67+
Recovery.RecoveryProcess recoveryProcess) {
6568

6669
this.channel = channel;
6770
this.id = id;
@@ -101,7 +104,8 @@ public Consumer(Channel channel, String id,
101104
}
102105

103106
this.state = new ConsumerState(rateLimit);
104-
this.showStopper = showStopper;
107+
this.recoveryProcess = recoveryProcess;
108+
this.recoveryProcess.init(this);
105109
}
106110

107111
public void run() {
@@ -140,11 +144,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
140144
} else if (currentMessageCount % multiAckEvery == 0) {
141145
channel.basicAck(envelope.getDeliveryTag(), true);
142146
}
143-
}, showStopper);
147+
}, recoveryProcess);
144148
}
145149

146150
if (txSize != 0 && currentMessageCount % txSize == 0) {
147-
dealWithWriteOperation(() -> channel.txCommit(), showStopper);
151+
dealWithWriteOperation(() -> channel.txCommit(), recoveryProcess);
148152
}
149153

150154
long diff_time = timestampProvider.getDifference(nowTimestamp, messageTimestamp);
@@ -163,7 +167,14 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
163167

164168
@Override
165169
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
166-
countDown();
170+
LOGGER.debug(
171+
"Consumer received shutdown signal, recovery process enabled? {}, condition to trigger connection recovery? {}",
172+
recoveryProcess.isEnabled(), isConnectionRecoveryTriggered(sig)
173+
);
174+
if (!recoveryProcess.isEnabled()) {
175+
LOGGER.debug("Counting down for consumer");
176+
countDown();
177+
}
167178
}
168179

169180
@Override
@@ -185,6 +196,21 @@ private void countDown() {
185196
}
186197
}
187198

199+
@Override
200+
public void recover(TopologyRecording topologyRecording) {
201+
for (Map.Entry<String, String> entry : consumerTagBranchMap.entrySet()) {
202+
TopologyRecording.RecordedQueue queue = topologyRecording.queue(entry.getValue());
203+
try {
204+
channel.basicConsume(queue.name(), autoAck, entry.getKey(), q);
205+
} catch (IOException e) {
206+
LOGGER.warn(
207+
"Error while recovering consumer {} on queue {} on connection {}",
208+
entry.getKey(), queue.name(), channel.getConnection().getClientProvidedName(), e
209+
);
210+
}
211+
}
212+
}
213+
188214
private static class ConsumerState implements AgentState {
189215

190216
private final float rateLimit;

0 commit comments

Comments
 (0)