Skip to content

Commit 76b7fb9

Browse files
authored
Merge pull request #5 from evgeniy-khist/postgresq-listen-notify
Use PostgreSQL Listen/Notify instead of DB polling. Resolves #3
2 parents 7322a35 + 95070ac commit 76b7fb9

12 files changed

+248
-38
lines changed

README.md

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
- [Synchronously updating projections](#4-6)
2222
- [Asynchronously sending integration events to a message broker](#4-7)
2323
- [Reliable transactional outbox with PostgreSQL](#4-7-1)
24-
- [Database polling alternatives](#4-7-2)
24+
- [Database polling](#4-7-2)
25+
- [Database polling alternative](#4-7-3)
2526
- [Adding new asynchronous event handlers](#4-8)
2627
- [Class diagrams](#4-9)
2728
- [Class diagram of the domain model](#4-9-1)
@@ -272,7 +273,7 @@ INSERT INTO ES_AGGREGATE_SNAPSHOT (AGGREGATE_ID, VERSION, JSON_DATA)
272273
VALUES (:aggregateId, :version, :jsonObj::json)
273274
```
274275
275-
Snapshotting for an aggregate type can be disabled and configured in the `application.yml`
276+
Snapshotting for an aggregate type can be disabled and configured in the [`application.yml`](src/main/resources/application.yml)
276277
277278
```yaml
278279
event-sourcing:
@@ -417,33 +418,63 @@ the events it created won't be read by the event subscription processor until tr
417418

418419
![PostgreSQL reliable transactional outbox](img/postgresql-reliable-outbox.svg)
419420

420-
Event subscription processing can be disabled and configured in the `application.yml`
421+
#### <a id="4-7-2"></a>Database polling
422+
423+
To get new events from the `ES_EVENT` table, the application has to poll the database.
424+
The shorter the polling period, the shorter the delay between persisting a new event and processing it by the subscription.
425+
But the lag is inevitable. If the polling period is 1 second, then the lag is at most 1 second.
426+
427+
The polling mechanism implementation [ScheduledEventSubscriptionProcessor](src/main/java/com/example/eventsourcing/service/ScheduledEventSubscriptionProcessor.java)
428+
uses a Spring annotation [@Scheduled](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/annotation/Scheduled.html) to poll database with a fixed period.
429+
430+
The polling event subscription processing can be enabled and configured in the [`application.yml`](src/main/resources/application.yml)
421431

422432
```yaml
423433
event-sourcing:
424-
subscriptions:
425-
enabled: true
434+
subscriptions: polling # Enable database polling subscription processing
435+
polling-subscriptions:
426436
polling-initial-delay: PT1S
427437
polling-interval: PT1S
428438
```
429439

430-
#### <a id="4-7-2"></a>Database polling alternatives
440+
#### <a id="4-7-3"></a>Database polling alternative
431441

432-
PostgreSQL `LISTEN`/`NOTIFY` functionality can be used instead of polling.
442+
To reduce the lag associated with database polling, the polling period can be set to a very low value,
443+
such as 1 second.
444+
But this means that there will be 3600 database queries per hour and 86400 per day, even if there are no new events.
433445

434-
A key limitation of the PostgreSQL JDBC driver is that it cannot receive asynchronous notifications
446+
PostgreSQL `LISTEN` and `NOTIFY` feature can be used instead of polling.
447+
This mechanism allows for sending asynchronous notifications across database connections.
448+
Notifications are not sent directly from the application,
449+
but via the database [trigger](src/main/resources/db/migration/V2__notify_trigger.sql) on a table.
450+
451+
To use this functionality an unshared [PgConnection](https://jdbc.postgresql.org/documentation/publicapi/org/postgresql/jdbc/PgConnection.html)
452+
which remains open is required.
453+
The long-lived dedicated JDBC `Connection` for receiving notifications has to be created using the `DriverManager` API,
454+
instead of getting from a pooled `DataSource`.
455+
456+
PostgreSQL JDBC driver can't receive asynchronous notifications
435457
and must poll the backend to check if any notifications were issued.
436-
A timeout can be given to the poll function,
458+
A timeout can be given to the poll function `getNotifications(int timeoutMillis)`,
437459
but then the execution of statements from other threads will block.
460+
When `timeoutMillis` = 0, blocks forever or until at least one notification has been received.
461+
It means that notification is delivered almost immediately, without a lag.
462+
If more than one notification is about to be received, these will be returned in one batch.
463+
464+
This solution significantly reduces the number of issued queries
465+
and also solves the lag problem that the polling solution suffers from.
438466
439-
Thus, the creation of a long-lived dedicated JDBC `Connection` for receiving notifications is required.
440-
This connection should not be obtained from a pooled DataSources.
441-
Instead, a dedicated `Connection` has to be created using the `DriverManager` API.
467+
The Listen/Notify mechanism implementation [PostgresChannelEventSubscriptionProcessor](src/main/java/com/example/eventsourcing/service/PostgresChannelEventSubscriptionProcessor.java)
468+
is inspired by the Spring Integration class [PostgresChannelMessageTableSubscriber](https://github.com/spring-projects/spring-integration/blob/v6.0.0/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java).
469+
470+
The Listen/Notify event subscription processing can be enabled in the [`application.yml`](src/main/resources/application.yml)
471+
472+
```yaml
473+
event-sourcing:
474+
subscriptions: postgres-channel # Enable Listen/Notify event subscription processing
475+
```
442476
443-
In practice, implementations based on PostgreSQL `LISTEN`/`NOTIFY` are quite complex.
444-
For example,
445-
[PostgresChannelMessageTableSubscriber](https://github.com/spring-projects/spring-integration/blob/v6.0.0/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java)
446-
from the Spring Integration.
477+
This mechanism is used by default as more efficient.
447478
448479
### <a id="4-8"></a>Adding new asynchronous event handlers
449480
@@ -520,7 +551,7 @@ Using PostgreSQL as an event store has a lot of advantages, but there are also d
520551
521552
7. Run E2E tests and see the output
522553
```bash
523-
E2E_TESTING=true; ./gradlew clean test -i
554+
E2E_TESTING=true ./gradlew clean test -i
524555
```
525556
526557
8. Explore the database using the Adminer database management tool at http://localhost:8181.

src/main/java/com/example/eventsourcing/PostgreSqlEventSourcingApplication.java renamed to src/main/java/com/example/eventsourcing/PostgresEventSourcingApplication.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
@SpringBootApplication
99
@EnableScheduling
1010
@EnableAsync
11-
public class PostgreSqlEventSourcingApplication {
11+
public class PostgresEventSourcingApplication {
1212

1313
public static void main(String[] args) {
14-
SpringApplication.run(PostgreSqlEventSourcingApplication.class, args);
14+
SpringApplication.run(PostgresEventSourcingApplication.class, args);
1515
}
1616
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package com.example.eventsourcing.service;
2+
3+
import com.example.eventsourcing.service.event.AsyncEventHandler;
4+
import jakarta.annotation.PostConstruct;
5+
import jakarta.annotation.PreDestroy;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.postgresql.PGNotification;
9+
import org.postgresql.jdbc.PgConnection;
10+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
11+
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
12+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
13+
import org.springframework.stereotype.Component;
14+
15+
import java.sql.DriverManager;
16+
import java.sql.SQLException;
17+
import java.sql.Statement;
18+
import java.util.List;
19+
import java.util.concurrent.*;
20+
21+
@Component
22+
@ConditionalOnProperty(name = "event-sourcing.subscriptions", havingValue = "postgres-channel")
23+
@RequiredArgsConstructor
24+
@Slf4j
25+
public class PostgresChannelEventSubscriptionProcessor {
26+
27+
private final List<AsyncEventHandler> eventHandlers;
28+
private final EventSubscriptionProcessor eventSubscriptionProcessor;
29+
private final DataSourceProperties dataSourceProperties;
30+
private final ExecutorService executor = newExecutor();
31+
private CountDownLatch latch = new CountDownLatch(0);
32+
private Future<?> future = CompletableFuture.completedFuture(null);
33+
private volatile PgConnection connection;
34+
35+
private static ExecutorService newExecutor() {
36+
CustomizableThreadFactory threadFactory =
37+
new CustomizableThreadFactory("postgres-channel-event-subscription-");
38+
threadFactory.setDaemon(true);
39+
return Executors.newSingleThreadExecutor(threadFactory);
40+
}
41+
42+
@PostConstruct
43+
public synchronized void start() {
44+
if (this.latch.getCount() > 0) {
45+
return;
46+
}
47+
this.latch = new CountDownLatch(1);
48+
this.future = executor.submit(() -> {
49+
try {
50+
while (isActive()) {
51+
try {
52+
PgConnection conn = getPgConnection();
53+
try (Statement stmt = conn.createStatement()) {
54+
stmt.execute("LISTEN channel_event_notify");
55+
} catch (Exception ex) {
56+
try {
57+
conn.close();
58+
} catch (Exception suppressed) {
59+
ex.addSuppressed(suppressed);
60+
}
61+
throw ex;
62+
}
63+
64+
this.eventHandlers.forEach(this::processNewEvents);
65+
66+
try {
67+
this.connection = conn;
68+
while (isActive()) {
69+
PGNotification[] notifications = conn.getNotifications(0);
70+
// Unfortunately, there is no good way of interrupting a notification
71+
// poll but by closing its connection.
72+
if (!isActive()) {
73+
return;
74+
}
75+
if (notifications != null) {
76+
for (PGNotification notification : notifications) {
77+
String parameter = notification.getParameter();
78+
eventHandlers.stream()
79+
.filter(eventHandler -> eventHandler.getAggregateType().toString().equals(parameter))
80+
.forEach(this::processNewEvents);
81+
}
82+
}
83+
}
84+
} finally {
85+
conn.close();
86+
}
87+
} catch (Exception e) {
88+
// The getNotifications method does not throw a meaningful message on interruption.
89+
// Therefore, we do not log an error, unless it occurred while active.
90+
if (isActive()) {
91+
log.error("Failed to poll notifications from Postgres database", e);
92+
}
93+
}
94+
}
95+
} finally {
96+
this.latch.countDown();
97+
}
98+
});
99+
}
100+
101+
private PgConnection getPgConnection() throws SQLException {
102+
return DriverManager.getConnection(
103+
dataSourceProperties.determineUrl(),
104+
dataSourceProperties.determineUsername(),
105+
dataSourceProperties.determinePassword()
106+
).unwrap(PgConnection.class);
107+
}
108+
109+
private boolean isActive() {
110+
if (Thread.interrupted()) {
111+
Thread.currentThread().interrupt();
112+
return false;
113+
}
114+
return true;
115+
}
116+
117+
private void processNewEvents(AsyncEventHandler eventHandler) {
118+
try {
119+
eventSubscriptionProcessor.processNewEvents(eventHandler);
120+
} catch (Exception e) {
121+
log.warn("Failed to handle new events for subscription %s"
122+
.formatted(eventHandler.getSubscriptionName()), e);
123+
}
124+
}
125+
126+
@PreDestroy
127+
public synchronized void stop() {
128+
if (this.future.isDone()) {
129+
return;
130+
}
131+
this.future.cancel(true);
132+
PgConnection conn = this.connection;
133+
if (conn != null) {
134+
try {
135+
conn.close();
136+
} catch (SQLException ignored) {
137+
}
138+
}
139+
try {
140+
if (!this.latch.await(5, TimeUnit.SECONDS)) {
141+
throw new IllegalStateException(
142+
"Failed to stop %s".formatted(PostgresChannelEventSubscriptionProcessor.class.getName()));
143+
}
144+
} catch (InterruptedException ignored) {
145+
}
146+
}
147+
}

src/main/java/com/example/eventsourcing/service/ScheduledEventSubscriptionProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import java.util.List;
1111

1212
@Component
13-
@ConditionalOnProperty(name = "event-sourcing.subscriptions.enabled", havingValue = "true")
13+
@ConditionalOnProperty(name = "event-sourcing.subscriptions", havingValue = "polling")
1414
@RequiredArgsConstructor
1515
@Slf4j
1616
public class ScheduledEventSubscriptionProcessor {
@@ -19,8 +19,8 @@ public class ScheduledEventSubscriptionProcessor {
1919
private final EventSubscriptionProcessor eventSubscriptionProcessor;
2020

2121
@Scheduled(
22-
fixedDelayString = "${event-sourcing.subscriptions.polling-interval}",
23-
initialDelayString = "${event-sourcing.subscriptions.polling-initial-delay}"
22+
fixedDelayString = "${event-sourcing.polling-subscriptions.polling-interval}",
23+
initialDelayString = "${event-sourcing.polling-subscriptions.polling-initial-delay}"
2424
)
2525
public void processNewEvents() {
2626
eventHandlers.forEach(this::processNewEvents);

src/main/resources/application.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ event-sourcing:
1717
enabled: true
1818
# Create a snapshot on every nth event
1919
nth-event: 10
20-
subscriptions:
21-
enabled: true
20+
# Available subscription: polling, postgres-channel
21+
subscriptions: postgres-channel
22+
polling-subscriptions:
2223
polling-initial-delay: PT1S
2324
polling-interval: PT1S

src/main/resources/db/migration/V1__init.sql

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS ES_AGGREGATE (
44
AGGREGATE_TYPE TEXT NOT NULL
55
);
66

7-
CREATE INDEX IF NOT EXISTS IDX_ES_AGGREGATE_AGGREGATE_TYPE on ES_AGGREGATE (AGGREGATE_TYPE);
7+
CREATE INDEX IF NOT EXISTS IDX_ES_AGGREGATE_AGGREGATE_TYPE ON ES_AGGREGATE (AGGREGATE_TYPE);
88

99
CREATE TABLE IF NOT EXISTS ES_EVENT (
1010
ID BIGSERIAL PRIMARY KEY,
@@ -16,9 +16,9 @@ CREATE TABLE IF NOT EXISTS ES_EVENT (
1616
UNIQUE (AGGREGATE_ID, VERSION)
1717
);
1818

19-
CREATE INDEX IF NOT EXISTS IDX_ES_EVENT_TRANSACTION_ID_ID on ES_EVENT (TRANSACTION_ID, ID);
20-
CREATE INDEX IF NOT EXISTS IDX_ES_EVENT_AGGREGATE_ID on ES_EVENT (AGGREGATE_ID);
21-
CREATE INDEX IF NOT EXISTS IDX_ES_EVENT_VERSION on ES_EVENT (VERSION);
19+
CREATE INDEX IF NOT EXISTS IDX_ES_EVENT_TRANSACTION_ID_ID ON ES_EVENT (TRANSACTION_ID, ID);
20+
CREATE INDEX IF NOT EXISTS IDX_ES_EVENT_AGGREGATE_ID ON ES_EVENT (AGGREGATE_ID);
21+
CREATE INDEX IF NOT EXISTS IDX_ES_EVENT_VERSION ON ES_EVENT (VERSION);
2222

2323
CREATE TABLE IF NOT EXISTS ES_AGGREGATE_SNAPSHOT (
2424
AGGREGATE_ID UUID NOT NULL REFERENCES ES_AGGREGATE (ID),
@@ -27,8 +27,8 @@ CREATE TABLE IF NOT EXISTS ES_AGGREGATE_SNAPSHOT (
2727
PRIMARY KEY (AGGREGATE_ID, VERSION)
2828
);
2929

30-
CREATE INDEX IF NOT EXISTS IDX_ES_AGGREGATE_SNAPSHOT_AGGREGATE_ID on ES_AGGREGATE_SNAPSHOT (AGGREGATE_ID);
31-
CREATE INDEX IF NOT EXISTS IDX_ES_AGGREGATE_SNAPSHOT_VERSION on ES_AGGREGATE_SNAPSHOT (VERSION);
30+
CREATE INDEX IF NOT EXISTS IDX_ES_AGGREGATE_SNAPSHOT_AGGREGATE_ID ON ES_AGGREGATE_SNAPSHOT (AGGREGATE_ID);
31+
CREATE INDEX IF NOT EXISTS IDX_ES_AGGREGATE_SNAPSHOT_VERSION ON ES_AGGREGATE_SNAPSHOT (VERSION);
3232

3333
CREATE TABLE IF NOT EXISTS ES_EVENT_SUBSCRIPTION (
3434
SUBSCRIPTION_NAME TEXT PRIMARY KEY,
@@ -57,4 +57,4 @@ CREATE TABLE IF NOT EXISTS RM_ORDER_ROUTE (
5757
PRIMARY KEY (ORDER_ID, ADDRESS)
5858
);
5959

60-
CREATE INDEX IF NOT EXISTS IDX_RM_ORDER_ROUTE_ORDER_ID on RM_ORDER_ROUTE (ORDER_ID);
60+
CREATE INDEX IF NOT EXISTS IDX_RM_ORDER_ROUTE_ORDER_ID ON RM_ORDER_ROUTE (ORDER_ID);
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
CREATE OR REPLACE FUNCTION CHANNEL_EVENT_NOTIFY_FCT()
2+
RETURNS TRIGGER AS
3+
$BODY$
4+
DECLARE
5+
aggregate_type TEXT;
6+
BEGIN
7+
SELECT a.AGGREGATE_TYPE INTO aggregate_type FROM ES_AGGREGATE a WHERE a.ID = NEW.AGGREGATE_ID;
8+
PERFORM pg_notify('channel_event_notify', aggregate_type);
9+
RETURN NEW;
10+
END;
11+
$BODY$
12+
LANGUAGE PLPGSQL;
13+
14+
CREATE OR REPLACE TRIGGER CHANNEL_EVENT_NOTIFY_TRG
15+
AFTER INSERT ON ES_EVENT
16+
FOR EACH ROW
17+
EXECUTE PROCEDURE CHANNEL_EVENT_NOTIFY_FCT();
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.example.eventsourcing;
2+
3+
import org.springframework.test.context.TestPropertySource;
4+
5+
@TestPropertySource(properties = "event-sourcing.subscriptions=postgres-channel")
6+
class ListenNotifyPostgresEventSourcingApplicationTests extends PostgresEventSourcingApplicationTests {
7+
}

src/test/java/com/example/eventsourcing/OrderTestScript.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ public class OrderTestScript {
3939
private final String kafkaBrokers;
4040
private final ObjectMapper objectMapper = new ObjectMapper();
4141
private final BasicJsonTester jsonTester = new BasicJsonTester(getClass());
42-
43-
@SneakyThrows
44-
public void execute() {
42+
43+
public void execute() throws JsonProcessingException {
4544
log.info("Place a new order");
4645
UUID orderId = placeOrder("""
4746
{
@@ -193,6 +192,7 @@ public void execute() {
193192
log.info("Print integration events");
194193
var kafkaConsumer = createKafkaConsumer(KafkaTopicsConfig.TOPIC_ORDER_EVENTS);
195194
List<String> kafkaRecordValues = getKafkaRecords(kafkaConsumer, Duration.ofSeconds(10), 23);
195+
kafkaConsumer.unsubscribe();
196196
assertThat(kafkaRecordValues)
197197
.hasSizeGreaterThanOrEqualTo(23);
198198

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.example.eventsourcing;
2+
3+
import org.springframework.test.context.TestPropertySource;
4+
5+
@TestPropertySource(properties = "event-sourcing.subscriptions=polling")
6+
class PollingPostgresEventSourcingApplicationTests extends PostgresEventSourcingApplicationTests {
7+
}

0 commit comments

Comments
 (0)