Skip to content

Commit 7d350ce

Browse files
committed
Fix JdbcMessageStoreChannelTests race condition
The message in the `QueueChannel` appears for consuming a bit earlier than TX is committed * Introduce `afterCommitLatch` into the test verify the state when TX is really committed and data is removed from DB **Cherry-pick to 5.0.x and 4.3.x** (cherry picked from commit e4e4ee0) # Conflicts: # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests-context.xml # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests.java
1 parent 9c0a628 commit 7d350ce

File tree

2 files changed

+42
-26
lines changed

2 files changed

+42
-26
lines changed

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests-context.xml

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,46 +5,43 @@
55
xmlns:int="http://www.springframework.org/schema/integration"
66
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
77
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
8-
xmlns:context="http://www.springframework.org/schema/context"
98
xsi:schemaLocation="http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
109
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
1110
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
12-
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
13-
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
11+
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">
1412

15-
<jdbc:embedded-database id="dataSource" type="DERBY" />
13+
<jdbc:embedded-database id="dataSource" type="H2" />
1614

1715
<jdbc:initialize-database data-source="dataSource" ignore-failures="DROPS">
1816
<jdbc:script location="${int.drop.script}" />
19-
<jdbc:script location="${int.schema.script}" />
17+
<jdbc:script location="org/springframework/integration/jdbc/schema-h2.sql" />
2018
</jdbc:initialize-database>
2119

2220
<int-jdbc:message-store id="messageStore" data-source="dataSource" />
2321

24-
<channel id="input" xmlns="http://www.springframework.org/schema/integration">
25-
<queue ref="queue"/>
26-
</channel>
27-
28-
<int:channel id="output"/>
29-
30-
<int:logging-channel-adapter channel="output"/>
22+
<int:channel id="input">
23+
<int:queue ref="queue"/>
24+
</int:channel>
3125

3226
<bean id="queue" class="org.springframework.integration.store.MessageGroupQueue">
3327
<constructor-arg ref="messageStore" />
3428
<constructor-arg value="JdbcMessageStoreChannelTests" />
3529
</bean>
3630

37-
<service-activator id="service-activator" input-channel="input" output-channel="output" xmlns="http://www.springframework.org/schema/integration">
31+
<int:service-activator id="service-activator" input-channel="input" output-channel="nullChannel">
3832
<beans:bean class="org.springframework.integration.jdbc.JdbcMessageStoreChannelTests$Service" />
39-
<poller fixed-rate="2000">
40-
<transactional />
41-
</poller>
42-
</service-activator>
43-
44-
<context:property-placeholder location="int-${ENVIRONMENT:derby}.properties"
45-
system-properties-mode="OVERRIDE"
46-
ignore-unresolvable="true"
47-
order="1"/>
33+
<int:poller fixed-rate="2000">
34+
<int:transactional synchronization-factory="transactionSynchronizationFactory"/>
35+
</int:poller>
36+
</int:service-activator>
37+
38+
<int:transaction-synchronization-factory id="transactionSynchronizationFactory">
39+
<int:after-commit expression="@afterCommitLatch.countDown()"/>
40+
</int:transaction-synchronization-factory>
41+
42+
<bean id="afterCommitLatch" class="java.util.concurrent.CountDownLatch">
43+
<constructor-arg value="1"/>
44+
</bean>
4845

4946
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
5047
<property name="dataSource" ref="dataSource" />

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.jdbc;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
2021
import static org.junit.Assert.fail;
2122

2223
import java.util.List;
@@ -45,9 +46,28 @@ public class JdbcMessageStoreChannelTests {
4546
@Autowired
4647
private MessageChannel input;
4748

49+
@Autowired
50+
private CountDownLatch afterCommitLatch;
51+
4852
@Autowired
4953
private JdbcMessageStore messageStore;
5054

55+
@Autowired
56+
@Qualifier("service-activator")
57+
private AbstractEndpoint serviceActivator;
58+
59+
60+
@Before
61+
public void init() {
62+
Service.reset(1);
63+
this.serviceActivator.start();
64+
}
65+
66+
@After
67+
public void tearDown() {
68+
this.serviceActivator.stop();
69+
}
70+
5171
@BeforeTransaction
5272
public void clear() {
5373
for (MessageGroup group : messageStore) {
@@ -56,10 +76,9 @@ public void clear() {
5676
}
5777

5878
@Test
59-
public void testSendAndActivate() throws Exception {
60-
Service.reset(1);
61-
input.send(new GenericMessage<String>("foo"));
62-
Service.await(10000);
79+
public void testSendAndActivate() throws InterruptedException {
80+
this.input.send(new GenericMessage<>("foo"));
81+
assertTrue(this.afterCommitLatch.await(10, TimeUnit.SECONDS));
6382
assertEquals(1, Service.messages.size());
6483
assertEquals(0, messageStore.getMessageGroup("JdbcMessageStoreChannelTests").size());
6584
}

0 commit comments

Comments
 (0)