Skip to content

Commit 33dad8d

Browse files
committed
GH-10362: Block on PostgresSubscribableChannel.unsubscribe
Unsubscribing message handlers while processing an in-flight message may lead to a MessageDeliveryException. Fixes: GH-10362 Signed-off-by: Norbert Schneider <[email protected]>
1 parent b683f9b commit 33dad8d

File tree

2 files changed

+67
-14
lines changed

2 files changed

+67
-14
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Optional;
2121
import java.util.concurrent.Executor;
22+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

2324
import org.jspecify.annotations.Nullable;
2425

@@ -53,6 +54,7 @@
5354
* @author Rafael Winterhalter
5455
* @author Artem Bilan
5556
* @author Igor Lovich
57+
* @author Norbert Schneider
5658
*
5759
* @since 6.0
5860
*/
@@ -71,6 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
7173

7274
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
7375

76+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
77+
7478
private @Nullable TransactionTemplate transactionTemplate;
7579

7680
private RetryTemplate retryTemplate =
@@ -153,23 +157,35 @@ protected void onInit() {
153157

154158
@Override
155159
public boolean subscribe(MessageHandler handler) {
156-
boolean subscribed = super.subscribe(handler);
157-
if (this.dispatcher.getHandlerCount() == 1) {
158-
this.messageTableSubscriber.subscribe(this);
159-
this.hasHandlers = true;
160-
notifyUpdate();
160+
this.lock.writeLock().lock();
161+
try {
162+
boolean subscribed = super.subscribe(handler);
163+
if (this.dispatcher.getHandlerCount() == 1) {
164+
this.messageTableSubscriber.subscribe(this);
165+
this.hasHandlers = true;
166+
notifyUpdate();
167+
}
168+
return subscribed;
169+
}
170+
finally {
171+
this.lock.writeLock().unlock();
161172
}
162-
return subscribed;
163173
}
164174

165175
@Override
166176
public boolean unsubscribe(MessageHandler handle) {
167-
boolean unsubscribed = super.unsubscribe(handle);
168-
if (this.dispatcher.getHandlerCount() == 0) {
169-
this.messageTableSubscriber.unsubscribe(this);
170-
this.hasHandlers = false;
177+
this.lock.writeLock().lock();
178+
try {
179+
boolean unsubscribed = super.unsubscribe(handle);
180+
if (this.dispatcher.getHandlerCount() == 0) {
181+
this.messageTableSubscriber.unsubscribe(this);
182+
this.hasHandlers = false;
183+
}
184+
return unsubscribed;
185+
}
186+
finally {
187+
this.lock.writeLock().unlock();
171188
}
172-
return unsubscribed;
173189
}
174190

175191
@Override
@@ -188,8 +204,14 @@ public void notifyUpdate() {
188204
this.executor.execute(() -> {
189205
Optional<?> dispatchedMessage;
190206
do {
191-
dispatchedMessage = pollAndDispatchMessage();
192-
} while (dispatchedMessage.isPresent());
207+
this.lock.readLock().lock();
208+
try {
209+
dispatchedMessage = pollAndDispatchMessage();
210+
}
211+
finally {
212+
this.lock.readLock().unlock();
213+
}
214+
} while (this.hasHandlers && dispatchedMessage.isPresent());
193215
});
194216
}
195217

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.function.Consumer;
30+
import java.util.stream.IntStream;
3031

3132
import javax.sql.DataSource;
3233

@@ -72,7 +73,7 @@
7273
* @author Igor Lovich
7374
* @author Adama Sorho
7475
* @author Johannes Edmeier
75-
*
76+
* @author Norbert Schneider
7677
* @since 6.0
7778
*/
7879
@SpringJUnitConfig
@@ -310,6 +311,36 @@ public void testRenewConnection() throws Exception {
310311
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
311312
}
312313

314+
@Test
315+
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
316+
int numberOfMessages = 300;
317+
CountDownLatch receivedMessageCount = new CountDownLatch(numberOfMessages);
318+
AtomicReference<Throwable> receivedException = new AtomicReference<>();
319+
320+
postgresChannelMessageTableSubscriber.start();
321+
postgresSubscribableChannel.setErrorHandler(receivedException::set);
322+
MessageHandler messageHandler = message -> {
323+
receivedMessageCount.countDown();
324+
};
325+
postgresSubscribableChannel.subscribe(messageHandler);
326+
327+
IntStream.range(0, numberOfMessages).forEach(i -> {
328+
taskExecutor.execute(() -> {
329+
messageStore.addMessageToGroup(groupId, new GenericMessage<>(String.valueOf(i)));
330+
});
331+
});
332+
333+
Thread.sleep(50);
334+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
335+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
336+
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
337+
Thread.sleep(50);
338+
postgresSubscribableChannel.unsubscribe(messageHandler);
339+
340+
assertThat(receivedMessageCount.await(2, TimeUnit.SECONDS)).isFalse();
341+
assertThat(receivedException).hasNullValue();
342+
}
343+
313344
@Configuration
314345
@EnableIntegration
315346
public static class Config {

0 commit comments

Comments
 (0)