Skip to content

Commit 58820af

Browse files
garyrussellartembilan
authored andcommitted
New Conn. Factories - Honor Physical Close Request
The new connection factories did not honor `RabbitUtils.isPhysicalCloseRequired()`.
1 parent 2aa3ee3 commit 58820af

File tree

6 files changed

+139
-29
lines changed

6 files changed

+139
-29
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,7 @@ private void physicalClose(Object proxy) throws IOException, TimeoutException {
13381338
if (logger.isDebugEnabled()) {
13391339
logger.debug("Closing cached Channel: " + this.target);
13401340
}
1341+
RabbitUtils.clearPhysicalCloseRequired();
13411342
if (this.target == null) {
13421343
return;
13431344
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import java.io.IOException;
20+
import java.util.concurrent.TimeoutException;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2022
import java.util.concurrent.atomic.AtomicReference;
2123
import java.util.function.BiConsumer;
2224

2325
import org.aopalliance.aop.Advice;
2426
import org.aopalliance.intercept.MethodInterceptor;
27+
import org.apache.commons.logging.Log;
28+
import org.apache.commons.logging.LogFactory;
2529
import org.apache.commons.pool2.ObjectPool;
2630
import org.apache.commons.pool2.PooledObject;
2731
import org.apache.commons.pool2.PooledObjectFactory;
@@ -120,6 +124,8 @@ public synchronized void destroy() {
120124

121125
private static final class ConnectionWrapper extends SimpleConnection {
122126

127+
private static final Log logger = LogFactory.getLog(ConnectionWrapper.class);
128+
123129
private final ObjectPool<Channel> channels;
124130

125131
private final ObjectPool<Channel> txChannels;
@@ -152,23 +158,55 @@ public Channel createChannel(boolean transactional) {
152158
private Channel createProxy(Channel channel, boolean transacted) {
153159
ProxyFactory pf = new ProxyFactory(channel);
154160
AtomicReference<Channel> proxy = new AtomicReference<>();
161+
AtomicBoolean confirmSelected = new AtomicBoolean();
155162
Advice advice =
156163
(MethodInterceptor) invocation -> {
157-
if (transacted) {
158-
ConnectionWrapper.this.txChannels.returnObject(proxy.get());
159-
}
160-
else {
161-
ConnectionWrapper.this.channels.returnObject(proxy.get());
164+
String method = invocation.getMethod().getName();
165+
switch (method) {
166+
case "close":
167+
return handleClose(channel, transacted, proxy);
168+
case "getTargetChannel":
169+
return channel;
170+
case "isTransactional":
171+
return transacted;
172+
case "confirmSelect":
173+
confirmSelected.set(true);
174+
return channel.confirmSelect();
175+
case "isConfirmSelected":
176+
return confirmSelected.get();
162177
}
163178
return null;
164179
};
165180
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
166181
advisor.addMethodName("close");
182+
advisor.addMethodName("getTargetChannel");
183+
advisor.addMethodName("isTransactional");
184+
advisor.addMethodName("confirmSelect");
185+
advisor.addMethodName("isConfirmSelected");
167186
pf.addAdvisor(advisor);
187+
pf.addInterface(ChannelProxy.class);
168188
proxy.set((Channel) pf.getProxy());
169189
return proxy.get();
170190
}
171191

192+
private Object handleClose(Channel channel, boolean transacted, AtomicReference<Channel> proxy)
193+
throws Exception {
194+
195+
if (!RabbitUtils.isPhysicalCloseRequired()) {
196+
if (transacted) {
197+
ConnectionWrapper.this.txChannels.returnObject(proxy.get());
198+
}
199+
else {
200+
ConnectionWrapper.this.channels.returnObject(proxy.get());
201+
}
202+
return null;
203+
}
204+
else {
205+
physicalClose(channel);
206+
}
207+
return null;
208+
}
209+
172210
@Override
173211
public void close() {
174212
}
@@ -179,11 +217,23 @@ void forceClose() {
179217
this.txChannels.close();
180218
}
181219

220+
private void physicalClose(Channel channel) {
221+
RabbitUtils.clearPhysicalCloseRequired();
222+
if (channel.isOpen()) {
223+
try {
224+
channel.close();
225+
}
226+
catch (IOException | TimeoutException e) {
227+
logger.debug("Error on close", e);
228+
}
229+
}
230+
}
231+
182232
private class ChannelFactory implements PooledObjectFactory<Channel> {
183233

184234
@Override
185235
public PooledObject<Channel> makeObject() {
186-
Channel channel = ConnectionWrapper.super.createChannel(false);
236+
Channel channel = createProxy(ConnectionWrapper.super.createChannel(false), false);
187237
if (ConnectionWrapper.this.simplePublisherConfirms) {
188238
try {
189239
channel.confirmSelect();
@@ -192,7 +242,7 @@ public PooledObject<Channel> makeObject() {
192242
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
193243
}
194244
}
195-
return new DefaultPooledObject<>(createProxy(channel, false));
245+
return new DefaultPooledObject<>(channel);
196246
}
197247

198248
@Override
@@ -219,14 +269,14 @@ private final class TxChannelFactory extends ChannelFactory {
219269

220270
@Override
221271
public PooledObject<Channel> makeObject() {
222-
Channel channel = ConnectionWrapper.super.createChannel(true);
272+
Channel channel = createProxy(ConnectionWrapper.super.createChannel(true), true);
223273
try {
224274
channel.txSelect();
225275
}
226276
catch (IOException e) {
227277
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
228278
}
229-
return new DefaultPooledObject<>(createProxy(channel, true));
279+
return new DefaultPooledObject<>(channel);
230280
}
231281

232282
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -229,12 +229,18 @@ public static void setPhysicalCloseRequired(Channel channel, boolean b) {
229229
public static boolean isPhysicalCloseRequired() {
230230
Boolean mustClose = physicalCloseRequired.get();
231231
if (mustClose == null) {
232-
mustClose = Boolean.FALSE;
232+
return false;
233233
}
234234
else {
235-
physicalCloseRequired.remove();
235+
return mustClose;
236236
}
237-
return mustClose;
237+
}
238+
239+
/**
240+
* Clear the physicalCloseRequired flag.
241+
*/
242+
public static void clearPhysicalCloseRequired() {
243+
physicalCloseRequired.remove();
238244
}
239245

240246
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import java.io.IOException;
2020
import java.util.concurrent.TimeoutException;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import org.aopalliance.aop.Advice;
2324
import org.aopalliance.intercept.MethodInterceptor;
25+
import org.aopalliance.intercept.MethodInvocation;
2426

2527
import org.springframework.amqp.AmqpException;
2628
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
@@ -125,15 +127,14 @@ private final class ConnectionWrapper extends SimpleConnection {
125127
public Channel createChannel(boolean transactional) {
126128
Channel channel = transactional ? this.txChannels.get() : this.channels.get();
127129
if (channel == null || !channel.isOpen()) {
128-
channel = super.createChannel(transactional);
130+
channel = createProxy(super.createChannel(transactional), transactional);
129131
if (transactional) {
130132
try {
131133
channel.txSelect();
132134
}
133135
catch (IOException e) {
134136
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
135137
}
136-
channel = createProxy(channel);
137138
this.txChannels.set(channel);
138139
}
139140
else {
@@ -145,30 +146,65 @@ public Channel createChannel(boolean transactional) {
145146
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
146147
}
147148
}
148-
channel = createProxy(channel);
149149
this.channels.set(channel);
150150
}
151151
}
152152
return channel;
153153
}
154154

155-
private Channel createProxy(Channel channel) {
155+
private Channel createProxy(Channel channel, boolean transactional) {
156156
ProxyFactory pf = new ProxyFactory(channel);
157+
AtomicBoolean confirmSelected = new AtomicBoolean();
157158
Advice advice =
158159
(MethodInterceptor) invocation -> {
159-
if (ConnectionWrapper.this.channels.get() == null) {
160-
return invocation.proceed();
161-
}
162-
else {
163-
return null;
160+
String method = invocation.getMethod().getName();
161+
switch (method) {
162+
case "close":
163+
return handleClose(channel, transactional, invocation);
164+
case "getTargetChannel":
165+
return channel;
166+
case "isTransactional":
167+
return transactional;
168+
case "confirmSelect":
169+
confirmSelected.set(true);
170+
return channel.confirmSelect();
171+
case "isConfirmSelected":
172+
return confirmSelected.get();
164173
}
174+
return null;
165175
};
166176
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
167177
advisor.addMethodName("close");
178+
advisor.addMethodName("getTargetChannel");
179+
advisor.addMethodName("isTransactional");
180+
advisor.addMethodName("confirmSelect");
181+
advisor.addMethodName("isConfirmSelected");
168182
pf.addAdvisor(advisor);
183+
pf.addInterface(ChannelProxy.class);
169184
return (Channel) pf.getProxy();
170185
}
171186

187+
private Object handleClose(Channel channel, boolean transactional, MethodInvocation invocation)
188+
throws Throwable {
189+
190+
if (ConnectionWrapper.this.channels.get() == null) {
191+
return invocation.proceed();
192+
}
193+
else {
194+
if (RabbitUtils.isPhysicalCloseRequired()) {
195+
physicalClose(channel);
196+
if (transactional) {
197+
this.txChannels.remove();
198+
}
199+
else {
200+
this.channels.remove();
201+
}
202+
RabbitUtils.clearPhysicalCloseRequired();
203+
}
204+
return null;
205+
}
206+
}
207+
172208
@Override
173209
public void close() {
174210
}
@@ -183,13 +219,17 @@ private void doClose(ThreadLocal<Channel> channelsTL) {
183219
Channel channel = channelsTL.get();
184220
if (channel != null) {
185221
channelsTL.remove();
186-
if (channel.isOpen()) {
187-
try {
188-
channel.close();
189-
}
190-
catch (IOException | TimeoutException e) {
191-
logger.debug("Error on close", e);
192-
}
222+
physicalClose(channel);
223+
}
224+
}
225+
226+
private void physicalClose(Channel channel) {
227+
if (channel.isOpen()) {
228+
try {
229+
channel.close();
230+
}
231+
catch (IOException | TimeoutException e) {
232+
logger.debug("Error on close", e);
193233
}
194234
}
195235
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactoryTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,12 @@ void testBasic() throws Exception {
5050
nonTxConfiged.set(true);
5151
}
5252
});
53+
pcf.setSimplePublisherConfirms(true);
5354
Connection conn = pcf.createConnection();
5455
assertThat(txConfiged.get()).isTrue();
5556
assertThat(nonTxConfiged.get()).isTrue();
5657
Channel chann1 = conn.createChannel(false);
58+
assertThat(((ChannelProxy) chann1).isConfirmSelected()).isTrue();
5759
chann1.close();
5860
Channel chann2 = conn.createChannel(false);
5961
assertThat(chann2).isSameAs(chann1);
@@ -63,6 +65,11 @@ void testBasic() throws Exception {
6365
chann1.close();
6466
chann2 = conn.createChannel(true);
6567
assertThat(chann2).isSameAs(chann1);
68+
RabbitUtils.setPhysicalCloseRequired(chann2, true);
69+
chann2.close();
70+
chann1 = conn.createChannel(true);
71+
assertThat(chann1).isNotSameAs(chann2);
72+
chann1.close();
6673
pcf.destroy();
6774
}
6875

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactoryTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ void testBasic() throws Exception {
6666
.isTrue();
6767
chann2.close();
6868
chann2 = conn.createChannel(false);
69+
RabbitUtils.setPhysicalCloseRequired(chann2, true);
6970
chann2.close();
71+
scf.setSimplePublisherConfirms(true);
72+
chann1 = conn.createChannel(false);
73+
assertThat(chann1).isNotSameAs(chann2);
74+
assertThat(((ChannelProxy) chann1).isConfirmSelected()).isTrue();
75+
chann1.close();
7076
scf.destroy();
7177
assertThat(((Channel) TestUtils.getPropertyValue(conn, "channels", ThreadLocal.class).get()).isOpen())
7278
.isFalse();

0 commit comments

Comments
 (0)