Skip to content

Commit 5a7cccf

Browse files
bertschneiderspring-builds
authored andcommitted
GH-10362: Fix PostgresSubscribableChannel for race condition
Fixes: #10362 Unsubscribing message handlers while processing an in-flight message may lead to a `MessageDeliveryException`. There is a possibility in the `PostgresSubscribableChannel` to have a polled message dispatched when no subscribers to the channel anymore. The `doPollAndDispatchMessage()` can enter to the `pollMessage()` when it sees `hasHandlers`. However, the last subscriber to the channel is unsubscribed in parallel. * Fix `PostgresSubscribableChannel` using a `ReadWriteLock` to block `unsubscribe()` until currently polled message is processed. Or opposite: do not let to `doPollAndDispatchMessage()` to enter `pollMessage()` logic until `unsubscribe()` is done. This way the `hasHandlers` state is guarded by lock synchronization. Signed-off-by: Norbert Schneider <[email protected]> # Conflicts: # spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java (cherry picked from commit 145e2a5)
1 parent 8d81b02 commit 5a7cccf

File tree

2 files changed

+76
-22
lines changed

2 files changed

+76
-22
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Optional;
2020
import java.util.concurrent.Executor;
21+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2122

2223
import org.springframework.core.log.LogAccessor;
2324
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -47,6 +48,7 @@
4748
* @author Rafael Winterhalter
4849
* @author Artem Bilan
4950
* @author Igor Lovich
51+
* @author Norbert Schneider
5052
*
5153
* @since 6.0
5254
*/
@@ -65,6 +67,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
6567

6668
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
6769

70+
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock();
71+
6872
private TransactionTemplate transactionTemplate;
6973

7074
private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
@@ -156,12 +160,18 @@ public boolean subscribe(MessageHandler handler) {
156160

157161
@Override
158162
public boolean unsubscribe(MessageHandler handle) {
159-
boolean unsubscribed = super.unsubscribe(handle);
160-
if (this.dispatcher.getHandlerCount() == 0) {
161-
this.messageTableSubscriber.unsubscribe(this);
162-
this.hasHandlers = false;
163+
this.hasHandlersLock.writeLock().lock();
164+
try {
165+
boolean unsubscribed = super.unsubscribe(handle);
166+
if (this.dispatcher.getHandlerCount() == 0) {
167+
this.messageTableSubscriber.unsubscribe(this);
168+
this.hasHandlers = false;
169+
}
170+
return unsubscribed;
171+
}
172+
finally {
173+
this.hasHandlersLock.writeLock().unlock();
163174
}
164-
return unsubscribed;
165175
}
166176

167177
@Override
@@ -201,25 +211,24 @@ private Optional<?> pollAndDispatchMessage() {
201211
}
202212

203213
private Optional<?> doPollAndDispatchMessage() {
204-
if (this.hasHandlers) {
205-
if (this.transactionTemplate != null) {
206-
return this.retryTemplate.execute(context ->
207-
this.transactionTemplate.execute(status ->
208-
pollMessage()
209-
.filter(message -> {
210-
if (!this.hasHandlers) {
211-
status.setRollbackOnly();
212-
return false;
213-
}
214-
return true;
215-
})
216-
.map(this::dispatch)));
217-
}
218-
else {
219-
return pollMessage()
220-
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
214+
this.hasHandlersLock.readLock().lock();
215+
try {
216+
if (this.hasHandlers) {
217+
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
218+
if (transactionTemplateToUse != null) {
219+
return this.retryTemplate.execute(context ->
220+
transactionTemplateToUse.execute(status ->
221+
pollMessage().map(this::dispatch)));
222+
}
223+
else {
224+
return pollMessage()
225+
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
226+
}
221227
}
222228
}
229+
finally {
230+
this.hasHandlersLock.readLock().unlock();
231+
}
223232
return Optional.empty();
224233
}
225234

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.function.Consumer;
30+
import java.util.stream.IntStream;
3031

3132
import javax.sql.DataSource;
3233

@@ -70,6 +71,7 @@
7071
* @author Igor Lovich
7172
* @author Adama Sorho
7273
* @author Johannes Edmeier
74+
* @author Norbert Schneider
7375
*
7476
* @since 6.0
7577
*/
@@ -299,6 +301,49 @@ public void testRenewConnection() throws Exception {
299301
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
300302
}
301303

304+
@Test
305+
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
306+
int numberOfMessages = 200;
307+
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
308+
AtomicInteger receivedMessages = new AtomicInteger();
309+
AtomicReference<Throwable> receivedException = new AtomicReference<>();
310+
311+
postgresChannelMessageTableSubscriber.start();
312+
postgresSubscribableChannel.setErrorHandler(receivedException::set);
313+
314+
MessageHandler messageHandlerOne = message -> {
315+
receivedMessages.getAndIncrement();
316+
messagesUntilUnsubscribe.countDown();
317+
};
318+
postgresSubscribableChannel.subscribe(messageHandlerOne);
319+
320+
MessageHandler messageHandlerTwo = message -> {
321+
receivedMessages.getAndIncrement();
322+
messagesUntilUnsubscribe.countDown();
323+
};
324+
postgresSubscribableChannel.subscribe(messageHandlerTwo);
325+
326+
IntStream.range(0, numberOfMessages).forEach(i -> {
327+
taskExecutor.execute(() ->
328+
messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
329+
});
330+
331+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
332+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
333+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
334+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
335+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
336+
337+
assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue();
338+
postgresSubscribableChannel.unsubscribe(messageHandlerOne);
339+
postgresSubscribableChannel.unsubscribe(messageHandlerTwo);
340+
341+
int undeliveredMessages = messageStore.messageGroupSize(groupId);
342+
int processedMessages = undeliveredMessages + receivedMessages.get();
343+
assertThat(processedMessages).isEqualTo(numberOfMessages);
344+
assertThat(receivedException).hasNullValue();
345+
}
346+
302347
@Configuration
303348
@EnableIntegration
304349
public static class Config {

0 commit comments

Comments
 (0)