Skip to content

Commit 2e042e5

Browse files
committed
Refine topology recovery filter with JDK 8 stuff
[#159461281] References #382, #383
1 parent 71ead95 commit 2e042e5

File tree

3 files changed

+30
-51
lines changed

3 files changed

+30
-51
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,17 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
7272
private final ConnectionParams params;
7373
private volatile RecoveryAwareAMQConnection delegate;
7474

75-
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<ShutdownListener>());
76-
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
77-
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());
75+
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<>());
76+
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<>());
77+
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<>());
7878

7979
// Records topology changes
80-
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
81-
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
82-
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
83-
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
84-
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
85-
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());
80+
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<>());
81+
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<>());
82+
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<>());
83+
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<>());
84+
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<>());
85+
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<>());
8686

8787
private final TopologyRecoveryFilter topologyRecoveryFilter;
8888

@@ -143,28 +143,7 @@ public void run() {
143143
}
144144

145145
private TopologyRecoveryFilter letAllPassFilter() {
146-
return new TopologyRecoveryFilter() {
147-
148-
@Override
149-
public boolean filterExchange(RecordedExchange recordedExchange) {
150-
return true;
151-
}
152-
153-
@Override
154-
public boolean filterQueue(RecordedQueue recordedQueue) {
155-
return true;
156-
}
157-
158-
@Override
159-
public boolean filterBinding(RecordedBinding recordedBinding) {
160-
return true;
161-
}
162-
163-
@Override
164-
public boolean filterConsumer(RecordedConsumer recordedConsumer) {
165-
return true;
166-
}
167-
};
146+
return new TopologyRecoveryFilter() {};
168147
}
169148

170149
/**

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryFilter.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,35 @@ public interface TopologyRecoveryFilter {
2626
* @param recordedExchange
2727
* @return true to recover the exchange, false otherwise
2828
*/
29-
boolean filterExchange(RecordedExchange recordedExchange);
29+
default boolean filterExchange(RecordedExchange recordedExchange) {
30+
return true;
31+
}
3032

3133
/**
3234
* Decides whether a queue is recovered or not.
3335
* @param recordedQueue
3436
* @return true to recover the queue, false otherwise
3537
*/
36-
boolean filterQueue(RecordedQueue recordedQueue);
38+
default boolean filterQueue(RecordedQueue recordedQueue) {
39+
return true;
40+
}
3741

3842
/**
3943
* Decides whether a binding is recovered or not.
4044
* @param recordedBinding
4145
* @return true to recover the binding, false otherwise
4246
*/
43-
boolean filterBinding(RecordedBinding recordedBinding);
47+
default boolean filterBinding(RecordedBinding recordedBinding) {
48+
return true;
49+
}
4450

4551
/**
4652
* Decides whether a consumer is recovered or not.
4753
* @param recordedConsumer
4854
* @return true to recover the consumer, false otherwise
4955
*/
50-
boolean filterConsumer(RecordedConsumer recordedConsumer);
56+
default boolean filterConsumer(RecordedConsumer recordedConsumer) {
57+
return true;
58+
}
5159

5260
}

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,18 @@ private static boolean resourceExists(Callable<Channel> callback) throws Excepti
112112
}
113113

114114
private static boolean queueExists(final String queue, final Connection connection) throws Exception {
115-
return resourceExists(new Callable<Channel>() {
116-
117-
@Override
118-
public Channel call() throws Exception {
119-
Channel channel = connection.createChannel();
120-
channel.queueDeclarePassive(queue);
121-
return channel;
122-
}
115+
return resourceExists(() -> {
116+
Channel channel = connection.createChannel();
117+
channel.queueDeclarePassive(queue);
118+
return channel;
123119
});
124120
}
125121

126122
private static boolean exchangeExists(final String exchange, final Connection connection) throws Exception {
127-
return resourceExists(new Callable<Channel>() {
128-
129-
@Override
130-
public Channel call() throws Exception {
131-
Channel channel = connection.createChannel();
132-
channel.exchangeDeclarePassive(exchange);
133-
return channel;
134-
}
123+
return resourceExists(() -> {
124+
Channel channel = connection.createChannel();
125+
channel.exchangeDeclarePassive(exchange);
126+
return channel;
135127
});
136128
}
137129

0 commit comments

Comments
 (0)