1818import com .rabbitmq .client .AMQP ;
1919import com .rabbitmq .client .ShutdownSignalException ;
2020
21+ import java .util .List ;
2122import java .util .function .BiPredicate ;
2223import java .util .function .Predicate ;
2324
25+ import static com .rabbitmq .client .impl .recovery .TopologyRecoveryRetryHandlerBuilder .builder ;
26+
2427/**
2528 * Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}.
2629 * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}.
3235 */
3336public abstract class TopologyRecoveryRetryLogic {
3437
38+ /**
39+ * Channel has been closed because of a resource that doesn't exist.
40+ */
3541 public static final BiPredicate <RecordedEntity , Exception > CHANNEL_CLOSED_NOT_FOUND = (entity , ex ) -> {
3642 if (ex .getCause () instanceof ShutdownSignalException ) {
3743 ShutdownSignalException cause = (ShutdownSignalException ) ex .getCause ();
@@ -42,13 +48,19 @@ public abstract class TopologyRecoveryRetryLogic {
4248 return false ;
4349 };
4450
51+ /**
52+ * Recover a channel.
53+ */
4554 public static final DefaultRetryHandler .RetryOperation <Void > RECOVER_CHANNEL = context -> {
4655 if (!context .entity ().getChannel ().isOpen ()) {
4756 context .connection ().recoverChannel (context .entity ().getChannel ());
4857 }
4958 return null ;
5059 };
5160
61+ /**
62+ * Recover the destination queue of a binding.
63+ */
5264 public static final DefaultRetryHandler .RetryOperation <Void > RECOVER_BINDING_QUEUE = context -> {
5365 if (context .entity () instanceof RecordedQueueBinding ) {
5466 RecordedBinding binding = context .binding ();
@@ -63,11 +75,17 @@ public abstract class TopologyRecoveryRetryLogic {
6375 return null ;
6476 };
6577
78+ /**
79+ * Recover a binding.
80+ */
6681 public static final DefaultRetryHandler .RetryOperation <Void > RECOVER_BINDING = context -> {
6782 context .binding ().recover ();
6883 return null ;
6984 };
7085
86+ /**
87+ * Recover the queue of a consumer.
88+ */
7189 public static final DefaultRetryHandler .RetryOperation <Void > RECOVER_CONSUMER_QUEUE = context -> {
7290 if (context .entity () instanceof RecordedConsumer ) {
7391 RecordedConsumer consumer = context .consumer ();
@@ -82,5 +100,37 @@ public abstract class TopologyRecoveryRetryLogic {
82100 return null ;
83101 };
84102
103+ /**
104+ * Recover all the bindings of the queue of a consumer.
105+ */
106+ public static final DefaultRetryHandler .RetryOperation <Void > RECOVER_CONSUMER_QUEUE_BINDINGS = context -> {
107+ if (context .entity () instanceof RecordedConsumer ) {
108+ String queue = context .consumer ().getQueue ();
109+ for (RecordedBinding recordedBinding : context .connection ().getRecordedBindings ()) {
110+ if (recordedBinding instanceof RecordedQueueBinding && queue .equals (recordedBinding .getDestination ())) {
111+ recordedBinding .recover ();
112+ }
113+ }
114+ }
115+ return null ;
116+ };
117+
118+ /**
119+ * Recover a consumer.
120+ */
85121 public static final DefaultRetryHandler .RetryOperation <String > RECOVER_CONSUMER = context -> context .consumer ().recover ();
122+
123+ /**
124+ * Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
125+ * when their respective queue is not found.
126+ * This retry handler can be useful for long recovery processes, whereby auto-delete queues
127+ * can be deleted between queue recovery and binding/consumer recovery.
128+ */
129+ public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder ()
130+ .bindingRecoveryRetryCondition (CHANNEL_CLOSED_NOT_FOUND )
131+ .consumerRecoveryRetryCondition (CHANNEL_CLOSED_NOT_FOUND )
132+ .bindingRecoveryRetryOperation (RECOVER_CHANNEL .andThen (RECOVER_BINDING_QUEUE ).andThen (RECOVER_BINDING ))
133+ .consumerRecoveryRetryOperation (RECOVER_CHANNEL .andThen (RECOVER_CONSUMER_QUEUE .andThen (RECOVER_CONSUMER )
134+ .andThen (RECOVER_CONSUMER_QUEUE_BINDINGS )))
135+ .build ();
86136}
0 commit comments