Skip to content

Commit 93a761c

Browse files
committed
Improve TransactionTemplate usage
* Replace `TransactionCallbackWithoutResult` with the `TransactionTemplate.executeWithoutResult()` * Also use `executeWithoutResult()` whenever we return `null` from the callback
1 parent e7bdb7e commit 93a761c

File tree

6 files changed

+53
-80
lines changed

6 files changed

+53
-80
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,7 +1180,7 @@ private void executeListenerInTransaction(Object data, long deliveryTag) {
11801180
this.transactionTemplate =
11811181
new TransactionTemplate(this.transactionManager, this.transactionAttribute);
11821182
}
1183-
this.transactionTemplate.execute(s -> {
1183+
this.transactionTemplate.executeWithoutResult((status) -> {
11841184
RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
11851185
new RabbitResourceHolder(getChannel(), false), this.connectionFactory, true);
11861186
if (resourceHolder != null) {
@@ -1197,7 +1197,6 @@ private void executeListenerInTransaction(Object data, long deliveryTag) {
11971197
catch (Throwable e2) { //NOSONAR ok to catch Throwable here because we re-throw it below
11981198
throw new WrappedTransactionException(e2);
11991199
}
1200-
return null;
12011200
});
12021201
}
12031202

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,6 @@ private void restart(BlockingQueueConsumer oldConsumer) {
10231023
}
10241024
}
10251025

1026-
@SuppressWarnings("NullAway") // Dataflow analysis limitation
10271026
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
10281027

10291028
PlatformTransactionManager transactionManager = getTransactionManager();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.rabbitmq.client.impl.AMQImpl;
5050
import org.apache.commons.logging.Log;
5151
import org.apache.commons.logging.LogFactory;
52+
import org.jetbrains.annotations.Nullable;
5253
import org.junit.jupiter.api.AfterEach;
5354
import org.junit.jupiter.api.BeforeEach;
5455
import org.junit.jupiter.api.Disabled;
@@ -106,10 +107,8 @@
106107
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
107108
import org.springframework.transaction.TransactionDefinition;
108109
import org.springframework.transaction.TransactionException;
109-
import org.springframework.transaction.TransactionStatus;
110110
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
111111
import org.springframework.transaction.support.DefaultTransactionStatus;
112-
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
113112
import org.springframework.transaction.support.TransactionSynchronization;
114113
import org.springframework.transaction.support.TransactionSynchronizationManager;
115114
import org.springframework.transaction.support.TransactionSynchronizationUtils;
@@ -628,20 +627,18 @@ public void testReceiveInExternalTransactionWithNoRollback() throws Exception {
628627
}
629628

630629
@Test
631-
public void testSendInExternalTransaction() throws Exception {
630+
public void testSendInExternalTransaction() {
632631
template.setChannelTransacted(true);
633-
new TransactionTemplate(new TestTransactionManager()).execute(status -> {
634-
template.convertAndSend(ROUTE, "message");
635-
return null;
636-
});
632+
new TransactionTemplate(new TestTransactionManager())
633+
.executeWithoutResult(status -> template.convertAndSend(ROUTE, "message"));
637634
String result = (String) template.receiveAndConvert(ROUTE);
638635
assertThat(result).isEqualTo("message");
639636
result = (String) template.receiveAndConvert(ROUTE);
640637
assertThat(result).isEqualTo(null);
641638
}
642639

643640
@Test
644-
public void testSendInExternalTransactionWithRollback() throws Exception {
641+
public void testSendInExternalTransactionWithRollback() {
645642
template.setChannelTransacted(true);
646643
try {
647644
new TransactionTemplate(new TestTransactionManager()).execute(status -> {
@@ -1160,32 +1157,29 @@ private void testReceiveAndReply(long timeout) throws Exception {
11601157

11611158
this.template.convertAndSend(ROUTE, "TEST");
11621159
this.template.setReceiveTimeout(timeout);
1163-
result = new TransactionTemplate(new TestTransactionManager()).execute(status -> {
1164-
final AtomicReference<String> payloadReference = new AtomicReference<String>();
1165-
boolean received1 = template.receiveAndReply((ReceiveAndReplyCallback<String, Void>) payload -> {
1166-
payloadReference.set(payload);
1167-
return null;
1168-
});
1169-
assertThat(received1).isTrue();
1170-
return payloadReference.get();
1171-
});
1160+
result = new TransactionTemplate(new TestTransactionManager())
1161+
.execute(status -> {
1162+
AtomicReference<@Nullable String> payloadReference = new AtomicReference<>();
1163+
boolean received1 = template.<String, Object>receiveAndReply(payload -> {
1164+
payloadReference.set(payload);
1165+
return null;
1166+
});
1167+
assertThat(received1).isTrue();
1168+
return payloadReference.get();
1169+
});
11721170
assertThat(result).isEqualTo("TEST");
11731171
this.template.setReceiveTimeout(0);
11741172
assertThat(this.template.receive(ROUTE)).isEqualTo(null);
11751173

11761174
this.template.convertAndSend(ROUTE, "TEST");
11771175
this.template.setReceiveTimeout(timeout);
11781176
try {
1179-
new TransactionTemplate(new TestTransactionManager()).execute(new TransactionCallbackWithoutResult() {
1180-
1181-
@Override
1182-
public void doInTransactionWithoutResult(TransactionStatus status) {
1183-
template.receiveAndReply((ReceiveAndReplyMessageCallback) message -> message,
1184-
(request, reply) -> {
1185-
throw new PlannedException();
1186-
});
1187-
}
1188-
});
1177+
new TransactionTemplate(new TestTransactionManager())
1178+
.executeWithoutResult((status) ->
1179+
template.receiveAndReply((ReceiveAndReplyMessageCallback) message -> message,
1180+
(request, reply) -> {
1181+
throw new PlannedException();
1182+
}));
11891183
fail("Expected PlannedException");
11901184
}
11911185
catch (Exception e) {
@@ -1513,26 +1507,24 @@ public void testSendInGlobalTransactionRollback() throws Exception {
15131507
assertThat(template.receive(ROUTE)).isNull();
15141508
}
15151509

1516-
private void testSendInGlobalTransactionGuts(final boolean rollback) throws Exception {
1510+
private void testSendInGlobalTransactionGuts(final boolean rollback) {
15171511
template.setChannelTransacted(true);
1518-
new TransactionTemplate(new TestTransactionManager()).execute(status -> {
1512+
new TransactionTemplate(new TestTransactionManager())
1513+
.executeWithoutResult(status -> {
1514+
template.convertAndSend(ROUTE, "message");
15191515

1520-
template.convertAndSend(ROUTE, "message");
1516+
if (rollback) {
1517+
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
15211518

1522-
if (rollback) {
1523-
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
1519+
@Override
1520+
public void afterCommit() {
1521+
TransactionSynchronizationUtils
1522+
.triggerAfterCompletion(TransactionSynchronization.STATUS_ROLLED_BACK);
1523+
}
15241524

1525-
@Override
1526-
public void afterCommit() {
1527-
TransactionSynchronizationUtils
1528-
.triggerAfterCompletion(TransactionSynchronization.STATUS_ROLLED_BACK);
1525+
});
15291526
}
1530-
15311527
});
1532-
}
1533-
1534-
return null;
1535-
});
15361528
}
15371529

15381530
@Test

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,8 @@ public void testSendAndReceiveExternalTransacted() throws InterruptedException {
109109
List<String> results = Collections.synchronizedList(new ArrayList<>());
110110
template.setChannelTransacted(true);
111111
Stream.of(1, 2, 3, 4).forEach(i -> exec.execute(() -> {
112-
new TransactionTemplate(new TestTransactionManager()).execute(status -> {
113-
template.convertAndSend(ROUTE, "message");
114-
return null;
115-
});
112+
new TransactionTemplate(new TestTransactionManager())
113+
.executeWithoutResult(status -> template.convertAndSend(ROUTE, "message"));
116114
template.convertAndSend(ROUTE, "message");
117115
results.add((String) template.receiveAndConvert(ROUTE, 10_000L));
118116
latch.countDown();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,8 @@ public void returnConnectionAfterCommit() throws Exception {
129129
final RabbitTemplate template = new RabbitTemplate(connectionFactory);
130130
template.setChannelTransacted(true);
131131

132-
txTemplate.execute(status -> {
133-
template.convertAndSend("foo", "bar");
134-
return null;
135-
});
136-
txTemplate.execute(status -> {
137-
template.convertAndSend("baz", "qux");
138-
return null;
139-
});
132+
txTemplate.executeWithoutResult(status -> template.convertAndSend("foo", "bar"));
133+
txTemplate.executeWithoutResult(status -> template.convertAndSend("baz", "qux"));
140134
verify(mockConnectionFactory, Mockito.times(1)).newConnection(any(ExecutorService.class), anyString());
141135
// ensure we used the same channel
142136
verify(mockConnection, times(1)).createChannel();
@@ -424,12 +418,12 @@ public void testNestedTxBinding() throws Exception {
424418
admin.setApplicationContext(ac);
425419
admin.afterPropertiesSet();
426420
AtomicReference<Channel> templateChannel = new AtomicReference<>();
427-
new TransactionTemplate(new TestTransactionManager()).execute(s -> {
428-
return rabbitTemplate.execute(c -> {
429-
templateChannel.set(((ChannelProxy) c).getTargetChannel());
430-
return true;
431-
});
432-
});
421+
new TransactionTemplate(new TestTransactionManager())
422+
.execute(s ->
423+
rabbitTemplate.execute(c -> {
424+
templateChannel.set(((ChannelProxy) c).getTargetChannel());
425+
return true;
426+
}));
433427
verify(channel1).txSelect();
434428
verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), anyMap());
435429
assertThat(templateChannel.get()).isEqualTo(channel1);
@@ -606,10 +600,7 @@ void resourcesClearedAfterTxFails() throws IOException, TimeoutException {
606600
TransactionTemplate tt = new TransactionTemplate(new RabbitTransactionManager(connectionFactory));
607601
assertThatIllegalStateException()
608602
.isThrownBy(() ->
609-
tt.execute(status -> {
610-
template.convertAndSend("foo", "bar");
611-
return null;
612-
}));
603+
tt.executeWithoutResult(status -> template.convertAndSend("foo", "bar")));
613604
assertThat(TransactionSynchronizationManager.hasResource(connectionFactory)).isFalse();
614605
assertThatIllegalStateException()
615606
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
@@ -633,16 +624,13 @@ void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException
633624
RabbitTemplate template = new RabbitTemplate(connectionFactory);
634625
template.setChannelTransacted(true);
635626
TransactionTemplate tt = new TransactionTemplate(new TestTransactionManager());
636-
tt.execute(status -> {
637-
template.convertAndSend("foo", "bar");
638-
return null;
639-
});
627+
tt.executeWithoutResult(status -> template.convertAndSend("foo", "bar"));
640628
assertThat(TransactionSynchronizationManager.hasResource(connectionFactory)).isFalse();
641629
assertThatIllegalStateException()
642630
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
643631
.withMessage("Transaction synchronization is not active");
644632
assertThatExceptionOfType(AfterCompletionFailedException.class)
645-
.isThrownBy(() -> ConnectionFactoryUtils.checkAfterCompletion());
633+
.isThrownBy(ConnectionFactoryUtils::checkAfterCompletion);
646634
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(false);
647635
}
648636

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/transaction/RabbitTransactionManagerIntegrationTests.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void cleanup() throws Exception {
6464
}
6565

6666
@Test
67-
public void testSendAndReceiveInTransaction() throws Exception {
67+
public void testSendAndReceiveInTransaction() {
6868
String result = transactionTemplate.execute(status -> {
6969
template.convertAndSend(ROUTE, "message");
7070
return (String) template.receiveAndConvert(ROUTE);
@@ -75,7 +75,7 @@ public void testSendAndReceiveInTransaction() throws Exception {
7575
}
7676

7777
@Test
78-
public void testReceiveInTransaction() throws Exception {
78+
public void testReceiveInTransaction() {
7979
template.convertAndSend(ROUTE, "message");
8080
String result = transactionTemplate.execute(status -> (String) template.receiveAndConvert(ROUTE));
8181
assertThat(result).isEqualTo("message");
@@ -84,7 +84,7 @@ public void testReceiveInTransaction() throws Exception {
8484
}
8585

8686
@Test
87-
public void testReceiveInTransactionWithRollback() throws Exception {
87+
public void testReceiveInTransactionWithRollback() {
8888
// Makes receive (and send in principle) transactional
8989
template.setChannelTransacted(true);
9090
template.convertAndSend(ROUTE, "message");
@@ -100,20 +100,17 @@ public void testReceiveInTransactionWithRollback() throws Exception {
100100
}
101101

102102
@Test
103-
public void testSendInTransaction() throws Exception {
103+
public void testSendInTransaction() {
104104
template.setChannelTransacted(true);
105-
transactionTemplate.execute(status -> {
106-
template.convertAndSend(ROUTE, "message");
107-
return null;
108-
});
105+
transactionTemplate.executeWithoutResult(status -> template.convertAndSend(ROUTE, "message"));
109106
String result = (String) template.receiveAndConvert(ROUTE);
110107
assertThat(result).isEqualTo("message");
111108
result = (String) template.receiveAndConvert(ROUTE);
112109
assertThat(result).isEqualTo(null);
113110
}
114111

115112
@Test
116-
public void testSendInTransactionWithRollback() throws Exception {
113+
public void testSendInTransactionWithRollback() {
117114
template.setChannelTransacted(true);
118115
assertThatExceptionOfType(PlannedException.class)
119116
.isThrownBy(() -> transactionTemplate.execute(status -> {

0 commit comments

Comments
 (0)