Skip to content

Commit c746497

Browse files
garyrussellartembilan
authored andcommitted
GH-1091: Fix DMLC with Routing Connection Factory
Fixes #1091 The DMLC can open a connection on the calling thread (e.g. `start()`, `setConsumersPerQueue()`). When using a routing connection factory, the factory key is temporarily bound to the thread for proper CF selection in connection listeners (e.g. `RabbitAdmin`). This is disallowed if the calling thread is, itself, a listener container thread. Add `push/pop` operations to `SimpleResourceHolder`. **cherry-pick to 2.1.x**
1 parent 3f65fc0 commit c746497

File tree

3 files changed

+134
-28
lines changed

3 files changed

+134
-28
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleResourceHolder.java

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import java.util.Collections;
20+
import java.util.Deque;
2021
import java.util.HashMap;
22+
import java.util.LinkedList;
2123
import java.util.Map;
2224

2325
import org.apache.commons.logging.Log;
@@ -28,15 +30,18 @@
2830
import org.springframework.util.Assert;
2931

3032
/**
31-
* Central helper that manages resources per thread to be used by resource management code.
32-
*
33-
* <p>Supports one resource per key without overwriting, that is, a resource needs
34-
* to be removed before a new one can be set for the same key.
35-
*
36-
* <p>Resource management code should check for thread-bound resources via {@link #has(Object)}.
37-
*
38-
* <p>This helper isn't designed for transaction synchronization cases.
39-
* Use {@code TransactionSynchronizationManager} and {@code ResourceHolder} instead.
33+
* Central helper that manages resources per thread to be used by resource management
34+
* code.
35+
* <p>
36+
* {@link #bind(Object, Object)} supports one resource per key without overwriting, that
37+
* is, a resource needs to be removed before a new one can be set for the same key. But
38+
* see {@link #push(Object, Object)} and {@link #pop(Object)}.
39+
* <p>
40+
* Resource management code should check for thread-bound resources via
41+
* {@link #has(Object)}.
42+
* <p>
43+
* This helper isn't designed for transaction synchronization cases. Use
44+
* {@code TransactionSynchronizationManager} and {@code ResourceHolder} instead.
4045
*
4146
* @author Artem Bilan
4247
* @author Gary Russell
@@ -48,11 +53,14 @@ public final class SimpleResourceHolder {
4853

4954
private static final String BOUND_TO_THREAD = "] bound to thread [";
5055

51-
private static final Log logger = LogFactory.getLog(SimpleResourceHolder.class); // NOSONAR lower case
56+
private static final Log LOGGER = LogFactory.getLog(SimpleResourceHolder.class);
5257

53-
private static final ThreadLocal<Map<Object, Object>> resources = // NOSONAR lower case
58+
private static final ThreadLocal<Map<Object, Object>> RESOURCES =
5459
new NamedThreadLocal<Map<Object, Object>>("Simple resources");
5560

61+
private static final ThreadLocal<Map<Object, Deque<Object>>> STACK =
62+
new NamedThreadLocal<Map<Object, Deque<Object>>>("Simple resources");
63+
5664
/**
5765
* Return all resources that are bound to the current thread.
5866
* <p>Mainly for debugging purposes. Resource managers should always invoke
@@ -63,7 +71,7 @@ public final class SimpleResourceHolder {
6371
* @see #has
6472
*/
6573
public static Map<Object, Object> getResources() {
66-
Map<Object, Object> map = resources.get();
74+
Map<Object, Object> map = RESOURCES.get();
6775
return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());
6876
}
6977

