Skip to content

Commit 1da83bd

Browse files
GH-10362: Block on PostgresSubscribableChannel.unsubscribe (#10363)
GH-10362: Fix `PostgresSubscribableChannel` for race condition Fixes: #10362 Unsubscribing message handlers while processing an in-flight message may lead to a `MessageDeliveryException`. There is a possibility in the `PostgresSubscribableChannel` to have a polled message dispatched when no subscribers to the channel any more. The `doPollAndDispatchMessage()` can enter to the `pollMessage()` when it sees `hasHandlers`. However, the last subscriber to the channel is unsubscribed in parallel. * Fix `PostgresSubscribableChannel` using a `ReadWriteLock` to block `unsubscribe()` until currently polled message is processed. Or opposite: do not let to `doPollAndDispatchMessage()` to enter `pollMessage()` logic until `unsubscribe()` is done. This way the `hasHandlers` state is guarded by lock synchronization. Signed-off-by: Norbert Schneider <[email protected]> **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent ce6e398 commit 1da83bd

File tree

2 files changed

+76
-23
lines changed

2 files changed

+76
-23
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Optional;
2121
import java.util.concurrent.Executor;
22+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

2324
import org.jspecify.annotations.Nullable;
2425

@@ -53,6 +54,7 @@
5354
* @author Rafael Winterhalter
5455
* @author Artem Bilan
5556
* @author Igor Lovich
57+
* @author Norbert Schneider
5658
*
5759
* @since 6.0
5860
*/
@@ -71,6 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
7173

7274
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
7375

76+
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock();
77+
7478
private @Nullable TransactionTemplate transactionTemplate;
7579

7680
private RetryTemplate retryTemplate =
@@ -164,12 +168,18 @@ public boolean subscribe(MessageHandler handler) {
164168

165169
@Override
166170
public boolean unsubscribe(MessageHandler handle) {
167-
boolean unsubscribed = super.unsubscribe(handle);
168-
if (this.dispatcher.getHandlerCount() == 0) {
169-
this.messageTableSubscriber.unsubscribe(this);
170-
this.hasHandlers = false;
171+
this.hasHandlersLock.writeLock().lock();
172+
try {
173+
boolean unsubscribed = super.unsubscribe(handle);
174+
if (this.dispatcher.getHandlerCount() == 0) {
175+
this.messageTableSubscriber.unsubscribe(this);
176+
this.hasHandlers = false;
177+
}
178+
return unsubscribed;
179+
}
180+
finally {
181+
this.hasHandlersLock.writeLock().unlock();
171182
}
172-
return unsubscribed;
173183
}
174184

175185
@Override
@@ -209,26 +219,24 @@ private Optional<?> pollAndDispatchMessage() {
209219
}
210220

211221
private Optional<?> doPollAndDispatchMessage() {
212-
if (this.hasHandlers) {
213-
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
214-
if (transactionTemplateToUse != null) {
215-
return executeWithRetry(() ->
216-
transactionTemplateToUse.execute(status ->
217-
pollMessage()
218-
.filter(message -> {
219-
if (!this.hasHandlers) {
220-
status.setRollbackOnly();
221-
return false;
222-
}
223-
return true;
224-
})
225-
.map(this::dispatch)));
226-
}
227-
else {
228-
return pollMessage()
229-
.map(message -> executeWithRetry(() -> dispatch(message)));
222+
this.hasHandlersLock.readLock().lock();
223+
try {
224+
if (this.hasHandlers) {
225+
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
226+
if (transactionTemplateToUse != null) {
227+
return executeWithRetry(() ->
228+
transactionTemplateToUse.execute(status ->
229+
pollMessage().map(this::dispatch)));
230+
}
231+
else {
232+
return pollMessage()
233+
.map(message -> executeWithRetry(() -> dispatch(message)));
234+
}
230235
}
231236
}
237+
finally {
238+
this.hasHandlersLock.readLock().unlock();
239+
}
232240
return Optional.empty();
233241
}
234242

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.function.Consumer;
30+
import java.util.stream.IntStream;
3031

3132
import javax.sql.DataSource;
3233

@@ -72,6 +73,7 @@
7273
* @author Igor Lovich
7374
* @author Adama Sorho
7475
* @author Johannes Edmeier
76+
* @author Norbert Schneider
7577
*
7678
* @since 6.0
7779
*/
@@ -310,6 +312,49 @@ public void testRenewConnection() throws Exception {
310312
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
311313
}
312314

315+
@Test
316+
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
317+
int numberOfMessages = 200;
318+
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
319+
AtomicInteger receivedMessages = new AtomicInteger();
320+
AtomicReference<Throwable> receivedException = new AtomicReference<>();
321+
322+
postgresChannelMessageTableSubscriber.start();
323+
postgresSubscribableChannel.setErrorHandler(receivedException::set);
324+
325+
MessageHandler messageHandlerOne = message -> {
326+
receivedMessages.getAndIncrement();
327+
messagesUntilUnsubscribe.countDown();
328+
};
329+
postgresSubscribableChannel.subscribe(messageHandlerOne);
330+
331+
MessageHandler messageHandlerTwo = message -> {
332+
receivedMessages.getAndIncrement();
333+
messagesUntilUnsubscribe.countDown();
334+
};
335+
postgresSubscribableChannel.subscribe(messageHandlerTwo);
336+
337+
IntStream.range(0, numberOfMessages).forEach(i -> {
338+
taskExecutor.execute(() ->
339+
messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
340+
});
341+
342+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
343+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
344+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
345+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
346+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
347+
348+
assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue();
349+
postgresSubscribableChannel.unsubscribe(messageHandlerOne);
350+
postgresSubscribableChannel.unsubscribe(messageHandlerTwo);
351+
352+
int undeliveredMessages = messageStore.messageGroupSize(groupId);
353+
int processedMessages = undeliveredMessages + receivedMessages.get();
354+
assertThat(processedMessages).isEqualTo(numberOfMessages);
355+
assertThat(receivedException).hasNullValue();
356+
}
357+
313358
@Configuration
314359
@EnableIntegration
315360
public static class Config {

0 commit comments

Comments
 (0)