Skip to content

Commit fa6494f

Browse files
joshistespring-builds
authored andcommitted
GH-9061: renew connection in PostgresChannelMessageTableSubscriber
Fixes: #9061 `PostgresChannelMessageTableSubscriber` never renews the connection. This causes problems on DB failover. With this change the connection is renewed when notifications are not received for a certain time. (cherry picked from commit 642278d)
1 parent 226bca3 commit fa6494f

File tree

2 files changed

+77
-17
lines changed

2 files changed

+77
-17
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.sql.SQLException;
2020
import java.sql.Statement;
21+
import java.time.Duration;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import java.util.concurrent.CompletableFuture;
@@ -64,6 +65,7 @@
6465
* @author Artem Bilan
6566
* @author Igor Lovich
6667
* @author Christian Tzolov
68+
* @author Johannes Edmeier
6769
*
6870
* @since 6.0
6971
*/
@@ -88,6 +90,8 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
8890
@Nullable
8991
private volatile PgConnection connection;
9092

93+
private Duration notificationTimeout = Duration.ofSeconds(60);
94+
9195
/**
9296
* Create a new subscriber using the {@link JdbcChannelMessageStore#DEFAULT_TABLE_PREFIX}.
9397
* @param connectionSupplier The connection supplier for the targeted Postgres database.
@@ -136,6 +140,19 @@ public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
136140
this.taskExecutor = taskExecutor;
137141
}
138142

143+
/**
144+
* Set the timeout for the notification polling.
145+
* If for the specified duration no notificiation are received the underlying connection is closed and re-established.
146+
* Setting a value of {@code Duration.ZERO} will disable the timeout and wait forever.
147+
* This might cause problems in DB failover scenarios.
148+
* @param notificationTimeout the timeout for the notification polling.
149+
* @since 6.1.8
150+
*/
151+
public void setNotificationTimeout(Duration notificationTimeout) {
152+
Assert.notNull(notificationTimeout, "'notificationTimeout' must not be null.");
153+
this.notificationTimeout = notificationTimeout;
154+
}
155+
139156
/**
140157
* Add a new subscription to this subscriber.
141158
* @param subscription The subscription to register.
@@ -213,24 +230,28 @@ private void doStart(CountDownLatch startingLatch) {
213230
while (isActive()) {
214231
startingLatch.countDown();
215232

216-
PGNotification[] notifications = conn.getNotifications(0);
233+
PGNotification[] notifications = conn.getNotifications((int) this.notificationTimeout.toMillis());
217234
// Unfortunately, there is no good way of interrupting a notification
218235
// poll but by closing its connection.
219236
if (!isActive()) {
220237
return;
221238
}
222-
if (notifications != null) {
223-
for (PGNotification notification : notifications) {
224-
String parameter = notification.getParameter();
225-
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
226-
if (subscriptions == null) {
227-
continue;
228-
}
229-
for (Subscription subscription : subscriptions) {
230-
subscription.notifyUpdate();
231-
}
239+
if (notifications == null || notifications.length == 0) {
240+
//We did not receive any notifications within the timeout period.
241+
//We will close the connection and re-establish it.
242+
break;
243+
}
244+
for (PGNotification notification : notifications) {
245+
String parameter = notification.getParameter();
246+
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
247+
if (subscriptions == null) {
248+
continue;
249+
}
250+
for (Subscription subscription : subscriptions) {
251+
subscription.notifyUpdate();
232252
}
233253
}
254+
234255
}
235256
}
236257
finally {

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.integration.jdbc.channel;
1818

1919
import java.sql.DriverManager;
20+
import java.sql.SQLException;
21+
import java.time.Duration;
2022
import java.util.ArrayList;
2123
import java.util.List;
2224
import java.util.concurrent.CountDownLatch;
@@ -62,6 +64,7 @@
6264
* @author Artem Bilan
6365
* @author Igor Lovich
6466
* @author Adama Sorho
67+
* @author Johannes Edmeier
6568
*
6669
* @since 6.0
6770
*/
@@ -102,15 +105,14 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
102105

103106
private String groupId;
104107

108+
private ConnectionSupplier connectionSupplier;
109+
105110
@BeforeEach
106111
void setUp(TestInfo testInfo) {
107112
// Not initiated as a bean to allow for registrations prior and post the life cycle
108-
this.postgresChannelMessageTableSubscriber =
109-
new PostgresChannelMessageTableSubscriber(() ->
110-
DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(),
111-
POSTGRES_CONTAINER.getUsername(),
112-
POSTGRES_CONTAINER.getPassword())
113-
.unwrap(PgConnection.class));
113+
this.connectionSupplier = new ConnectionSupplier();
114+
this.postgresChannelMessageTableSubscriber = new PostgresChannelMessageTableSubscriber(connectionSupplier);
115+
this.postgresChannelMessageTableSubscriber.setNotificationTimeout(Duration.ofSeconds(5));
114116

115117

116118
this.taskExecutor = new ThreadPoolTaskExecutor();
@@ -263,6 +265,26 @@ void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws Interrup
263265
assertThat(payloads).containsExactly("1");
264266
}
265267

268+
@Test
269+
public void testRenewConnection() throws Exception {
270+
CountDownLatch latch = new CountDownLatch(2);
271+
List<Object> payloads = new ArrayList<>();
272+
CountDownLatch connectionLatch = new CountDownLatch(2);
273+
connectionSupplier.onGetConnection = connectionLatch::countDown;
274+
postgresChannelMessageTableSubscriber.start();
275+
postgresSubscribableChannel.subscribe(message -> {
276+
payloads.add(message.getPayload());
277+
latch.countDown();
278+
});
279+
280+
assertThat(connectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
281+
282+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
283+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("2"));
284+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
285+
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
286+
}
287+
266288
@Configuration
267289
@EnableIntegration
268290
public static class Config {
@@ -302,4 +324,21 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {
302324

303325
}
304326

327+
private static class ConnectionSupplier implements PgConnectionSupplier {
328+
329+
Runnable onGetConnection;
330+
331+
@Override
332+
public PgConnection get() throws SQLException {
333+
var conn = DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(),
334+
POSTGRES_CONTAINER.getUsername(),
335+
POSTGRES_CONTAINER.getPassword())
336+
.unwrap(PgConnection.class);
337+
if (this.onGetConnection != null) {
338+
this.onGetConnection.run();
339+
}
340+
return conn;
341+
}
342+
343+
}
305344
}

0 commit comments

Comments
 (0)