From 0aa2aa3539c9b34699554b9263c2a471ca95aa61 Mon Sep 17 00:00:00 2001 From: Norbert Schneider Date: Fri, 29 Aug 2025 12:29:58 +0200 Subject: [PATCH] GH-10362: Block on PostgresSubscribableChannel.unsubscribe Unsubscribing message handlers while processing an in-flight message may lead to a MessageDeliveryException. Fixes: GH-10362 Signed-off-by: Norbert Schneider --- .../channel/PostgresSubscribableChannel.java | 54 +++++++++++-------- ...resChannelMessageTableSubscriberTests.java | 45 ++++++++++++++++ 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java index bef4ccd691..5a7426f77e 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.jspecify.annotations.Nullable; @@ -53,6 +54,7 @@ * @author Rafael Winterhalter * @author Artem Bilan * @author Igor Lovich + * @author Norbert Schneider * * @since 6.0 */ @@ -71,6 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel private final UnicastingDispatcher dispatcher = new UnicastingDispatcher(); + private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock(); + private @Nullable TransactionTemplate transactionTemplate; private RetryTemplate retryTemplate = @@ -164,12 +168,18 @@ public boolean subscribe(MessageHandler handler) { @Override public boolean unsubscribe(MessageHandler handle) { - boolean unsubscribed = super.unsubscribe(handle); - if (this.dispatcher.getHandlerCount() == 0) { - this.messageTableSubscriber.unsubscribe(this); - this.hasHandlers = false; + this.hasHandlersLock.writeLock().lock(); + try { + boolean unsubscribed = super.unsubscribe(handle); + if (this.dispatcher.getHandlerCount() == 0) { + this.messageTableSubscriber.unsubscribe(this); + this.hasHandlers = false; + } + return unsubscribed; + } + finally { + this.hasHandlersLock.writeLock().unlock(); } - return unsubscribed; } @Override @@ -209,26 +219,24 @@ private Optional pollAndDispatchMessage() { } private Optional doPollAndDispatchMessage() { - if (this.hasHandlers) { - TransactionTemplate transactionTemplateToUse = this.transactionTemplate; - if (transactionTemplateToUse != null) { - return executeWithRetry(() -> - transactionTemplateToUse.execute(status -> - pollMessage() - .filter(message -> { - if (!this.hasHandlers) { - status.setRollbackOnly(); - return false; - } - return true; - }) - .map(this::dispatch))); - } - else { - return pollMessage() - .map(message -> executeWithRetry(() -> dispatch(message))); + this.hasHandlersLock.readLock().lock(); + try { + if (this.hasHandlers) { + TransactionTemplate transactionTemplateToUse = this.transactionTemplate; + if (transactionTemplateToUse != null) { + return executeWithRetry(() -> + transactionTemplateToUse.execute(status -> + pollMessage().map(this::dispatch))); + } + else { + return pollMessage() + .map(message -> executeWithRetry(() -> dispatch(message))); + } } } + finally { + this.hasHandlersLock.readLock().unlock(); + } return Optional.empty(); } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java index 866b39d1a3..bb334495a0 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.IntStream; import javax.sql.DataSource; @@ -72,6 +73,7 @@ * @author Igor Lovich * @author Adama Sorho * @author Johannes Edmeier + * @author Norbert Schneider * * @since 6.0 */ @@ -310,6 +312,49 @@ public void testRenewConnection() throws Exception { assertThat(payloads).containsExactlyInAnyOrder("1", "2"); } + @Test + public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException { + int numberOfMessages = 200; + CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10); + AtomicInteger receivedMessages = new AtomicInteger(); + AtomicReference receivedException = new AtomicReference<>(); + + postgresChannelMessageTableSubscriber.start(); + postgresSubscribableChannel.setErrorHandler(receivedException::set); + + MessageHandler messageHandlerOne = message -> { + receivedMessages.getAndIncrement(); + messagesUntilUnsubscribe.countDown(); + }; + postgresSubscribableChannel.subscribe(messageHandlerOne); + + MessageHandler messageHandlerTwo = message -> { + receivedMessages.getAndIncrement(); + messagesUntilUnsubscribe.countDown(); + }; + postgresSubscribableChannel.subscribe(messageHandlerTwo); + + IntStream.range(0, numberOfMessages).forEach(i -> { + taskExecutor.execute(() -> + messageStore.addMessageToGroup(groupId, new GenericMessage<>(i))); + }); + + taskExecutor.execute(postgresSubscribableChannel::notifyUpdate); + taskExecutor.execute(postgresSubscribableChannel::notifyUpdate); + taskExecutor.execute(postgresSubscribableChannel::notifyUpdate); + taskExecutor.execute(postgresSubscribableChannel::notifyUpdate); + taskExecutor.execute(postgresSubscribableChannel::notifyUpdate); + + assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue(); + postgresSubscribableChannel.unsubscribe(messageHandlerOne); + postgresSubscribableChannel.unsubscribe(messageHandlerTwo); + + int undeliveredMessages = messageStore.messageGroupSize(groupId); + int processedMessages = undeliveredMessages + receivedMessages.get(); + assertThat(processedMessages).isEqualTo(numberOfMessages); + assertThat(receivedException).hasNullValue(); + } + @Configuration @EnableIntegration public static class Config {