Skip to content

Commit 71ead95

Browse files
committed
Merge branch '4.x.x-stable' into 5.x.x-stable
Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/impl/ConnectionParams.java src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java src/test/java/com/rabbitmq/client/test/RpcTest.java
2 parents 4da4704 + bd09086 commit 71ead95

File tree

11 files changed

+639
-52
lines changed

11 files changed

+639
-52
lines changed

RUNNING_TESTS.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ For details on running specific tests, see below.
4141

4242
## Running a Specific Test Suite
4343

44-
To run a specific test suite you should execute one of the following in the
44+
To run a specific test suite, execute one of the following in the
4545
top-level directory of the source tree:
4646

4747
* To run the client unit tests:
@@ -59,7 +59,14 @@ top-level directory of the source tree:
5959
* To run a single test:
6060

6161
```
62-
./mvnw -Ddeps.dir=$(pwd)/deps/deps verify -Dit.test=DeadLetterExchange
62+
./mvnw -Ddeps.dir=$(pwd)/deps verify -Dit.test=DeadLetterExchange
63+
```
64+
65+
When running from the repository cloned as part of the [RabbitMQ public umbrella](https://github.com/rabbitmq/rabbitmq-public-umbrella),
66+
the `deps.dir` property path may have to change, e.g.
67+
68+
```
69+
./mvnw -Ddeps.dir=$(pwd)/.. verify -Dit.test=ConnectionRecovery
6370
```
6471

6572
For example, to run the client tests:
@@ -175,4 +182,4 @@ mvn verify -P '!setup-test-cluster'
175182
```
176183

177184
Note that by doing so some tests will fail as they require `rabbitmqctl` to
178-
control the running nodes.
185+
control the running nodes.

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.rabbitmq.client.impl.nio.NioParams;
3030
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
3131
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
32+
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
33+
3234
import java.io.IOException;
3335
import java.net.URI;
3436
import java.net.URISyntaxException;
@@ -178,6 +180,12 @@ public class ConnectionFactory implements Cloneable {
178180
*/
179181
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;
180182

183+
/**
184+
* Filter to include/exclude entities from topology recovery.
185+
* @since 4.8.0
186+
*/
187+
private TopologyRecoveryFilter topologyRecoveryFilter;
188+
181189
/**
182190
* Condition to trigger automatic connection recovery.
183191
* @since 5.4.0
@@ -1077,6 +1085,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10771085
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
10781086
result.setWorkPoolTimeout(workPoolTimeout);
10791087
result.setErrorOnWriteListener(errorOnWriteListener);
1088+
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
10801089
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
10811090
return result;
10821091
}
@@ -1428,6 +1437,14 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
14281437
this.errorOnWriteListener = errorOnWriteListener;
14291438
}
14301439

1440+
/**
1441+
* Set filter to include/exclude entities from topology recovery.
1442+
* @since 4.8.0
1443+
*/
1444+
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
1445+
this.topologyRecoveryFilter = topologyRecoveryFilter;
1446+
}
1447+
14311448
/**
14321449
* Allows to decide on automatic connection recovery is triggered.
14331450
* Default is for shutdown not initiated by application or missed heartbeat errors.

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
2222
import com.rabbitmq.client.ShutdownSignalException;
23+
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2324

2425
import java.util.Map;
2526
import java.util.concurrent.ExecutorService;
@@ -48,6 +49,7 @@ public class ConnectionParams {
4849
private boolean channelShouldCheckRpcResponseType;
4950
private ErrorOnWriteListener errorOnWriteListener;
5051
private int workPoolTimeout = -1;
52+
private TopologyRecoveryFilter topologyRecoveryFilter;
5153
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
5254

5355
private ExceptionHandler exceptionHandler;
@@ -239,11 +241,20 @@ public int getWorkPoolTimeout() {
239241
return workPoolTimeout;
240242
}
241243

244+
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
245+
this.topologyRecoveryFilter = topologyRecoveryFilter;
246+
}
247+
248+
public TopologyRecoveryFilter getTopologyRecoveryFilter() {
249+
return topologyRecoveryFilter;
250+
}
251+
242252
public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition) {
243253
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
244254
}
245255

246256
public Predicate<ShutdownSignalException> getConnectionRecoveryTriggeringCondition() {
247257
return connectionRecoveryTriggeringCondition;
248258
}
259+
249260
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 74 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
8383
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
8484
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
8585
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());
86+
87+
private final TopologyRecoveryFilter topologyRecoveryFilter;
8688

8789
// Used to block connection recovery attempts after close() is invoked.
8890
private volatile boolean manuallyClosed = false;
@@ -108,11 +110,11 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
108110
this.connectionRecoveryTriggeringCondition = params.getConnectionRecoveryTriggeringCondition() == null ?
109111
DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION : params.getConnectionRecoveryTriggeringCondition();
110112

111-
System.out.println(this.connectionRecoveryTriggeringCondition);
112-
113113
setupErrorOnWriteListenerForPotentialRecovery();
114114

115115
this.channels = new ConcurrentHashMap<>();
116+
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
117+
letAllPassFilter() : params.getTopologyRecoveryFilter();
116118
}
117119

118120
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -140,6 +142,31 @@ public void run() {
140142
});
141143
}
142144

145+
private TopologyRecoveryFilter letAllPassFilter() {
146+
return new TopologyRecoveryFilter() {
147+
148+
@Override
149+
public boolean filterExchange(RecordedExchange recordedExchange) {
150+
return true;
151+
}
152+
153+
@Override
154+
public boolean filterQueue(RecordedQueue recordedQueue) {
155+
return true;
156+
}
157+
158+
@Override
159+
public boolean filterBinding(RecordedBinding recordedBinding) {
160+
return true;
161+
}
162+
163+
@Override
164+
public boolean filterConsumer(RecordedConsumer recordedConsumer) {
165+
return true;
166+
}
167+
};
168+
}
169+
143170
/**
144171
* Private API.
145172
* @throws IOException
@@ -677,8 +704,10 @@ private void recoverTopology(final ExecutorService executor) {
677704
private void recoverExchange(final RecordedExchange x) {
678705
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
679706
try {
680-
x.recover();
681-
LOGGER.debug("{} has recovered", x);
707+
if (topologyRecoveryFilter.filterExchange(x)) {
708+
x.recover();
709+
LOGGER.debug("{} has recovered", x);
710+
}
682711
} catch (Exception cause) {
683712
final String message = "Caught an exception while recovering exchange " + x.getName() +
684713
": " + cause.getMessage();
@@ -688,30 +717,33 @@ private void recoverExchange(final RecordedExchange x) {
688717
}
689718

690719
private void recoverQueue(final String oldName, final RecordedQueue q) {
691-
LOGGER.debug("Recovering {}", q);
720+
692721
try {
693-
q.recover();
694-
String newName = q.getName();
695-
if (!oldName.equals(newName)) {
696-
// make sure server-named queues are re-added with
697-
// their new names. MK.
698-
synchronized (this.recordedQueues) {
699-
this.propagateQueueNameChangeToBindings(oldName, newName);
700-
this.propagateQueueNameChangeToConsumers(oldName, newName);
701-
// bug26552:
702-
// remove old name after we've updated the bindings and consumers,
703-
// plus only for server-named queues, both to make sure we don't lose
704-
// anything to recover. MK.
705-
if(q.isServerNamed()) {
706-
deleteRecordedQueue(oldName);
722+
if (topologyRecoveryFilter.filterQueue(q)) {
723+
LOGGER.debug("Recovering {}", q);
724+
q.recover();
725+
String newName = q.getName();
726+
if (!oldName.equals(newName)) {
727+
// make sure server-named queues are re-added with
728+
// their new names. MK.
729+
synchronized (this.recordedQueues) {
730+
this.propagateQueueNameChangeToBindings(oldName, newName);
731+
this.propagateQueueNameChangeToConsumers(oldName, newName);
732+
// bug26552:
733+
// remove old name after we've updated the bindings and consumers,
734+
// plus only for server-named queues, both to make sure we don't lose
735+
// anything to recover. MK.
736+
if(q.isServerNamed()) {
737+
deleteRecordedQueue(oldName);
738+
}
739+
this.recordedQueues.put(newName, q);
707740
}
708-
this.recordedQueues.put(newName, q);
709741
}
742+
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
743+
qrl.queueRecovered(oldName, newName);
744+
}
745+
LOGGER.debug("{} has recovered", q);
710746
}
711-
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
712-
qrl.queueRecovered(oldName, newName);
713-
}
714-
LOGGER.debug("{} has recovered", q);
715747
} catch (Exception cause) {
716748
final String message = "Caught an exception while recovering queue " + oldName +
717749
": " + cause.getMessage();
@@ -722,8 +754,10 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
722754

723755
private void recoverBinding(final RecordedBinding b) {
724756
try {
725-
b.recover();
726-
LOGGER.debug("{} has recovered", b);
757+
if (this.topologyRecoveryFilter.filterBinding(b)) {
758+
b.recover();
759+
LOGGER.debug("{} has recovered", b);
760+
}
727761
} catch (Exception cause) {
728762
String message = "Caught an exception while recovering binding between " + b.getSource() +
729763
" and " + b.getDestination() + ": " + cause.getMessage();
@@ -733,22 +767,24 @@ private void recoverBinding(final RecordedBinding b) {
733767
}
734768

735769
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
736-
LOGGER.debug("Recovering {}", consumer);
737770
try {
738-
String newTag = consumer.recover();
739-
// make sure server-generated tags are re-added. MK.
740-
if(tag != null && !tag.equals(newTag)) {
741-
synchronized (this.consumers) {
742-
this.consumers.remove(tag);
743-
this.consumers.put(newTag, consumer);
771+
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
772+
LOGGER.debug("Recovering {}", consumer);
773+
String newTag = consumer.recover();
774+
// make sure server-generated tags are re-added. MK.
775+
if(tag != null && !tag.equals(newTag)) {
776+
synchronized (this.consumers) {
777+
this.consumers.remove(tag);
778+
this.consumers.put(newTag, consumer);
779+
}
780+
consumer.getChannel().updateConsumerTag(tag, newTag);
744781
}
745-
consumer.getChannel().updateConsumerTag(tag, newTag);
746-
}
747782

748-
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
749-
crl.consumerRecovered(tag, newTag);
783+
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
784+
crl.consumerRecovered(tag, newTag);
785+
}
786+
LOGGER.debug("{} has recovered", consumer);
750787
}
751-
LOGGER.debug("{} has recovered", consumer);
752788
} catch (Exception cause) {
753789
final String message = "Caught an exception while recovering consumer " + tag +
754790
": " + cause.getMessage();

src/main/java/com/rabbitmq/client/impl/recovery/RecordedBinding.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public String getDestination() {
5959
return destination;
6060
}
6161

62+
public String getRoutingKey() {
63+
return routingKey;
64+
}
65+
6266
public Map<String, Object> getArguments() {
6367
return arguments;
6468
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl.recovery;
17+
18+
/**
19+
* Filter to know whether entities should be recovered or not.
20+
* @since 4.8.0
21+
*/
22+
public interface TopologyRecoveryFilter {
23+
24+
/**
25+
* Decides whether an exchange is recovered or not.
26+
* @param recordedExchange
27+
* @return true to recover the exchange, false otherwise
28+
*/
29+
boolean filterExchange(RecordedExchange recordedExchange);
30+
31+
/**
32+
* Decides whether a queue is recovered or not.
33+
* @param recordedQueue
34+
* @return true to recover the queue, false otherwise
35+
*/
36+
boolean filterQueue(RecordedQueue recordedQueue);
37+
38+
/**
39+
* Decides whether a binding is recovered or not.
40+
* @param recordedBinding
41+
* @return true to recover the binding, false otherwise
42+
*/
43+
boolean filterBinding(RecordedBinding recordedBinding);
44+
45+
/**
46+
* Decides whether a consumer is recovered or not.
47+
* @param recordedConsumer
48+
* @return true to recover the consumer, false otherwise
49+
*/
50+
boolean filterConsumer(RecordedConsumer recordedConsumer);
51+
52+
}

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,26 @@ protected void deleteExchange(String x) throws IOException {
300300
channel.exchangeDelete(x);
301301
}
302302

303+
protected void deleteExchanges(String [] exchanges) throws IOException {
304+
if (exchanges != null) {
305+
for (String exchange : exchanges) {
306+
deleteExchange(exchange);
307+
}
308+
}
309+
}
310+
303311
protected void deleteQueue(String q) throws IOException {
304312
channel.queueDelete(q);
305313
}
306314

315+
protected void deleteQueues(String [] queues) throws IOException {
316+
if (queues != null) {
317+
for (String queue : queues) {
318+
deleteQueue(queue);
319+
}
320+
}
321+
}
322+
307323
protected void clearAllResourceAlarms() throws IOException, InterruptedException {
308324
clearResourceAlarm("memory");
309325
clearResourceAlarm("disk");

0 commit comments

Comments
 (0)