Skip to content

Commit a7843af

Browse files
GH-3521: Delayer: schedule release task with TX (#3525)
* GH-3521: Delayer: schedule release task with TX Fixes #3521 There is a race condition when transactional `MessageStore` is used for `DelayHandler`, so the message is not visible for reads until after TX is committed, but a scheduled release task may be already ready after delay * Register a `TransactionSynchronization` with scheduling a releasing task when TX is committed **Cherry-pick to `5.4.x` & `5.3.x`** * Fix language in delayer.adoc Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent d016dd5 commit a7843af

File tree

4 files changed

+186
-150
lines changed

4 files changed

+186
-150
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.springframework.messaging.core.DestinationResolver;
5252
import org.springframework.messaging.support.ErrorMessage;
5353
import org.springframework.scheduling.TaskScheduler;
54+
import org.springframework.transaction.support.TransactionSynchronization;
55+
import org.springframework.transaction.support.TransactionSynchronizationManager;
5456
import org.springframework.util.Assert;
5557
import org.springframework.util.CollectionUtils;
5658
import org.springframework.util.ObjectUtils;
@@ -438,7 +440,24 @@ private void releaseMessageAfterDelay(final Message<?> message, long delay) {
438440
};
439441
}
440442

441-
getTaskScheduler().schedule(releaseTask, new Date(messageWrapper.getRequestDate() + delay));
443+
Date startTime = new Date(messageWrapper.getRequestDate() + delay);
444+
445+
if (TransactionSynchronizationManager.isSynchronizationActive() &&
446+
TransactionSynchronizationManager.isActualTransactionActive()) {
447+
448+
TransactionSynchronizationManager.registerSynchronization(
449+
new TransactionSynchronization() {
450+
451+
@Override
452+
public void afterCommit() {
453+
getTaskScheduler().schedule(releaseTask, startTime);
454+
}
455+
456+
});
457+
}
458+
else {
459+
getTaskScheduler().schedule(releaseTask, startTime);
460+
}
442461
}
443462

444463
private Message<?> getMessageById(UUID messageId) {

spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelayerUsageTests.java

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,28 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21-
import org.junit.Ignore;
22-
import org.junit.Test;
23-
import org.junit.runner.RunWith;
21+
import org.junit.jupiter.api.Test;
2422

2523
import org.springframework.beans.factory.annotation.Autowired;
2624
import org.springframework.beans.factory.annotation.Qualifier;
2725
import org.springframework.integration.support.MessageBuilder;
26+
import org.springframework.integration.transaction.PseudoTransactionManager;
2827
import org.springframework.messaging.Message;
2928
import org.springframework.messaging.MessageChannel;
3029
import org.springframework.messaging.PollableChannel;
3130
import org.springframework.messaging.support.GenericMessage;
32-
import org.springframework.test.context.ContextConfiguration;
33-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
31+
import org.springframework.test.annotation.DirtiesContext;
32+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
33+
import org.springframework.transaction.support.TransactionTemplate;
3434

3535
/**
3636
* @author Oleg Zhurakousky
3737
* @author Artem Bilan
38+
*
3839
* @since 1.0.3
3940
*/
40-
@RunWith(SpringJUnit4ClassRunner.class)
41-
@ContextConfiguration
41+
@SpringJUnitConfig
42+
@DirtiesContext
4243
public class DelayerUsageTests {
4344

4445
@Autowired
@@ -68,11 +69,17 @@ public class DelayerUsageTests {
6869

6970

7071
@Test
71-
public void testDelayWithDefaultScheduler() {
72+
public void testDelayWithDefaultSchedulerAndTransactionSynchronization() {
7273
long start = System.currentTimeMillis();
73-
inputA.send(new GenericMessage<String>("Hello"));
74+
75+
new TransactionTemplate(new PseudoTransactionManager())
76+
.execute(status -> {
77+
inputA.send(new GenericMessage<>("Hello"));
78+
return null;
79+
});
80+
7481
assertThat(outputA.receive(10000)).isNotNull();
75-
assertThat((System.currentTimeMillis() - start) >= 1000).isTrue();
82+
assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(1000);
7683
}
7784

7885
@Test
@@ -83,20 +90,19 @@ public void testDelayWithDefaultSchedulerCustomDelayHeader() {
8390
long start = System.currentTimeMillis();
8491
inputA.send(builder.build());
8592
assertThat(outputA.receive(10000)).isNotNull();
86-
assertThat((System.currentTimeMillis() - start) >= 2000).isTrue();
93+
assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(2000);
8794
}
8895

8996
@Test
90-
@Ignore("Enough wonky test based on the timeout and hardware")
9197
public void testDelayWithCustomScheduler() {
9298
long start = System.currentTimeMillis();
93-
inputB.send(new GenericMessage<String>("1"));
94-
inputB.send(new GenericMessage<String>("2"));
95-
inputB.send(new GenericMessage<String>("3"));
96-
inputB.send(new GenericMessage<String>("4"));
97-
inputB.send(new GenericMessage<String>("5"));
98-
inputB.send(new GenericMessage<String>("6"));
99-
inputB.send(new GenericMessage<String>("7"));
99+
inputB.send(new GenericMessage<>("1"));
100+
inputB.send(new GenericMessage<>("2"));
101+
inputB.send(new GenericMessage<>("3"));
102+
inputB.send(new GenericMessage<>("4"));
103+
inputB.send(new GenericMessage<>("5"));
104+
inputB.send(new GenericMessage<>("6"));
105+
inputB.send(new GenericMessage<>("7"));
100106
assertThat(outputB1.receive(10000)).isNotNull();
101107
assertThat(outputB1.receive(10000)).isNotNull();
102108
assertThat(outputB1.receive(10000)).isNotNull();
@@ -108,27 +114,26 @@ public void testDelayWithCustomScheduler() {
108114
// must execute under 3 seconds, since threadPool is set too 5.
109115
// first batch is 5 concurrent invocations on SA, then 2 more
110116
// elapsed time for the whole execution should be a bit over 2 seconds depending on the hardware
111-
assertThat(((System.currentTimeMillis() - start) >= 1000) && ((System.currentTimeMillis() - start) < 3000))
112-
.isTrue();
117+
assertThat(System.currentTimeMillis() - start).isBetween(1000L, 3000L);
113118
}
114119

115-
@Test //INT-1132
120+
@Test
116121
public void testDelayerInsideChain() {
117122
long start = System.currentTimeMillis();
118-
delayerInsideChain.send(new GenericMessage<String>("Hello"));
123+
delayerInsideChain.send(new GenericMessage<>("Hello"));
119124
Message<?> message = outputA.receive(10000);
120125
assertThat(message).isNotNull();
121-
assertThat((System.currentTimeMillis() - start) >= 1000).isTrue();
126+
assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(1000);
122127
assertThat(message.getPayload()).isEqualTo("hello");
123128
}
124129

125130
@Test
126131
public void testInt2243DelayerExpression() {
127132
long start = System.currentTimeMillis();
128-
this.inputC.send(new GenericMessage<String>("test"));
133+
this.inputC.send(new GenericMessage<>("test"));
129134
Message<?> message = this.outputC.receive(10000);
130135
assertThat(message).isNotNull();
131-
assertThat((System.currentTimeMillis() - start) >= 1000).isTrue();
136+
assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(1000);
132137
assertThat(message.getPayload()).isEqualTo("test");
133138
}
134139

0 commit comments

Comments
 (0)