@@ -729,6 +729,47 @@ impl AsyncPgConnection {
729
729
. on_connection_event ( event) ;
730
730
}
731
731
732
+ /// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
733
+ ///
734
+ /// The returned stream yields all notifications received by the connection, not only notifications received
735
+ /// after calling the function. The returned stream will never close, so no notifications will just result
736
+ /// in a pending state.
737
+ ///
738
+ /// If there's no connection available to poll, the stream will yield no notifications and be pending forever.
739
+ /// This can happen if you created the [`AsyncPgConnection`] by the [`try_from`] constructor.
740
+ ///
741
+ /// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
742
+ /// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
743
+ /// [`AsyncPgConnection`]: crate::pg::AsyncPgConnection
744
+ /// [`try_from`]: crate::pg::AsyncPgConnection::try_from
745
+ ///
746
+ /// ```rust
747
+ /// # include!("../doctest_setup.rs");
748
+ /// # use scoped_futures::ScopedFutureExt;
749
+ /// #
750
+ /// # #[tokio::main(flavor = "current_thread")]
751
+ /// # async fn main() {
752
+ /// # run_test().await.unwrap();
753
+ /// # }
754
+ /// #
755
+ /// # async fn run_test() -> QueryResult<()> {
756
+ /// # use diesel_async::RunQueryDsl;
757
+ /// # use futures_util::StreamExt;
758
+ /// # let conn = &mut connection_no_transaction().await;
759
+ /// // register the notifications channel we want to receive notifications for
760
+ /// diesel::sql_query("LISTEN example_channel").execute(conn).await?;
761
+ /// // send some notification (usually done from a different connection/thread/application)
762
+ /// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(conn).await?;
763
+ ///
764
+ /// let mut notifications = std::pin::pin!(conn.notifications_stream());
765
+ /// let mut notification = notifications.next().await.unwrap().unwrap();
766
+ ///
767
+ /// assert_eq!(notification.channel, "example_channel");
768
+ /// assert_eq!(notification.payload, "additional data");
769
+ /// println!("Notification received from process with id {}", notification.process_id);
770
+ /// # Ok(())
771
+ /// # }
772
+ /// ```
732
773
pub fn notifications_stream (
733
774
& mut self ,
734
775
) -> impl futures_core:: Stream < Item = QueryResult < diesel:: pg:: PgNotification > > + ' _ {
0 commit comments