|
22 | 22 | import java.util.function.BiPredicate; |
23 | 23 | import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; |
24 | 24 |
|
| 25 | +import ch.qos.logback.core.Context; |
| 26 | + |
25 | 27 | /** |
26 | 28 | * Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}. |
27 | 29 | * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}. |
@@ -77,27 +79,27 @@ public abstract class TopologyRecoveryRetryLogic { |
77 | 79 | * Recover a binding. |
78 | 80 | */ |
79 | 81 | public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING = context -> { |
| 82 | + context.binding().recover(); |
| 83 | + return null; |
| 84 | + }; |
| 85 | + |
| 86 | + /** |
| 87 | + * Recover earlier bindings that share the same queue as this retry context |
| 88 | + */ |
| 89 | + public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_PREVIOUS_QUEUE_BINDINGS = context -> { |
80 | 90 | if (context.entity() instanceof RecordedQueueBinding) { |
81 | | - // recover all bindings for the queue. |
82 | | - // need to do this incase some bindings have already been recovered successfully before this binding failed |
| 91 | + // recover all bindings for the same queue that were recovered before this current binding |
| 92 | + // need to do this incase some bindings had already been recovered successfully before the queue was deleted & this binding failed |
83 | 93 | String queue = context.binding().getDestination(); |
84 | 94 | for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) { |
85 | | - if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) { |
86 | | - recordedBinding.recover(); |
87 | | - } |
88 | | - } |
89 | | - } else if (context.entity() instanceof RecordedExchangeBinding) { |
90 | | - // recover all bindings for the exchange |
91 | | - // need to do this incase some bindings have already been recovered successfully before this binding failed |
92 | | - String exchange = context.binding().getDestination(); |
93 | | - for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) { |
94 | | - if (recordedBinding instanceof RecordedExchangeBinding && exchange.equals(recordedBinding.getDestination())) { |
| 95 | + if (recordedBinding == context.entity()) { |
| 96 | + // we have gotten to the binding in this context. Since this is an ordered list we can now break |
| 97 | + // as we know we have recovered all the earlier bindings that may have existed on this queue |
| 98 | + break; |
| 99 | + } else if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) { |
95 | 100 | recordedBinding.recover(); |
96 | 101 | } |
97 | 102 | } |
98 | | - } else { |
99 | | - // should't be possible to get here, but just in case recover just this binding |
100 | | - context.binding().recover(); |
101 | 103 | } |
102 | 104 | return null; |
103 | 105 | }; |
@@ -148,7 +150,8 @@ public abstract class TopologyRecoveryRetryLogic { |
148 | 150 | public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder() |
149 | 151 | .bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) |
150 | 152 | .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) |
151 | | - .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)) |
| 153 | + .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING) |
| 154 | + .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)) |
152 | 155 | .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER) |
153 | 156 | .andThen(RECOVER_CONSUMER_QUEUE_BINDINGS))); |
154 | 157 | } |
0 commit comments