Skip to content

Commit 819af58

Browse files
artembilanspring-builds
authored andcommitted
Some cleanup for PostgresSubscribableChannel
The `PostgresChannelMessageTableSubscriberTests.testUnsubscribeHandlerDuringDispatch()` is not stable * Fix the test to use less stress on unsubscription. It is still hard to reach consensus with a fast reader which is able to take a lock again and again in its loop. * Use `fair` for `ReentrantReadWriteLock` in the `PostgresSubscribableChannel`. However, it still doesn't show entropy in the test execution * Fix typos in the `PostgresChannelMessageTableSubscriber` * Add logging for the `inStoreCount` & `receivedCount` in the `testUnsubscribeHandlerDuringDispatch()` # Conflicts: # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java (cherry picked from commit 742e125)
1 parent 5a7cccf commit 819af58

File tree

3 files changed

+28
-34
lines changed

3 files changed

+28
-34
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,7 @@ public void start() {
168168
this.latch = new CountDownLatch(1);
169169

170170
CountDownLatch startingLatch = new CountDownLatch(1);
171-
this.future = this.taskExecutor.submit(() -> {
172-
doStart(startingLatch);
173-
});
171+
this.future = this.taskExecutor.submit(() -> doStart(startingLatch));
174172

175173
try {
176174
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
@@ -242,7 +240,7 @@ private void doStart(CountDownLatch startingLatch) {
242240
}
243241
catch (Exception e) {
244242
// The getNotifications method does not throw a meaningful message on interruption.
245-
// Therefore, we do not log an error, unless it occurred while active.
243+
// Therefore, we do not log an error unless it occurred while active.
246244
if (isActive()) {
247245
LOGGER.error(e, "Failed to poll notifications from Postgres database");
248246
}
@@ -252,7 +250,6 @@ private void doStart(CountDownLatch startingLatch) {
252250
finally {
253251
this.latch.countDown();
254252
}
255-
256253
}
257254

258255
private boolean isActive() {
@@ -285,6 +282,7 @@ public void stop() {
285282
}
286283
}
287284
catch (InterruptedException ignored) {
285+
Thread.currentThread().interrupt();
288286
}
289287
}
290288
finally {
@@ -311,8 +309,8 @@ public interface Subscription {
311309
/**
312310
* Indicate that a message was added to the represented region and
313311
* group id. Note that this method might also be invoked if there are
314-
* no new messages to read, for example if another subscription already
315-
* read those messages or if a new messages might have arrived during
312+
* no new messages to read, for example, if another subscription already
313+
* read those messages or if a new message might have arrived during
316314
* a temporary connection loss.
317315
*/
318316
void notifyUpdate();

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
6767

6868
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
6969

70-
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock();
70+
private final ReentrantReadWriteLock hasHandlersLock = new ReentrantReadWriteLock(true);
7171

7272
private TransactionTemplate transactionTemplate;
7373

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import javax.sql.DataSource;
3333

3434
import org.apache.commons.dbcp2.BasicDataSource;
35+
import org.apache.commons.logging.Log;
36+
import org.apache.commons.logging.LogFactory;
3537
import org.junit.jupiter.api.AfterEach;
3638
import org.junit.jupiter.api.BeforeEach;
3739
import org.junit.jupiter.api.Test;
@@ -40,6 +42,7 @@
4042
import org.junit.jupiter.params.provider.ValueSource;
4143
import org.postgresql.jdbc.PgConnection;
4244

45+
import org.springframework.beans.factory.BeanFactory;
4346
import org.springframework.beans.factory.annotation.Autowired;
4447
import org.springframework.context.annotation.Bean;
4548
import org.springframework.context.annotation.Configuration;
@@ -79,6 +82,8 @@
7982
@DirtiesContext
8083
public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest {
8184

85+
private static final Log LOGGER = LogFactory.getLog(PostgresChannelMessageTableSubscriberTests.class);
86+
8287
private static final String INTEGRATION_DB_SCRIPTS = """
8388
CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
8489
RETURNS TRIGGER AS
@@ -98,6 +103,9 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
98103
^^^ END OF SCRIPT ^^^
99104
""";
100105

106+
@Autowired
107+
private BeanFactory beanFactory;
108+
101109
@Autowired
102110
private JdbcChannelMessageStore messageStore;
103111

@@ -123,16 +131,16 @@ void setUp(TestInfo testInfo) {
123131

124132
this.taskExecutor = new ThreadPoolTaskExecutor();
125133
this.taskExecutor.setCorePoolSize(10);
126-
this.taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
127-
this.taskExecutor.setAwaitTerminationSeconds(10);
128134
this.taskExecutor.afterPropertiesSet();
129135

130136
this.groupId = testInfo.getDisplayName();
137+
messageStore.removeMessageGroup(this.groupId);
131138

132139
this.postgresSubscribableChannel =
133140
new PostgresSubscribableChannel(messageStore, groupId, postgresChannelMessageTableSubscriber);
134141
this.postgresSubscribableChannel.setBeanName("testPostgresChannel");
135142
this.postgresSubscribableChannel.setDispatcherExecutor(this.taskExecutor);
143+
this.postgresSubscribableChannel.setBeanFactory(this.beanFactory);
136144
this.postgresSubscribableChannel.afterPropertiesSet();
137145
}
138146

@@ -303,45 +311,33 @@ public void testRenewConnection() throws Exception {
303311

304312
@Test
305313
public void testUnsubscribeHandlerDuringDispatch() throws InterruptedException {
306-
int numberOfMessages = 200;
314+
int numberOfMessages = 2000;
307315
CountDownLatch messagesUntilUnsubscribe = new CountDownLatch(10);
308316
AtomicInteger receivedMessages = new AtomicInteger();
309317
AtomicReference<Throwable> receivedException = new AtomicReference<>();
310318

311319
postgresChannelMessageTableSubscriber.start();
312320
postgresSubscribableChannel.setErrorHandler(receivedException::set);
313321

314-
MessageHandler messageHandlerOne = message -> {
315-
receivedMessages.getAndIncrement();
316-
messagesUntilUnsubscribe.countDown();
317-
};
318-
postgresSubscribableChannel.subscribe(messageHandlerOne);
319-
320-
MessageHandler messageHandlerTwo = message -> {
322+
MessageHandler messageHandler = message -> {
321323
receivedMessages.getAndIncrement();
322324
messagesUntilUnsubscribe.countDown();
323325
};
324-
postgresSubscribableChannel.subscribe(messageHandlerTwo);
325-
326-
IntStream.range(0, numberOfMessages).forEach(i -> {
327-
taskExecutor.execute(() ->
328-
messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
329-
});
326+
postgresSubscribableChannel.subscribe(messageHandler);
330327

331-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
332-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
333-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
334-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
335-
taskExecutor.execute(postgresSubscribableChannel::notifyUpdate);
328+
IntStream.range(0, numberOfMessages)
329+
.forEach(i -> messageStore.addMessageToGroup(groupId, new GenericMessage<>(i)));
336330

337331
assertThat(messagesUntilUnsubscribe.await(10, TimeUnit.SECONDS)).isTrue();
338-
postgresSubscribableChannel.unsubscribe(messageHandlerOne);
339-
postgresSubscribableChannel.unsubscribe(messageHandlerTwo);
332+
postgresSubscribableChannel.unsubscribe(messageHandler);
340333

341-
int undeliveredMessages = messageStore.messageGroupSize(groupId);
342-
int processedMessages = undeliveredMessages + receivedMessages.get();
343-
assertThat(processedMessages).isEqualTo(numberOfMessages);
344334
assertThat(receivedException).hasNullValue();
335+
336+
int inStoreCount = messageStore.messageGroupSize(groupId);
337+
LOGGER.warn("inStoreCount: " + inStoreCount);
338+
int receivedCount = receivedMessages.get();
339+
LOGGER.warn("receivedCount: " + receivedCount);
340+
assertThat(inStoreCount + receivedCount).isEqualTo(numberOfMessages);
345341
}
346342

347343
@Configuration

0 commit comments

Comments
 (0)