@@ -86,8 +94,8 @@ public static boolean has(Object key) {
8694
@Nullable
8795
public static Object get(Object key) {
8896
Object value = doGet(key);
89-
if (value != null && logger.isTraceEnabled()) {
90-
logger.trace("Retrieved value [" + value + FOR_KEY + key + BOUND_TO_THREAD
97+
if (value != null && LOGGER.isTraceEnabled()) {
98+
LOGGER.trace("Retrieved value [" + value + FOR_KEY + key + BOUND_TO_THREAD
9199
+ Thread.currentThread().getName() + "]");
92100
}
93101
return value;
@@ -100,7 +108,7 @@ public static Object get(Object key) {
100108
*/
101109
@Nullable
102110
private static Object doGet(Object actualKey) {
103-
Map<Object, Object> map = resources.get();
111+
Map<Object, Object> map = RESOURCES.get();
104112
if (map == null) {
105113
return null;
106114
}
@@ -115,22 +123,71 @@ private static Object doGet(Object actualKey) {
115123
*/
116124
public static void bind(Object key, Object value) {
117125
Assert.notNull(value, "Value must not be null");
118-
Map<Object, Object> map = resources.get();
126+
Map<Object, Object> map = RESOURCES.get();
119127
// set ThreadLocal Map if none found
120128
if (map == null) {
121129
map = new HashMap<Object, Object>();
122-
resources.set(map);
130+
RESOURCES.set(map);
123131
}
124132
Object oldValue = map.put(key, value);
125133
Assert.isNull(oldValue, () -> "Already value [" + oldValue + FOR_KEY + key + BOUND_TO_THREAD
126134
+ Thread.currentThread().getName() + "]");
127135

128-
if (logger.isTraceEnabled()) {
129-
logger.trace(
136+
if (LOGGER.isTraceEnabled()) {
137+
LOGGER.trace(
130138
"Bound value [" + value + FOR_KEY + key + "] to thread [" + Thread.currentThread().getName() + "]");
131139
}
132140
}
133141

142+
/**
143+
* Set the value for this key and push any existing value onto a stack.
144+
* @param key the key.
145+
* @param value the value.
146+
* @since 2.1.11
147+
*/
148+
public static void push(Object key, Object value) {
149+
Object currentValue = get(key);
150+
if (currentValue == null) {
151+
bind(key, value);
152+
}
153+
else {
154+
Map<Object, Deque<Object>> stack = STACK.get();
155+
if (stack == null) {
156+
stack = new HashMap<>();
157+
STACK.set(stack);
158+
}
159+
stack.computeIfAbsent(key, k -> new LinkedList<>());
160+
stack.get(key).push(currentValue);
161+
unbind(key);
162+
bind(key, value);
163+
}
164+
}
165+
166+
/**
167+
* Unbind the current value and bind the head of the stack if present.
168+
* @param key the key.
169+
* @return the popped value.
170+
* @since 2.1.11
171+
*/
172+
@Nullable
173+
public static Object pop(Object key) {
174+
Object popped = unbind(key);
175+
Map<Object, Deque<Object>> stack = STACK.get();
176+
if (stack != null) {
177+
Deque<Object> deque = stack.get(key);
178+
if (deque != null && deque.size() > 0) {
179+
Object previousValue = deque.pop();
180+
if (previousValue != null) {
181+
bind(key, previousValue);
182+
}
183+
if (deque.isEmpty()) {
184+
STACK.remove();
185+
}
186+
}
187+
}
188+
return popped;
189+
}
190+
134191
/**
135192
* Unbind a resource for the given key from the current thread.
136193
* @param key the key to unbind (usually the resource factory)
@@ -151,18 +208,18 @@ public static Object unbind(Object key) throws IllegalStateException {
151208
*/
152209
@Nullable
153210
public static Object unbindIfPossible(Object key) {
154-
Map<Object, Object> map = resources.get();
211+
Map<Object, Object> map = RESOURCES.get();
155212
if (map == null) {
156213
return null;
157214
}
158215
Object value = map.remove(key);
159216
// Remove entire ThreadLocal if empty...
160217
if (map.isEmpty()) {
161-
resources.remove();
218+
RESOURCES.remove();
162219
}
163220

164-
if (value != null && logger.isTraceEnabled()) {
165-
logger.trace("Removed value [" + value + FOR_KEY + key + "] from thread ["
221+
if (value != null && LOGGER.isTraceEnabled()) {
222+
LOGGER.trace("Removed value [" + value + FOR_KEY + key + "] from thread ["
166223
+ Thread.currentThread().getName() + "]");
167224
}
168225
return value;
@@ -172,10 +229,12 @@ public static Object unbindIfPossible(Object key) {
172229
* Clear resources for the current thread.
173230
*/
174231
public static void clear() {
175-
resources.remove();
232+
RESOURCES.remove();
233+
STACK.remove();
176234
}
177235

178236
private SimpleResourceHolder() {
237+
super();
179238
}
180239

181240
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ private void startConsumers(final String[] queueNames) {
584584
protected void doRedeclareElementsIfNecessary() {
585585
String routingLookupKey = getRoutingLookupKey();
586586
if (routingLookupKey != null) {
587-
SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null here
587+
SimpleResourceHolder.push(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null here
588588
}
589589
try {
590590
redeclareElementsIfNecessary();
@@ -594,7 +594,7 @@ protected void doRedeclareElementsIfNecessary() {
594594
}
595595
finally {
596596
if (routingLookupKey != null) {
597-
SimpleResourceHolder.unbind(getRoutingConnectionFactory()); // NOSONAR never null here
597+
SimpleResourceHolder.pop(getRoutingConnectionFactory()); // NOSONAR never null here
598598
}
599599
}
600600
}
@@ -660,7 +660,7 @@ private void doConsumeFromQueue(String queue) {
660660
}
661661
String routingLookupKey = getRoutingLookupKey();
662662
if (routingLookupKey != null) {
663-
SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null here
663+
SimpleResourceHolder.push(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null here
664664
}
665665
Connection connection = null; // NOSONAR (close)
666666
try {
@@ -675,7 +675,7 @@ private void doConsumeFromQueue(String queue) {
675675
}
676676
finally {
677677
if (routingLookupKey != null) {
678-
SimpleResourceHolder.unbind(getRoutingConnectionFactory()); // NOSONAR never null here
678+
SimpleResourceHolder.pop(getRoutingConnectionFactory()); // NOSONAR never null here
679679
}
680680
}
681681
SimpleConsumer consumer = consume(queue, connection);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/RoutingConnectionFactoryTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.mockito.Mockito;
4242

4343
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
44+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
45+
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
4446
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
4547

4648
import com.rabbitmq.client.Channel;
@@ -276,4 +278,49 @@ protected synchronized void redeclareElementsIfNecessary() {
276278
assertThat(connectionMakerKey2.get()).isEqualTo("xxx[foo]");
277279
}
278280

281+
@Test
282+
public void testWithDRTDMLCAndConnectionListenerExistingRFK() throws Exception {
283+
ConnectionFactory connectionFactory1 = mock(ConnectionFactory.class);
284+
Map<Object, ConnectionFactory> factories = new HashMap<Object, ConnectionFactory>(2);
285+
factories.put("xxx[foo]", connectionFactory1);
286+
factories.put("xxx[amq.rabbitmq.reply-to]", connectionFactory1);
287+
288+
final SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
289+
SimpleResourceHolder.bind(connectionFactory, "foo");
290+
291+
final Connection connection = mock(Connection.class);
292+
Channel channel = mock(Channel.class);
293+
given(channel.isOpen()).willReturn(true);
294+
given(connection.createChannel(anyBoolean())).willReturn(channel);
295+
final AtomicReference<Object> connectionMakerKey = new AtomicReference<>();
296+
final CountDownLatch latch = new CountDownLatch(2); // early connection in Abstract container
297+
willAnswer(i -> {
298+
connectionMakerKey.set(connectionFactory.determineCurrentLookupKey());
299+
latch.countDown();
300+
return connection;
301+
}).given(connectionFactory1).createConnection();
302+
connectionFactory.setTargetConnectionFactories(factories);
303+
304+
final AtomicReference<Object> connectionMakerKey2 = new AtomicReference<>();
305+
DirectReplyToMessageListenerContainer container = new DirectReplyToMessageListenerContainer(connectionFactory) {
306+
307+
@Override
308+
protected synchronized void redeclareElementsIfNecessary() {
309+
connectionMakerKey2.set(connectionFactory.determineCurrentLookupKey());
310+
}
311+
312+
};
313+
container.setLookupKeyQualifier("xxx");
314+
container.setShutdownTimeout(10);
315+
container.afterPropertiesSet();
316+
container.start();
317+
ChannelHolder channelHolder = container.getChannelHolder();
318+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
319+
container.releaseConsumerFor(channelHolder, true, "test");
320+
container.stop();
321+
assertThat(connectionMakerKey.get()).isEqualTo("xxx[amq.rabbitmq.reply-to]");
322+
assertThat(connectionMakerKey2.get()).isEqualTo("xxx[amq.rabbitmq.reply-to]");
323+
assertThat(SimpleResourceHolder.unbind(connectionFactory)).isEqualTo("foo");
324+
}
325+
279326
}

0 commit comments

Comments
 (0)