Skip to content

Commit a5d3e4f

Browse files
vikinghawkacogoluegnes
authored andcommitted
add topology recovery started listener
(cherry picked from commit 97190dd)
1 parent 87661ee commit a5d3e4f

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

src/main/java/com/rabbitmq/client/RecoveryListener.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,10 @@ public interface RecoveryListener {
3636
* @param recoverable a {@link Recoverable} connection.
3737
*/
3838
void handleRecoveryStarted(Recoverable recoverable);
39+
40+
/**
41+
* Invoked before automatic topology recovery starts.
42+
* @param recoverable a {@link Recoverable} connection.
43+
*/
44+
default void handleTopologyRecoveryStarted(Recoverable recoverable) {}
3945
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,10 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
561561

562562
private synchronized void beginAutomaticRecovery() throws InterruptedException {
563563
this.wait(this.params.getRecoveryDelayHandler().getDelay(0));
564+
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
565+
if (delay > 0) {
566+
this.wait(delay);
567+
}
564568

565569
this.notifyRecoveryListenersStarted();
566570

@@ -576,6 +580,7 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
576580
// don't assign new delegate connection until channel recovery is complete
577581
this.delegate = newConn;
578582
if (this.params.isTopologyRecoveryEnabled()) {
583+
notifyTopologyRecoveryListenersStarted();
579584
recoverTopology(params.getTopologyRecoveryExecutor());
580585
}
581586
this.notifyRecoveryListenersComplete();
@@ -650,6 +655,12 @@ private void notifyRecoveryListenersStarted() {
650655
}
651656
}
652657

658+
private void notifyTopologyRecoveryListenersStarted() {
659+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
660+
f.handleTopologyRecoveryStarted(this);
661+
}
662+
}
663+
653664
private void recoverTopology(final ExecutorService executor) {
654665
// The recovery sequence is the following:
655666
// 1. Recover exchanges

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public String getPassword() {
170170
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
171171
@Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
172172
final List<String> events = new CopyOnWriteArrayList<String>();
173-
final CountDownLatch latch = new CountDownLatch(2); // one when started, another when complete
173+
final CountDownLatch latch = new CountDownLatch(3); // one when started, another when complete
174174
connection.addShutdownListener(new ShutdownListener() {
175175
@Override
176176
public void shutdownCompleted(ShutdownSignalException cause) {
@@ -202,6 +202,10 @@ public void handleRecovery(Recoverable recoverable) {
202202
public void handleRecoveryStarted(Recoverable recoverable) {
203203
latch.countDown();
204204
}
205+
@Override
206+
public void handleTopologyRecoveryStarted(Recoverable recoverable) {
207+
latch.countDown();
208+
}
205209
});
206210
assertThat(connection.isOpen()).isTrue();
207211
closeAndWaitForRecovery();

0 commit comments

Comments
 (0)