Skip to content

Commit 5577066

Browse files
authored
BAEL-7988: Event-Driven LISTEN/NOTIFY Support in Java using PostgreSQL (#18582)
* BAEL-7988: Event-Driven LISTEN/NOTIFY Support in Java using PostgreSQL * Updated the tests to actually assert on the notifications * Used Awaitility in pgsql test * Swapped Thread.sleep for a blocking call * Test to show notifications from triggers
1 parent e9866ac commit 5577066

File tree

2 files changed

+173
-0
lines changed

2 files changed

+173
-0
lines changed

persistence-modules/core-java-persistence-4/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,29 @@
4747
<artifactId>spring-boot-starter</artifactId>
4848
<version>${springframework.boot.spring-boot-starter.version}</version>
4949
</dependency>
50+
<dependency>
51+
<groupId>org.postgresql</groupId>
52+
<artifactId>postgresql</artifactId>
53+
<version>${postgresql.version}</version>
54+
</dependency>
55+
<dependency>
56+
<groupId>com.impossibl.pgjdbc-ng</groupId>
57+
<artifactId>pgjdbc-ng</artifactId>
58+
<version>${pgjdbc-ng.version}</version>
59+
</dependency>
5060

5161
<dependency>
5262
<groupId>org.mockito</groupId>
5363
<artifactId>mockito-junit-jupiter</artifactId>
5464
<version>5.16.0</version>
5565
<scope>test</scope>
5666
</dependency>
67+
<dependency>
68+
<groupId>org.awaitility</groupId>
69+
<artifactId>awaitility</artifactId>
70+
<version>${awaitility.version}</version>
71+
<scope>test</scope>
72+
</dependency>
5773
</dependencies>
5874

5975
<build>
@@ -70,8 +86,10 @@
7086
</build>
7187

7288
<properties>
89+
<awaitility.version>4.3.0</awaitility.version>
7390
<h2.version>2.3.230</h2.version>
7491
<postgresql.version>42.7.3</postgresql.version>
92+
<pgjdbc-ng.version>0.8.9</pgjdbc-ng.version>
7593
<jdbc-stream.version>0.1.1</jdbc-stream.version>
7694
<junit-bom.version>5.11.1</junit-bom.version>
7795
<mysql-connecter-j.version>8.0.33</mysql-connecter-j.version>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package com.baeldung.listennotify;
2+
3+
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
4+
import org.junit.jupiter.api.Test;
5+
import org.postgresql.PGNotification;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.sql.*;
10+
import java.time.Duration;
11+
import java.util.HashSet;
12+
import java.util.Set;
13+
14+
import static org.awaitility.Awaitility.await;
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
17+
public class ListenNotifyLiveTest {
18+
private static final Logger LOG = LoggerFactory.getLogger(ListenNotifyLiveTest.class);
19+
20+
private static final String POSTGRES_URL = "jdbc:postgresql://localhost:5432/postgres";
21+
private static final String PGJDBC_URL = "jdbc:pgsql://localhost:5432/postgres";
22+
private static final String USERNAME = "postgres";
23+
private static final String PASSWORD = "mysecretpassword";
24+
25+
private void sendNotifications() throws SQLException{
26+
try (Connection connection = DriverManager.getConnection(POSTGRES_URL, USERNAME, PASSWORD)) {
27+
try (Statement statement = connection.createStatement()) {
28+
statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
29+
}
30+
31+
try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
32+
statement.setString(1, "my_channel");
33+
statement.setString(2, "Hello, pg_notify!");
34+
statement.execute();
35+
}
36+
}
37+
}
38+
39+
@Test
40+
void whenUsingPostgresqlDriver_thenNotificationsAreReceived() throws SQLException, InterruptedException {
41+
try (Connection connection = DriverManager.getConnection(POSTGRES_URL, USERNAME, PASSWORD)) {
42+
try (Statement statement = connection.createStatement()) {
43+
statement.execute("LISTEN my_channel");
44+
}
45+
46+
sendNotifications();
47+
48+
var pgConnection = connection.unwrap(org.postgresql.PGConnection.class);
49+
Set<String> receivedNotifications = new HashSet<>();
50+
51+
while (receivedNotifications.size() < 2) {
52+
PGNotification[] notifications = pgConnection.getNotifications(0);
53+
if (notifications != null) {
54+
LOG.info("Received {} notifications", notifications.length);
55+
for (PGNotification notification : notifications) {
56+
LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
57+
notification.getName(),
58+
notification.getParameter(),
59+
notification.getPID());
60+
receivedNotifications.add(notification.getParameter());
61+
}
62+
}
63+
}
64+
65+
assertEquals(Set.of("Hello, NOTIFY!", "Hello, pg_notify!"), receivedNotifications);
66+
}
67+
}
68+
69+
@Test
70+
void whenUsingPgsqlDriver_thenNotificationsAreReceivedViaListener() throws SQLException, InterruptedException {
71+
try (Connection connection = DriverManager.getConnection(PGJDBC_URL, USERNAME, PASSWORD)) {
72+
try (Statement statement = connection.createStatement()) {
73+
statement.execute("LISTEN my_channel");
74+
}
75+
76+
var pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
77+
78+
Listener pgNotificationListener = new Listener();
79+
pgConnection.addNotificationListener(pgNotificationListener);
80+
81+
sendNotifications();
82+
83+
await()
84+
.atMost(Duration.ofSeconds(5))
85+
.until(() -> pgNotificationListener.receivedNotifications.size() == 2);
86+
87+
assertEquals(Set.of("Hello, NOTIFY!", "Hello, pg_notify!"), pgNotificationListener.receivedNotifications);
88+
89+
}
90+
}
91+
92+
@Test
93+
void whenUsingTriggers_thenNotificationsAreSent() throws SQLException {
94+
try (Connection connection = DriverManager.getConnection(POSTGRES_URL, USERNAME, PASSWORD)) {
95+
// First set up the database state
96+
try (Statement statement = connection.createStatement()) {
97+
statement.execute("CREATE TABLE IF NOT EXISTS listen_notify_trigger(id INT PRIMARY KEY)");
98+
statement.execute("TRUNCATE listen_notify_trigger");
99+
100+
statement.execute("""
101+
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $$
102+
BEGIN
103+
PERFORM pg_notify('table_change', TG_TABLE_NAME);
104+
RETURN NEW;
105+
END;
106+
$$ LANGUAGE plpgsql;
107+
""");
108+
109+
statement.execute("""
110+
CREATE OR REPLACE TRIGGER table_change
111+
AFTER INSERT OR UPDATE OR DELETE ON listen_notify_trigger
112+
FOR EACH ROW EXECUTE PROCEDURE notify_table_change();
113+
""");
114+
}
115+
116+
try (Statement statement = connection.createStatement()) {
117+
statement.execute("LISTEN table_change");
118+
}
119+
120+
try (Statement statement = connection.createStatement()) {
121+
statement.execute("INSERT INTO listen_notify_trigger(id) VALUES (1)");
122+
}
123+
124+
var pgConnection = connection.unwrap(org.postgresql.PGConnection.class);
125+
Set<String> receivedNotifications = new HashSet<>();
126+
127+
while (receivedNotifications.isEmpty()) {
128+
PGNotification[] notifications = pgConnection.getNotifications(0);
129+
if (notifications != null) {
130+
LOG.info("Received {} notifications", notifications.length);
131+
for (PGNotification notification : notifications) {
132+
LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
133+
notification.getName(),
134+
notification.getParameter(),
135+
notification.getPID());
136+
receivedNotifications.add(notification.getName() + " - " + notification.getParameter());
137+
}
138+
}
139+
}
140+
141+
assertEquals(Set.of("table_change - listen_notify_trigger"), receivedNotifications);
142+
}
143+
}
144+
145+
private static class Listener implements PGNotificationListener {
146+
Set<String> receivedNotifications = new HashSet<>();
147+
148+
@Override
149+
public void notification(int processId, String channelName, String payload) {
150+
LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
151+
channelName, payload, processId);
152+
receivedNotifications.add(payload);
153+
}
154+
}
155+
}

0 commit comments

Comments
 (0)