Skip to content

Commit 012a03f

Browse files
committed
Fix PostgresSubscribableChannel race condition
The `PostgresSubscribableChannel` uses a task executor for dispatching messages. Even if we stop `PostgresChannelMessageTableSubscriber` and unsubscribe from the channel, the task might be ongoing. * Use explicit `ThreadPoolTaskExecutor` in the test to shout it down and wait for tasks to be completed before verifying DB status * Optimize `PostgresSubscribableChannel` to mark TX for rollback when we got a message from DB, but no handlers subscribed # Conflicts: # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java
1 parent 1aac947 commit 012a03f

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,16 @@ private Optional<Message<?>> askForMessage() {
168168
if (this.hasHandlers) {
169169
if (this.transactionTemplate != null) {
170170
return this.retryTemplate.execute(context ->
171-
this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch)));
171+
this.transactionTemplate.execute(status ->
172+
pollMessage()
173+
.filter(message -> {
174+
if (!this.hasHandlers) {
175+
status.setRollbackOnly();
176+
return false;
177+
}
178+
return true;
179+
})
180+
.map(this::dispatch)));
172181
}
173182
else {
174183
return pollMessage()

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.messaging.MessageHandler;
4949
import org.springframework.messaging.support.GenericMessage;
5050
import org.springframework.retry.support.RetryTemplate;
51+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
5152
import org.springframework.test.annotation.DirtiesContext;
5253
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5354
import org.springframework.transaction.PlatformTransactionManager;
@@ -109,6 +110,8 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
109110

110111
private PostgresSubscribableChannel postgresSubscribableChannel;
111112

113+
private ThreadPoolTaskExecutor taskExecutor;
114+
112115
private String groupId;
113116

114117
@BeforeEach
@@ -121,15 +124,26 @@ void setUp(TestInfo testInfo) {
121124
POSTGRES_CONTAINER.getPassword())
122125
.unwrap(PgConnection.class));
123126

127+
128+
this.taskExecutor = new ThreadPoolTaskExecutor();
129+
this.taskExecutor.setCorePoolSize(10);
130+
this.taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
131+
this.taskExecutor.setAwaitTerminationSeconds(10);
132+
this.taskExecutor.afterPropertiesSet();
133+
124134
this.groupId = testInfo.getDisplayName();
125135

126136
this.postgresSubscribableChannel =
127137
new PostgresSubscribableChannel(messageStore, groupId, postgresChannelMessageTableSubscriber);
138+
this.postgresSubscribableChannel.setBeanName("testPostgresChannel");
139+
this.postgresSubscribableChannel.setDispatcherExecutor(this.taskExecutor);
140+
this.postgresSubscribableChannel.afterPropertiesSet();
128141
}
129142

130143
@AfterEach
131144
void tearDown() {
132145
this.postgresChannelMessageTableSubscriber.stop();
146+
this.taskExecutor.shutdown();
133147
}
134148

135149

@@ -172,13 +186,13 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
172186
postgresChannelMessageTableSubscriber.start();
173187
MessageHandler messageHandler =
174188
message -> {
175-
try {
176-
throw new RuntimeException("An error has occurred");
177-
}
178-
finally {
179-
latch.countDown();
180-
}
181-
};
189+
try {
190+
throw new RuntimeException("An error has occurred");
191+
}
192+
finally {
193+
latch.countDown();
194+
}
195+
};
182196
postgresSubscribableChannel.subscribe(messageHandler);
183197

184198
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
@@ -189,6 +203,7 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
189203
// Stop subscriber to unlock records from TX for the next verification
190204
postgresChannelMessageTableSubscriber.stop();
191205
postgresSubscribableChannel.unsubscribe(messageHandler);
206+
this.taskExecutor.shutdown();
192207

193208
assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2);
194209
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");

0 commit comments

Comments
 (0)