Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -53,6 +54,7 @@
* @author Rafael Winterhalter
* @author Artem Bilan
* @author Igor Lovich
* @author Norbert Schneider
*
* @since 6.0
*/
Expand All @@ -71,6 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel

private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();

private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock();

private @Nullable TransactionTemplate transactionTemplate;

private RetryTemplate retryTemplate =
Expand Down Expand Up @@ -164,12 +168,18 @@ public boolean subscribe(MessageHandler handler) {

@Override
public boolean unsubscribe(MessageHandler handle) {
boolean unsubscribed = super.unsubscribe(handle);
if (this.dispatcher.getHandlerCount() == 0) {
this.messageTableSubscriber.unsubscribe(this);
this.hasHandlers = false;
this.hasHandlersLock.writeLock().lock();
try {
boolean unsubscribed = super.unsubscribe(handle);
if (this.dispatcher.getHandlerCount() == 0) {
this.messageTableSubscriber.unsubscribe(this);
this.hasHandlers = false;
}
return unsubscribed;
}
finally {
this.hasHandlersLock.writeLock().unlock();
}
return unsubscribed;
}

@Override
Expand Down Expand Up @@ -209,26 +219,24 @@ private Optional<?> pollAndDispatchMessage() {
}

private Optional<?> doPollAndDispatchMessage() {
if (this.hasHandlers) {
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
if (transactionTemplateToUse != null) {
return executeWithRetry(() ->
transactionTemplateToUse.execute(status ->
pollMessage()
.filter(message -> {
if (!this.hasHandlers) {
status.setRollbackOnly();
return false;
}
return true;
})
.map(this::dispatch)));
}
else {
return pollMessage()
.map(message -> executeWithRetry(() -> dispatch(message)));
this.hasHandlersLock.readLock().lock();
try {
if (this.hasHandlers) {
TransactionTemplate transactionTemplateToUse = this.transactionTemplate;
if (transactionTemplateToUse != null) {
return executeWithRetry(() ->
transactionTemplateToUse.execute(status ->
pollMessage().map(this::dispatch)));
}
else {
return pollMessage()
.map(message -> executeWithRetry(() -> dispatch(message)));
}
}
}
finally {
this.hasHandlersLock.readLock().unlock();
}
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import javax.sql.DataSource;

Expand Down Expand Up @@ -72,6 +73,7 @@
* @author Igor Lovich
* @author Adama Sorho
* @author Johannes Edmeier
* @author Norbert Schneider
*
* @since 6.0
*/
Expand Down Expand Up @@ -310,6 +312,49 @@ public void testRenewConnection() throws Exception {
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
}

@Test
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
int numberOfMessages = 200;
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
AtomicInteger receivedMessages = new AtomicInteger();
AtomicReference<Throwable> receivedException = new AtomicReference<>();

postgresChannelMessageTableSubscriber.start();
postgresSubscribableChannel.setErrorHandler(receivedException::set);

MessageHandler messageHandlerOne = message -> {
receivedMessages.getAndIncrement();
messagesUntilUnsubscribe.countDown();
};
postgresSubscribableChannel.subscribe(messageHandlerOne);

MessageHandler messageHandlerTwo = message -> {
receivedMessages.getAndIncrement();
messagesUntilUnsubscribe.countDown();
};
postgresSubscribableChannel.subscribe(messageHandlerTwo);

IntStream.range(0, numberOfMessages).forEach(i -> {
taskExecutor.execute(() ->
messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
});

taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);

assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue();
postgresSubscribableChannel.unsubscribe(messageHandlerOne);
postgresSubscribableChannel.unsubscribe(messageHandlerTwo);

int undeliveredMessages = messageStore.messageGroupSize(groupId);
int processedMessages = undeliveredMessages + receivedMessages.get();
assertThat(processedMessages).isEqualTo(numberOfMessages);
assertThat(receivedException).hasNullValue();
}

@Configuration
@EnableIntegration
public static class Config {
Expand Down