Skip to content

Commit 459f5aa

Browse files
committed
test: add a notifications stream test
1 parent 297c383 commit 459f5aa

File tree

3 files changed

+57
-1
lines changed

3 files changed

+57
-1
lines changed

src/pg/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ impl AsyncPgConnection {
729729
.on_connection_event(event);
730730
}
731731

732-
pub fn notification_stream(
732+
pub fn notifications_stream(
733733
&mut self,
734734
) -> impl futures_core::Stream<Item = QueryResult<diesel::pg::PgNotification>> + '_ {
735735
match &mut self.notification_rx {

tests/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::fmt::Debug;
77
#[cfg(feature = "postgres")]
88
mod custom_types;
99
mod instrumentation;
10+
mod notifications;
1011
#[cfg(any(feature = "bb8", feature = "deadpool", feature = "mobc"))]
1112
mod pooling;
1213
#[cfg(feature = "async-connection-wrapper")]

tests/notifications.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#[cfg(feature = "postgres")]
2+
#[tokio::test]
3+
async fn notifications_arrive() {
4+
use diesel_async::RunQueryDsl;
5+
use futures_util::{StreamExt, TryStreamExt};
6+
7+
let conn = &mut super::connection_without_transaction().await;
8+
9+
diesel::sql_query("LISTEN test_notifications")
10+
.execute(conn)
11+
.await
12+
.unwrap();
13+
14+
diesel::sql_query("NOTIFY test_notifications, 'first'")
15+
.execute(conn)
16+
.await
17+
.unwrap();
18+
19+
diesel::sql_query("NOTIFY test_notifications, 'second'")
20+
.execute(conn)
21+
.await
22+
.unwrap();
23+
24+
let notifications = conn
25+
.notifications_stream()
26+
.take(2)
27+
.try_collect::<Vec<_>>()
28+
.await
29+
.unwrap();
30+
31+
assert_eq!(2, notifications.len());
32+
assert_eq!(notifications[0].channel, "test_notifications");
33+
assert_eq!(notifications[1].channel, "test_notifications");
34+
assert_eq!(notifications[0].payload, "first");
35+
assert_eq!(notifications[1].payload, "second");
36+
37+
let next_notification = tokio::time::timeout(
38+
std::time::Duration::from_secs(1),
39+
std::pin::pin!(conn.notifications_stream()).next(),
40+
)
41+
.await;
42+
43+
assert!(
44+
next_notification.is_err(),
45+
"Got a next notification, while not expecting one: {next_notification:?}"
46+
);
47+
48+
diesel::sql_query("NOTIFY test_notifications")
49+
.execute(conn)
50+
.await
51+
.unwrap();
52+
53+
let next_notification = std::pin::pin!(conn.notifications_stream()).next().await;
54+
assert_eq!(next_notification.unwrap().unwrap().payload, "");
55+
}

0 commit comments

Comments
 (0)