Skip to content

Commit 3b8fc2a

Browse files
garyrussellartembilan
authored andcommitted
GH-1341: Move Tx Synch Cleanup to a finally block
Resolves #1341 For more context see the issue description. If commit or rollback fails during synchronization `afterCompletion` cleanup code was not called. **cherry-pick to 2.2.x**
1 parent c5ee02d commit 3b8fc2a

File tree

3 files changed

+191
-10
lines changed

3 files changed

+191
-10
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -330,17 +330,20 @@ protected boolean shouldReleaseBeforeCompletion() {
330330

331331
@Override
332332
public void afterCompletion(int status) {
333-
if (status == TransactionSynchronization.STATUS_COMMITTED) {
334-
this.resourceHolder.commitAll();
335-
}
336-
else {
337-
this.resourceHolder.rollbackAll();
333+
try {
334+
if (status == TransactionSynchronization.STATUS_COMMITTED) {
335+
this.resourceHolder.commitAll();
336+
}
337+
else {
338+
this.resourceHolder.rollbackAll();
339+
}
338340
}
339-
340-
if (this.resourceHolder.isReleaseAfterCompletion()) {
341-
this.resourceHolder.setSynchronizedWithTransaction(false);
341+
finally {
342+
if (this.resourceHolder.isReleaseAfterCompletion()) {
343+
this.resourceHolder.setSynchronizedWithTransaction(false);
344+
}
345+
super.afterCompletion(status);
342346
}
343-
super.afterCompletion(status);
344347
}
345348

346349
@Override

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
3333
import static org.mockito.Mockito.times;
3434
import static org.mockito.Mockito.verify;
3535

36+
import java.io.IOException;
3637
import java.util.Collection;
3738
import java.util.Collections;
3839
import java.util.HashMap;
@@ -41,6 +42,7 @@
4142
import java.util.concurrent.ExecutorService;
4243
import java.util.concurrent.Executors;
4344
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.TimeoutException;
4446
import java.util.concurrent.atomic.AtomicBoolean;
4547
import java.util.concurrent.atomic.AtomicInteger;
4648
import java.util.concurrent.atomic.AtomicReference;
@@ -67,6 +69,7 @@
6769
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
6870
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
6971
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnsCallback;
72+
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
7073
import org.springframework.amqp.support.converter.SimpleMessageConverter;
7174
import org.springframework.amqp.utils.SerializationUtils;
7275
import org.springframework.amqp.utils.test.TestUtils;
@@ -79,9 +82,11 @@
7982
import org.springframework.transaction.TransactionException;
8083
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
8184
import org.springframework.transaction.support.DefaultTransactionStatus;
85+
import org.springframework.transaction.support.TransactionSynchronizationManager;
8286
import org.springframework.transaction.support.TransactionTemplate;
8387

8488
import com.rabbitmq.client.AMQP;
89+
import com.rabbitmq.client.AMQP.Tx.SelectOk;
8590
import com.rabbitmq.client.AuthenticationFailureException;
8691
import com.rabbitmq.client.Channel;
8792
import com.rabbitmq.client.Connection;
@@ -581,6 +586,60 @@ public void testReturnsFallback() {
581586
template2.setReturnCallback(callback);
582587
}
583588

589+
@Test
590+
void resourcesClearedAfterTxFails() throws IOException, TimeoutException {
591+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
592+
Connection mockConnection = mock(Connection.class);
593+
Channel mockChannel = mock(Channel.class);
594+
595+
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
596+
given(mockConnection.isOpen()).willReturn(true);
597+
given(mockConnection.createChannel()).willReturn(mockChannel);
598+
given(mockChannel.txSelect()).willReturn(mock(SelectOk.class));
599+
given(mockChannel.txCommit()).willThrow(IllegalStateException.class);
600+
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
601+
connectionFactory.setExecutor(mock(ExecutorService.class));
602+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
603+
template.setChannelTransacted(true);
604+
TransactionTemplate tt = new TransactionTemplate(new RabbitTransactionManager(connectionFactory));
605+
assertThatIllegalStateException()
606+
.isThrownBy(() ->
607+
tt.execute(status -> {
608+
template.convertAndSend("foo", "bar");
609+
return null;
610+
}));
611+
assertThat(TransactionSynchronizationManager.hasResource(connectionFactory)).isFalse();
612+
assertThatIllegalStateException()
613+
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
614+
.withMessage("Transaction synchronization is not active");
615+
}
616+
617+
@Test
618+
void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException {
619+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
620+
Connection mockConnection = mock(Connection.class);
621+
Channel mockChannel = mock(Channel.class);
622+
623+
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
624+
given(mockConnection.isOpen()).willReturn(true);
625+
given(mockConnection.createChannel()).willReturn(mockChannel);
626+
given(mockChannel.txSelect()).willReturn(mock(SelectOk.class));
627+
given(mockChannel.txCommit()).willThrow(IllegalStateException.class);
628+
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
629+
connectionFactory.setExecutor(mock(ExecutorService.class));
630+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
631+
template.setChannelTransacted(true);
632+
TransactionTemplate tt = new TransactionTemplate(new TestTransactionManager());
633+
tt.execute(status -> {
634+
template.convertAndSend("foo", "bar");
635+
return null;
636+
});
637+
assertThat(TransactionSynchronizationManager.hasResource(connectionFactory)).isFalse();
638+
assertThatIllegalStateException()
639+
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
640+
.withMessage("Transaction synchronization is not active");
641+
}
642+
584643
@SuppressWarnings("serial")
585644
private class TestTransactionManager extends AbstractPlatformTransactionManager {
586645

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyBoolean;
23+
import static org.mockito.ArgumentMatchers.anyString;
24+
import static org.mockito.BDDMockito.given;
25+
import static org.mockito.BDDMockito.willAnswer;
26+
import static org.mockito.Mockito.mock;
27+
28+
import java.io.IOException;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.TimeoutException;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
37+
import org.junit.jupiter.api.Test;
38+
39+
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
40+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
41+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
42+
import org.springframework.transaction.support.TransactionSynchronizationManager;
43+
44+
import com.rabbitmq.client.AMQP;
45+
import com.rabbitmq.client.AMQP.Tx.SelectOk;
46+
import com.rabbitmq.client.Channel;
47+
import com.rabbitmq.client.Connection;
48+
import com.rabbitmq.client.ConnectionFactory;
49+
import com.rabbitmq.client.Consumer;
50+
import com.rabbitmq.client.Envelope;
51+
52+
/**
53+
* @author Gary Russell
54+
* @since 2.2.18
55+
*
56+
*/
57+
public class MessageListenerContainerTxSynchTests {
58+
59+
@Test
60+
void resourcesClearedAfterTxFails() throws IOException, TimeoutException, InterruptedException, ExecutionException {
61+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
62+
Connection mockConnection = mock(Connection.class);
63+
Channel mockChannel = mock(Channel.class);
64+
65+
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
66+
given(mockConnection.isOpen()).willReturn(true);
67+
given(mockConnection.createChannel()).willReturn(mockChannel);
68+
given(mockChannel.txSelect()).willReturn(mock(SelectOk.class));
69+
given(mockChannel.txCommit()).willThrow(IllegalStateException.class);
70+
AtomicReference<Consumer> consumer = new AtomicReference<>();
71+
AtomicReference<String> tag = new AtomicReference<>();
72+
CountDownLatch latch1 = new CountDownLatch(1);
73+
willAnswer(inv -> {
74+
consumer.set(inv.getArgument(6));
75+
consumer.get().handleConsumeOk(inv.getArgument(2));
76+
tag.set(inv.getArgument(2));
77+
latch1.countDown();
78+
return null;
79+
}).given(mockChannel).basicConsume(any(), anyBoolean(), any(), anyBoolean(), anyBoolean(), any(), any());
80+
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
81+
connectionFactory.setExecutor(mock(ExecutorService.class));
82+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
83+
template.setChannelTransacted(true);
84+
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
85+
exec.initialize();
86+
SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer();
87+
mlc.setConnectionFactory(connectionFactory);
88+
mlc.setQueueNames("foo");
89+
mlc.setTaskExecutor(exec);
90+
mlc.setChannelTransacted(true);
91+
CountDownLatch latch2 = new CountDownLatch(1);
92+
mlc.setMessageListener(msg -> {
93+
template.convertAndSend("foo", "bar");
94+
latch2.countDown();
95+
});
96+
mlc.start();
97+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
98+
consumer.get().handleDelivery(tag.get(), new Envelope(1, false, "", ""), new AMQP.BasicProperties(),
99+
new byte[0]);
100+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
101+
mlc.stop();
102+
Future<Exception> exception = exec.submit(() -> {
103+
try {
104+
// verify no lingering resources bound to the executor's single thread
105+
assertThat(TransactionSynchronizationManager.hasResource(connectionFactory)).isFalse();
106+
assertThatIllegalStateException()
107+
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
108+
.withMessage("Transaction synchronization is not active");
109+
return null;
110+
}
111+
catch (Exception e) {
112+
return e;
113+
}
114+
});
115+
assertThat(exception.get(10, TimeUnit.SECONDS)).isNull();
116+
exec.shutdown();
117+
}
118+
119+
}

0 commit comments

Comments
 (0)