Skip to content

Commit f24c917

Browse files
committed
GH-10362: Block on PostgresSubscribableChannel.unsubscribe
Unsubscribing message handlers while processing an in-flight message may lead to a MessageDeliveryException. Fixes: GH-10362 Signed-off-by: Norbert Schneider <[email protected]>
1 parent cca5580 commit f24c917

File tree

2 files changed

+68
-24
lines changed

2 files changed

+68
-24
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Optional;
2121
import java.util.concurrent.Executor;
22+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

2324
import org.jspecify.annotations.Nullable;
2425

@@ -53,6 +54,7 @@
5354
* @author Rafael Winterhalter
5455
* @author Artem Bilan
5556
* @author Igor Lovich
57+
* @author Norbert Schneider
5658
*
5759
* @since 6.0
5860
*/
@@ -71,6 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
7173

7274
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
7375

76+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
77+
7478
private @Nullable TransactionTemplate transactionTemplate;
7579

7680
private RetryTemplate retryTemplate =
@@ -164,12 +168,18 @@ public boolean subscribe(MessageHandler handler) {
164168

165169
@Override
166170
public boolean unsubscribe(MessageHandler handle) {
167-
boolean unsubscribed = super.unsubscribe(handle);
168-
if (this.dispatcher.getHandlerCount() == 0) {
169-
this.messageTableSubscriber.unsubscribe(this);
170-
this.hasHandlers = false;
171+
this.lock.writeLock().lock();
172+
try {
173+
boolean unsubscribed = super.unsubscribe(handle);
174+
if (this.dispatcher.getHandlerCount() == 0) {
175+
this.messageTableSubscriber.unsubscribe(this);
176+
this.hasHandlers = false;
177+
}
178+
return unsubscribed;
179+
}
180+
finally {
181+
this.lock.writeLock().unlock();
171182
}
172-
return unsubscribed;
173183
}
174184

175185
@Override
@@ -209,26 +219,24 @@ private Optional<?> pollAndDispatchMessage() {
209219
}
210220

211221
private Optional<?> doPollAndDispatchMessage() {
212-
if (this.hasHandlers) {
213-
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
214-
if (transactionTemplateToUse != null) {
215-
return executeWithRetry(() ->
216-
transactionTemplateToUse.execute(status ->
217-
pollMessage()
218-
.filter(message -> {
219-
if (!this.hasHandlers) {
220-
status.setRollbackOnly();
221-
return false;
222-
}
223-
return true;
224-
})
225-
.map(this::dispatch)));
226-
}
227-
else {
228-
return pollMessage()
229-
.map(message -> executeWithRetry(() -> dispatch(message)));
222+
this.lock.readLock().lock();
223+
try {
224+
if (this.hasHandlers) {
225+
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
226+
if (transactionTemplateToUse != null) {
227+
return executeWithRetry(() ->
228+
transactionTemplateToUse.execute(status ->
229+
pollMessage().map(this::dispatch)));
230+
}
231+
else {
232+
return pollMessage()
233+
.map(message -> executeWithRetry(() -> dispatch(message)));
234+
}
230235
}
231236
}
237+
finally {
238+
this.lock.readLock().unlock();
239+
}
232240
return Optional.empty();
233241
}
234242

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.function.Consumer;
30+
import java.util.stream.IntStream;
3031

3132
import javax.sql.DataSource;
3233

@@ -72,7 +73,7 @@
7273
* @author Igor Lovich
7374
* @author Adama Sorho
7475
* @author Johannes Edmeier
75-
*
76+
* @author Norbert Schneider
7677
* @since 6.0
7778
*/
7879
@SpringJUnitConfig
@@ -310,6 +311,41 @@ public void testRenewConnection() throws Exception {
310311
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
311312
}
312313

314+
@Test
315+
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
316+
int numberOfMessages = 150;
317+
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
318+
AtomicInteger receivedMessages = new AtomicInteger();
319+
AtomicReference<Throwable> receivedException = new AtomicReference<>();
320+
321+
postgresChannelMessageTableSubscriber.start();
322+
postgresSubscribableChannel.setErrorHandler(receivedException::set);
323+
MessageHandler messageHandler = message -> {
324+
receivedMessages.getAndIncrement();
325+
messagesUntilUnsubscribe.countDown();
326+
};
327+
postgresSubscribableChannel.subscribe(messageHandler);
328+
329+
IntStream.range(0, numberOfMessages).forEach(i -> {
330+
taskExecutor.execute(() -> {
331+
messageStore.addMessageToGroup(groupId, new GenericMessage<>(String.valueOf(i)));
332+
});
333+
});
334+
335+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
336+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
337+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
338+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
339+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
340+
341+
assertThat(messagesUntilUnsubscribe.await(30, TimeUnit.SECONDS)).isTrue();
342+
postgresSubscribableChannel.unsubscribe(messageHandler);
343+
344+
int undeliveredMessages = messageStore.messageGroupSize(groupId);
345+
assertThat(numberOfMessages).isEqualTo(undeliveredMessages + receivedMessages.get());
346+
assertThat(receivedException).hasNullValue();
347+
}
348+
313349
@Configuration
314350
@EnableIntegration
315351
public static class Config {

0 commit comments

Comments
 (0)