Skip to content

Commit fed2f31

Browse files
authored
GH-1268: New Factories: Call ConnectionListener(s)
Resolves #1268 * GH-1274: Support ChannelListener in new Factories Resolves #1274 * Apply suggestions from code review PR Suggestions.
1 parent 60b7a58 commit fed2f31

File tree

6 files changed

+266
-60
lines changed

6 files changed

+266
-60
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.commons.logging.LogFactory;
3838

3939
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
40+
import org.springframework.amqp.support.ConditionalExceptionLogger;
4041
import org.springframework.beans.factory.BeanNameAware;
4142
import org.springframework.beans.factory.DisposableBean;
4243
import org.springframework.context.ApplicationContext;
@@ -56,6 +57,8 @@
5657
import com.rabbitmq.client.BlockedListener;
5758
import com.rabbitmq.client.Recoverable;
5859
import com.rabbitmq.client.RecoveryListener;
60+
import com.rabbitmq.client.ShutdownListener;
61+
import com.rabbitmq.client.ShutdownSignalException;
5962
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
6063

6164
/**
@@ -67,7 +70,8 @@
6770
*
6871
*/
6972
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
70-
ApplicationContextAware, ApplicationEventPublisherAware, ApplicationListener<ContextClosedEvent> {
73+
ApplicationContextAware, ApplicationEventPublisherAware, ApplicationListener<ContextClosedEvent>,
74+
ShutdownListener {
7175

7276
/**
7377
* The mode used to shuffle the addresses.
@@ -152,6 +156,8 @@ public void handleRecovery(Recoverable recoverable) {
152156

153157
private volatile boolean contextStopped;
154158

159+
protected ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
160+
155161
/**
156162
* Create a new AbstractConnectionFactory for the given target ConnectionFactory,
157163
* with no publisher connection factory.
@@ -626,6 +632,19 @@ protected final String getDefaultHostName() {
626632
return temp;
627633
}
628634

635+
@Override
636+
public void shutdownCompleted(ShutdownSignalException cause) {
637+
int protocolClassId = cause.getReason().protocolClassId();
638+
if (protocolClassId == RabbitUtils.CHANNEL_PROTOCOL_CLASS_ID_20) {
639+
this.closeExceptionLogger.log(this.logger, "Shutdown Signal", cause);
640+
getChannelListener().onShutDown(cause);
641+
}
642+
else if (protocolClassId == RabbitUtils.CONNECTION_PROTOCOL_CLASS_ID_10) {
643+
getConnectionListener().onShutDown(cause);
644+
}
645+
646+
}
647+
629648
@Override
630649
public void destroy() {
631650
if (this.publisherConnectionFactory != null) {
@@ -666,4 +685,39 @@ public void handleUnblocked() {
666685

667686
}
668687

688+
/**
689+
* Default implementation of {@link ConditionalExceptionLogger} for logging channel
690+
* close exceptions.
691+
* @since 1.5
692+
*/
693+
private static class DefaultChannelCloseLogger implements ConditionalExceptionLogger {
694+
695+
DefaultChannelCloseLogger() {
696+
}
697+
698+
@Override
699+
public void log(Log logger, String message, Throwable t) {
700+
if (t instanceof ShutdownSignalException) {
701+
ShutdownSignalException cause = (ShutdownSignalException) t;
702+
if (RabbitUtils.isPassiveDeclarationChannelClose(cause)) {
703+
if (logger.isDebugEnabled()) {
704+
logger.debug(message + ": " + cause.getMessage());
705+
}
706+
}
707+
else if (RabbitUtils.isExclusiveUseChannelClose(cause)) {
708+
if (logger.isInfoEnabled()) {
709+
logger.info(message + ": " + cause.getMessage());
710+
}
711+
}
712+
else if (!RabbitUtils.isNormalChannelClose(cause)) {
713+
logger.error(message + ": " + cause.getMessage());
714+
}
715+
}
716+
else {
717+
logger.error("Unexpected invocation of " + getClass() + ", with message: " + message, t);
718+
}
719+
}
720+
721+
}
722+
669723
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import java.util.concurrent.atomic.AtomicBoolean;
4646
import java.util.concurrent.atomic.AtomicInteger;
4747

48-
import org.apache.commons.logging.Log;
49-
5048
import org.springframework.amqp.AmqpApplicationContextClosedException;
5149
import org.springframework.amqp.AmqpException;
5250
import org.springframework.amqp.AmqpTimeoutException;
@@ -207,8 +205,6 @@ public enum ConfirmType {
207205

208206
private boolean publisherReturns;
209207

210-
private ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
211-
212208
private PublisherCallbackChannelFactory publisherChannelFactory = PublisherCallbackChannelImpl.factory();
213209

214210
private volatile boolean active = true;
@@ -536,19 +532,6 @@ public void addConnectionListener(ConnectionListener listener) {
536532
}
537533
}
538534

539-
@Override
540-
public void shutdownCompleted(ShutdownSignalException cause) {
541-
this.closeExceptionLogger.log(logger, "Channel shutdown", cause);
542-
int protocolClassId = cause.getReason().protocolClassId();
543-
if (protocolClassId == RabbitUtils.CHANNEL_PROTOCOL_CLASS_ID_20) {
544-
getChannelListener().onShutDown(cause);
545-
}
546-
else if (protocolClassId == RabbitUtils.CONNECTION_PROTOCOL_CLASS_ID_10) {
547-
getConnectionListener().onShutDown(cause);
548-
}
549-
550-
}
551-
552535
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
553536
Semaphore permits = null;
554537
if (this.channelCheckoutTimeout > 0) {
@@ -1548,39 +1531,4 @@ public String toString() {
15481531

15491532
}
15501533

1551-
/**
1552-
* Default implementation of {@link ConditionalExceptionLogger} for logging channel
1553-
* close exceptions.
1554-
* @since 1.5
1555-
*/
1556-
private static class DefaultChannelCloseLogger implements ConditionalExceptionLogger {
1557-
1558-
DefaultChannelCloseLogger() {
1559-
}
1560-
1561-
@Override
1562-
public void log(Log logger, String message, Throwable t) {
1563-
if (t instanceof ShutdownSignalException) {
1564-
ShutdownSignalException cause = (ShutdownSignalException) t;
1565-
if (RabbitUtils.isPassiveDeclarationChannelClose(cause)) {
1566-
if (logger.isDebugEnabled()) {
1567-
logger.debug(message + ": " + cause.getMessage());
1568-
}
1569-
}
1570-
else if (RabbitUtils.isExclusiveUseChannelClose(cause)) {
1571-
if (logger.isInfoEnabled()) {
1572-
logger.info(message + ": " + cause.getMessage());
1573-
}
1574-
}
1575-
else if (!RabbitUtils.isNormalChannelClose(cause)) {
1576-
logger.error(message + ": " + cause.getMessage());
1577-
}
1578-
}
1579-
else {
1580-
logger.error("Unexpected invocation of " + this.getClass() + ", with message: " + message, t);
1581-
}
1582-
}
1583-
1584-
}
1585-
15861534
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import com.rabbitmq.client.Channel;
4242
import com.rabbitmq.client.ConnectionFactory;
43+
import com.rabbitmq.client.ShutdownListener;
4344

4445
/**
4546
* A very simple connection factory that caches channels using Apache Pool2
@@ -48,11 +49,10 @@
4849
* a callback.
4950
*
5051
* @author Gary Russell
51-
*
5252
* @since 2.3
5353
*
5454
*/
55-
public class PooledChannelConnectionFactory extends AbstractConnectionFactory {
55+
public class PooledChannelConnectionFactory extends AbstractConnectionFactory implements ShutdownListener {
5656

5757
private volatile ConnectionWrapper connection;
5858

@@ -103,21 +103,42 @@ public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
103103
this.simplePublisherConfirms = simplePublisherConfirms;
104104
}
105105

106+
@Override
107+
public void addConnectionListener(ConnectionListener listener) {
108+
super.addConnectionListener(listener); // handles publishing sub-factory
109+
// If the connection is already alive we assume that the new listener wants to be notified
110+
if (this.connection != null && this.connection.isOpen()) {
111+
listener.onCreate(this.connection);
112+
}
113+
}
114+
106115
@Override
107116
public synchronized Connection createConnection() throws AmqpException {
108117
if (this.connection == null || !this.connection.isOpen()) {
109118
Connection bareConnection = createBareConnection(); // NOSONAR - see destroy()
110119
this.connection = new ConnectionWrapper(bareConnection.getDelegate(), getCloseTimeout(), // NOSONAR
111-
this.simplePublisherConfirms, this.poolConfigurer);
120+
this.simplePublisherConfirms, this.poolConfigurer, getChannelListener());
121+
getConnectionListener().onCreate(this.connection);
112122
}
113123
return this.connection;
114124
}
115125

126+
/**
127+
* Close the connection(s). This will impact any in-process operations. New
128+
* connection(s) will be created on demand after this method returns. This might be
129+
* used to force a reconnect to the primary broker after failing over to a secondary
130+
* broker.
131+
*/
132+
public void resetConnection() {
133+
destroy();
134+
}
135+
116136
@Override
117137
public synchronized void destroy() {
118138
super.destroy();
119139
if (this.connection != null) {
120140
this.connection.forceClose();
141+
getConnectionListener().onClose(this.connection);
121142
this.connection = null;
122143
}
123144
}
@@ -132,8 +153,10 @@ private static final class ConnectionWrapper extends SimpleConnection {
132153

133154
private final boolean simplePublisherConfirms;
134155

156+
private final ChannelListener channelListener;
157+
135158
ConnectionWrapper(com.rabbitmq.client.Connection delegate, int closeTimeout, boolean simplePublisherConfirms,
136-
BiConsumer<GenericObjectPool<Channel>, Boolean> configurer) {
159+
BiConsumer<GenericObjectPool<Channel>, Boolean> configurer, ChannelListener channelListener) {
137160

138161
super(delegate, closeTimeout);
139162
GenericObjectPool<Channel> pool = new GenericObjectPool<>(new ChannelFactory());
@@ -143,12 +166,15 @@ private static final class ConnectionWrapper extends SimpleConnection {
143166
configurer.accept(pool, true);
144167
this.txChannels = pool;
145168
this.simplePublisherConfirms = simplePublisherConfirms;
169+
this.channelListener = channelListener;
146170
}
147171

148172
@Override
149173
public Channel createChannel(boolean transactional) {
150174
try {
151-
return transactional ? this.txChannels.borrowObject() : this.channels.borrowObject();
175+
Channel channel = transactional ? this.txChannels.borrowObject() : this.channels.borrowObject();
176+
this.channelListener.onCreate(channel, transactional);
177+
return channel;
152178
}
153179
catch (Exception e) {
154180
throw RabbitExceptionTranslator.convertRabbitAccessException(e);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,18 @@
3030

3131
import com.rabbitmq.client.Channel;
3232
import com.rabbitmq.client.ConnectionFactory;
33+
import com.rabbitmq.client.ShutdownListener;
3334

3435
/**
3536
* A very simple connection factory that caches a channel per thread. Users are
3637
* responsible for releasing the thread's channel by calling
3738
* {@link #closeThreadChannel()}.
3839
*
3940
* @author Gary Russell
40-
*
4141
* @since 2.3
4242
*
4343
*/
44-
public class ThreadChannelConnectionFactory extends AbstractConnectionFactory {
44+
public class ThreadChannelConnectionFactory extends AbstractConnectionFactory implements ShutdownListener {
4545

4646
private volatile ConnectionWrapper connection;
4747

@@ -80,11 +80,21 @@ public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
8080
this.simplePublisherConfirms = simplePublisherConfirms;
8181
}
8282

83+
@Override
84+
public void addConnectionListener(ConnectionListener listener) {
85+
super.addConnectionListener(listener); // handles publishing sub-factory
86+
// If the connection is already alive we assume that the new listener wants to be notified
87+
if (this.connection != null && this.connection.isOpen()) {
88+
listener.onCreate(this.connection);
89+
}
90+
}
91+
8392
@Override
8493
public synchronized Connection createConnection() throws AmqpException {
8594
if (this.connection == null || !this.connection.isOpen()) {
8695
Connection bareConnection = createBareConnection(); // NOSONAR - see destroy()
8796
this.connection = new ConnectionWrapper(bareConnection.getDelegate(), getCloseTimeout());
97+
getConnectionListener().onCreate(this.connection);
8898
}
8999
return this.connection;
90100
}
@@ -99,6 +109,16 @@ public void closeThreadChannel() {
99109
}
100110
}
101111

112+
/**
113+
* Close the connection(s). This will impact any in-process operations. New
114+
* connection(s) will be created on demand after this method returns. This might be
115+
* used to force a reconnect to the primary broker after failing over to a secondary
116+
* broker.
117+
*/
118+
public void resetConnection() {
119+
destroy();
120+
}
121+
102122
@Override
103123
public synchronized void destroy() {
104124
super.destroy();
@@ -148,6 +168,7 @@ public Channel createChannel(boolean transactional) {
148168
this.channels.set(channel);
149169
}
150170
}
171+
getChannelListener().onCreate(channel, transactional);
151172
return channel;
152173
}
153174

@@ -234,6 +255,7 @@ private void physicalClose(Channel channel) {
234255

235256
void forceClose() {
236257
super.close();
258+
getConnectionListener().onClose(this);
237259
}
238260

239261
}

0 commit comments

Comments
 (0)