Skip to content

Commit 5a07f7d

Browse files
committed
Some cleanup for PostgresSubscribableChannel
The `PostgresChannelMessageTableSubscriberTests.testUnsubscribeHandlerDuringDispatch()` is not stable * Fix the test to use less stress on unsubscription. It is still hard to reach consensus with a fast reader which is able to take a lock again and again in its loop. * Use `fair` for `ReentrantReadWriteLock` in the `PostgresSubscribableChannel`. However, it still doesn't show entropy in the test execution * Fix typos in the `PostgresChannelMessageTableSubscriber` * Add logging for the `inStoreCount` & `receivedCount` in the `testUnsubscribeHandlerDuringDispatch()` **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent 3e691c6 commit 5a07f7d

File tree

3 files changed

+29
-37
lines changed

3 files changed

+29
-37
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,7 @@ public void start() {
168168
this.latch = new CountDownLatch(1);
169169

170170
CountDownLatch startingLatch = new CountDownLatch(1);
171-
this.future = this.taskExecutor.submit(() -> {
172-
doStart(startingLatch);
173-
});
171+
this.future = this.taskExecutor.submit(() -> doStart(startingLatch));
174172

175173
try {
176174
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
@@ -242,7 +240,7 @@ private void doStart(CountDownLatch startingLatch) {
242240
}
243241
catch (Exception e) {
244242
// The getNotifications method does not throw a meaningful message on interruption.
245-
// Therefore, we do not log an error, unless it occurred while active.
243+
// Therefore, we do not log an error unless it occurred while active.
246244
if (isActive()) {
247245
LOGGER.error(e, "Failed to poll notifications from Postgres database");
248246
}
@@ -252,7 +250,6 @@ private void doStart(CountDownLatch startingLatch) {
252250
finally {
253251
this.latch.countDown();
254252
}
255-
256253
}
257254

258255
private boolean isActive() {
@@ -285,6 +282,7 @@ public void stop() {
285282
}
286283
}
287284
catch (InterruptedException ignored) {
285+
Thread.currentThread().interrupt();
288286
}
289287
}
290288
finally {
@@ -311,8 +309,8 @@ public interface Subscription {
311309
/**
312310
* Indicate that a message was added to the represented region and
313311
* group id. Note that this method might also be invoked if there are
314-
* no new messages to read, for example if another subscription already
315-
* read those messages or if a new messages might have arrived during
312+
* no new messages to read, for example, if another subscription already
313+
* read those messages or if a new message might have arrived during
316314
* a temporary connection loss.
317315
*/
318316
void notifyUpdate();

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
7373

7474
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
7575

76-
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock();
76+
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock(true);
7777

7878
private @Nullable TransactionTemplate transactionTemplate;
7979

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import javax.sql.DataSource;
3333

3434
import org.apache.commons.dbcp2.BasicDataSource;
35+
import org.apache.commons.logging.Log;
36+
import org.apache.commons.logging.LogFactory;
3537
import org.junit.jupiter.api.AfterEach;
3638
import org.junit.jupiter.api.BeforeEach;
3739
import org.junit.jupiter.api.Test;
@@ -40,6 +42,7 @@
4042
import org.junit.jupiter.params.provider.ValueSource;
4143
import org.postgresql.jdbc.PgConnection;
4244

45+
import org.springframework.beans.factory.BeanFactory;
4346
import org.springframework.beans.factory.annotation.Autowired;
4447
import org.springframework.context.annotation.Bean;
4548
import org.springframework.context.annotation.Configuration;
@@ -52,7 +55,6 @@
5255
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
5356
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
5457
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
55-
import org.springframework.integration.test.support.TestApplicationContextAware;
5658
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
5759
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
5860
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
@@ -79,7 +81,9 @@
7981
*/
8082
@SpringJUnitConfig
8183
@DirtiesContext
82-
public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest, TestApplicationContextAware {
84+
public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest {
85+
86+
private static final Log LOGGER = LogFactory.getLog(PostgresChannelMessageTableSubscriberTests.class);
8387

8488
private static final String INTEGRATION_DB_SCRIPTS = """
8589
CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
@@ -99,6 +103,9 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
99103
^^^ END OF SCRIPT ^^^
100104
""";
101105

106+
@Autowired
107+
private BeanFactory beanFactory;
108+
102109
@Autowired
103110
private JdbcChannelMessageStore messageStore;
104111

@@ -124,17 +131,16 @@ void setUp(TestInfo testInfo) {
124131

125132
this.taskExecutor = new ThreadPoolTaskExecutor();
126133
this.taskExecutor.setCorePoolSize(10);
127-
this.taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
128-
this.taskExecutor.setAwaitTerminationSeconds(10);
129134
this.taskExecutor.afterPropertiesSet();
130135

131136
this.groupId = testInfo.getDisplayName();
137+
messageStore.removeMessageGroup(this.groupId);
132138

133139
this.postgresSubscribableChannel =
134140
new PostgresSubscribableChannel(messageStore, groupId, postgresChannelMessageTableSubscriber);
135141
this.postgresSubscribableChannel.setBeanName("testPostgresChannel");
136142
this.postgresSubscribableChannel.setDispatcherExecutor(this.taskExecutor);
137-
this.postgresSubscribableChannel.setBeanFactory(TEST_INTEGRATION_CONTEXT);
143+
this.postgresSubscribableChannel.setBeanFactory(this.beanFactory);
138144
this.postgresSubscribableChannel.afterPropertiesSet();
139145
}
140146

@@ -314,45 +320,33 @@ public void testRenewConnection() throws Exception {
314320

315321
@Test
316322
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
317-
int numberOfMessages = 200;
323+
int numberOfMessages = 2000;
318324
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
319325
AtomicInteger receivedMessages = new AtomicInteger();
320326
AtomicReference<Throwable> receivedException = new AtomicReference<>();
321327

322328
postgresChannelMessageTableSubscriber.start();
323329
postgresSubscribableChannel.setErrorHandler(receivedException::set);
324330

325-
MessageHandler messageHandlerOne = message -> {
326-
receivedMessages.getAndIncrement();
327-
messagesUntilUnsubscribe.countDown();
328-
};
329-
postgresSubscribableChannel.subscribe(messageHandlerOne);
330-
331-
MessageHandler messageHandlerTwo = message -> {
331+
MessageHandler messageHandler = message -> {
332332
receivedMessages.getAndIncrement();
333333
messagesUntilUnsubscribe.countDown();
334334
};
335-
postgresSubscribableChannel.subscribe(messageHandlerTwo);
336-
337-
IntStream.range(0, numberOfMessages).forEach(i -> {
338-
taskExecutor.execute(() ->
339-
messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
340-
});
335+
postgresSubscribableChannel.subscribe(messageHandler);
341336

342-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
343-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
344-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
345-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
346-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
337+
IntStream.range(0, numberOfMessages)
338+
.forEach(i -> messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
347339

348340
assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue();
349-
postgresSubscribableChannel.unsubscribe(messageHandlerOne);
350-
postgresSubscribableChannel.unsubscribe(messageHandlerTwo);
341+
postgresSubscribableChannel.unsubscribe(messageHandler);
351342

352-
int undeliveredMessages = messageStore.messageGroupSize(groupId);
353-
int processedMessages = undeliveredMessages + receivedMessages.get();
354-
assertThat(processedMessages).isEqualTo(numberOfMessages);
355343
assertThat(receivedException).hasNullValue();
344+
345+
int inStoreCount = messageStore.messageGroupSize(groupId);
346+
LOGGER.warn("inStoreCount: " + inStoreCount);
347+
int receivedCount = receivedMessages.get();
348+
LOGGER.warn("receivedCount: " + receivedCount);
349+
assertThat(inStoreCount + receivedCount).isEqualTo(numberOfMessages);
356350
}
357351

358352
@Configuration

0 commit comments

Comments
 (0)