Skip to content

Commit de1cf7e

Browse files
garyrussellartembilan
authored andcommitted
GH-1315: Add onFailed() to ConnectionListener
Resolves #1315 **cherry-pick to 2.2.x** * Fix `@since` in the new test class
1 parent d12cb45 commit de1cf7e

File tree

4 files changed

+104
-14
lines changed

4 files changed

+104
-14
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -520,8 +520,10 @@ public void handleRecovery(Recoverable recoverable) {
520520

521521
return connection;
522522
}
523-
catch (IOException | TimeoutException e) {
524-
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
523+
catch (IOException | TimeoutException ex) {
524+
RuntimeException converted = RabbitExceptionTranslator.convertRabbitAccessException(ex);
525+
this.connectionListener.onFailed(ex);
526+
throw converted;
525527
}
526528
}
527529

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeConnectionListener.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
2323
import com.rabbitmq.client.ShutdownSignalException;
2424

2525
/**
26+
* A composite listener that invokes its delegages in turn.
27+
*
2628
* @author Dave Syer
2729
* @author Gary Russell
2830
*
@@ -31,23 +33,24 @@ public class CompositeConnectionListener implements ConnectionListener {
3133

3234
private List<ConnectionListener> delegates = new CopyOnWriteArrayList<ConnectionListener>();
3335

36+
@Override
3437
public void onCreate(Connection connection) {
35-
for (ConnectionListener delegate : this.delegates) {
36-
delegate.onCreate(connection);
37-
}
38+
this.delegates.forEach(delegate -> delegate.onCreate(connection));
3839
}
3940

41+
@Override
4042
public void onClose(Connection connection) {
41-
for (ConnectionListener delegate : this.delegates) {
42-
delegate.onClose(connection);
43-
}
43+
this.delegates.forEach(delegate -> delegate.onClose(connection));
4444
}
4545

4646
@Override
4747
public void onShutDown(ShutdownSignalException signal) {
48-
for (ConnectionListener delegate : this.delegates) {
49-
delegate.onShutDown(signal);
50-
}
48+
this.delegates.forEach(delegate -> delegate.onShutDown(signal));
49+
}
50+
51+
@Override
52+
public void onFailed(Exception exception) {
53+
this.delegates.forEach(delegate -> delegate.onFailed(exception));
5154
}
5255

5356
public void setDelegates(List<? extends ConnectionListener> delegates) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionListener.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,4 +50,12 @@ default void onClose(Connection connection) {
5050
default void onShutDown(ShutdownSignalException signal) {
5151
}
5252

53+
/**
54+
* Called when a connection couldn't be established.
55+
* @param exception the exception thrown.
56+
* @since 2.2.17
57+
*/
58+
default void onFailed(Exception exception) {
59+
}
60+
5361
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.amqp.AmqpIOException;
27+
28+
/**
29+
* @author Gary Russell
30+
* @since 2.2.17
31+
*
32+
*/
33+
public class ConnnectionListenerTests {
34+
35+
@Test
36+
void cantConnectCCF() {
37+
CachingConnectionFactory ccf = new CachingConnectionFactory(rcf());
38+
cantConnect(ccf);
39+
}
40+
41+
@Test
42+
void cantConnectTCCF() {
43+
ThreadChannelConnectionFactory tccf = new ThreadChannelConnectionFactory(rcf());
44+
cantConnect(tccf);
45+
}
46+
47+
@Test
48+
void cantConnectPCCF() {
49+
PooledChannelConnectionFactory pccf = new PooledChannelConnectionFactory(rcf());
50+
cantConnect(pccf);
51+
}
52+
53+
private com.rabbitmq.client.ConnectionFactory rcf() {
54+
com.rabbitmq.client.ConnectionFactory rcf = new com.rabbitmq.client.ConnectionFactory();
55+
rcf.setHost("junk.host");
56+
return rcf;
57+
}
58+
59+
private void cantConnect(ConnectionFactory cf) {
60+
AtomicBoolean failed = new AtomicBoolean();
61+
cf.addConnectionListener(new ConnectionListener() {
62+
63+
@Override
64+
public void onCreate(Connection connection) {
65+
}
66+
67+
@Override
68+
public void onFailed(Exception exception) {
69+
failed.set(true);
70+
}
71+
72+
});
73+
assertThatExceptionOfType(AmqpIOException.class).isThrownBy(() -> cf.createConnection());
74+
assertThat(failed.get()).isTrue();
75+
}
76+
77+
}

0 commit comments

Comments
 (0)