Skip to content

Commit 742e125

Browse files
committed
Some cleanup for PostgresSubscribableChannel
The `PostgresChannelMessageTableSubscriberTests.testUnsubscribeHandlerDuringDispatch()` is not stable * Fix the test to use less stress on unsubscription. It is still hard to reach consensus with a fast reader which is able to take a lock again and again in its loop. * Use `fair` for `ReentrantReadWriteLock` in the `PostgresSubscribableChannel`. However, it still doesn't show entropy in the test execution * Fix typos in the `PostgresChannelMessageTableSubscriber` * Add logging for the `inStoreCount` & `receivedCount` in the `testUnsubscribeHandlerDuringDispatch()` **Auto-cherry-pick to `6.4.x`** # Conflicts: # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java
1 parent 145e2a5 commit 742e125

File tree

3 files changed

+28
-34
lines changed

3 files changed

+28
-34
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,7 @@ public void start() {
168168
this.latch = new CountDownLatch(1);
169169

170170
CountDownLatch startingLatch = new CountDownLatch(1);
171-
this.future = this.taskExecutor.submit(() -> {
172-
doStart(startingLatch);
173-
});
171+
this.future = this.taskExecutor.submit(() -> doStart(startingLatch));
174172

175173
try {
176174
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
@@ -242,7 +240,7 @@ private void doStart(CountDownLatch startingLatch) {
242240
}
243241
catch (Exception e) {
244242
// The getNotifications method does not throw a meaningful message on interruption.
245-
// Therefore, we do not log an error, unless it occurred while active.
243+
// Therefore, we do not log an error unless it occurred while active.
246244
if (isActive()) {
247245
LOGGER.error(e, "Failed to poll notifications from Postgres database");
248246
}
@@ -252,7 +250,6 @@ private void doStart(CountDownLatch startingLatch) {
252250
finally {
253251
this.latch.countDown();
254252
}
255-
256253
}
257254

258255
private boolean isActive() {
@@ -285,6 +282,7 @@ public void stop() {
285282
}
286283
}
287284
catch (InterruptedException ignored) {
285+
Thread.currentThread().interrupt();
288286
}
289287
}
290288
finally {
@@ -311,8 +309,8 @@ public interface Subscription {
311309
/**
312310
* Indicate that a message was added to the represented region and
313311
* group id. Note that this method might also be invoked if there are
314-
* no new messages to read, for example if another subscription already
315-
* read those messages or if a new messages might have arrived during
312+
* no new messages to read, for example, if another subscription already
313+
* read those messages or if a new message might have arrived during
316314
* a temporary connection loss.
317315
*/
318316
void notifyUpdate();

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
6767

6868
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
6969

70-
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock();
70+
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock(true);
7171

7272
private TransactionTemplate transactionTemplate;
7373

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import javax.sql.DataSource;
3333

3434
import org.apache.commons.dbcp2.BasicDataSource;
35+
import org.apache.commons.logging.Log;
36+
import org.apache.commons.logging.LogFactory;
3537
import org.junit.jupiter.api.AfterEach;
3638
import org.junit.jupiter.api.BeforeEach;
3739
import org.junit.jupiter.api.Test;
@@ -40,6 +42,7 @@
4042
import org.junit.jupiter.params.provider.ValueSource;
4143
import org.postgresql.jdbc.PgConnection;
4244

45+
import org.springframework.beans.factory.BeanFactory;
4346
import org.springframework.beans.factory.annotation.Autowired;
4447
import org.springframework.context.annotation.Bean;
4548
import org.springframework.context.annotation.Configuration;
@@ -79,6 +82,8 @@
7982
@DirtiesContext
8083
public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest {
8184

85+
private static final Log LOGGER = LogFactory.getLog(PostgresChannelMessageTableSubscriberTests.class);
86+
8287
private static final String INTEGRATION_DB_SCRIPTS = """
8388
CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
8489
RETURNS TRIGGER AS
@@ -97,6 +102,9 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
97102
^^^ END OF SCRIPT ^^^
98103
""";
99104

105+
@Autowired
106+
private BeanFactory beanFactory;
107+
100108
@Autowired
101109
private JdbcChannelMessageStore messageStore;
102110

@@ -122,16 +130,16 @@ void setUp(TestInfo testInfo) {
122130

123131
this.taskExecutor = new ThreadPoolTaskExecutor();
124132
this.taskExecutor.setCorePoolSize(10);
125-
this.taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
126-
this.taskExecutor.setAwaitTerminationSeconds(10);
127133
this.taskExecutor.afterPropertiesSet();
128134

129135
this.groupId = testInfo.getDisplayName();
136+
messageStore.removeMessageGroup(this.groupId);
130137

131138
this.postgresSubscribableChannel =
132139
new PostgresSubscribableChannel(messageStore, groupId, postgresChannelMessageTableSubscriber);
133140
this.postgresSubscribableChannel.setBeanName("testPostgresChannel");
134141
this.postgresSubscribableChannel.setDispatcherExecutor(this.taskExecutor);
142+
this.postgresSubscribableChannel.setBeanFactory(this.beanFactory);
135143
this.postgresSubscribableChannel.afterPropertiesSet();
136144
}
137145

@@ -310,45 +318,33 @@ public void testRenewConnection() throws Exception {
310318

311319
@Test
312320
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
313-
int numberOfMessages = 200;
321+
int numberOfMessages = 2000;
314322
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
315323
AtomicInteger receivedMessages = new AtomicInteger();
316324
AtomicReference<Throwable> receivedException = new AtomicReference<>();
317325

318326
postgresChannelMessageTableSubscriber.start();
319327
postgresSubscribableChannel.setErrorHandler(receivedException::set);
320328

321-
MessageHandler messageHandlerOne = message -> {
322-
receivedMessages.getAndIncrement();
323-
messagesUntilUnsubscribe.countDown();
324-
};
325-
postgresSubscribableChannel.subscribe(messageHandlerOne);
326-
327-
MessageHandler messageHandlerTwo = message -> {
329+
MessageHandler messageHandler = message -> {
328330
receivedMessages.getAndIncrement();
329331
messagesUntilUnsubscribe.countDown();
330332
};
331-
postgresSubscribableChannel.subscribe(messageHandlerTwo);
332-
333-
IntStream.range(0, numberOfMessages).forEach(i -> {
334-
taskExecutor.execute(() ->
335-
messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
336-
});
333+
postgresSubscribableChannel.subscribe(messageHandler);
337334

338-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
339-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
340-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
341-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
342-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
335+
IntStream.range(0, numberOfMessages)
336+
.forEach(i -> messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
343337

344338
assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue();
345-
postgresSubscribableChannel.unsubscribe(messageHandlerOne);
346-
postgresSubscribableChannel.unsubscribe(messageHandlerTwo);
339+
postgresSubscribableChannel.unsubscribe(messageHandler);
347340

348-
int undeliveredMessages = messageStore.messageGroupSize(groupId);
349-
int processedMessages = undeliveredMessages + receivedMessages.get();
350-
assertThat(processedMessages).isEqualTo(numberOfMessages);
351341
assertThat(receivedException).hasNullValue();
342+
343+
int inStoreCount = messageStore.messageGroupSize(groupId);
344+
LOGGER.warn("inStoreCount: " + inStoreCount);
345+
int receivedCount = receivedMessages.get();
346+
LOGGER.warn("receivedCount: " + receivedCount);
347+
assertThat(inStoreCount + receivedCount).isEqualTo(numberOfMessages);
352348
}
353349

354350
@Configuration

0 commit comments

Comments
 (0)