Skip to content

Commit 297c383

Browse files
committed
add error to notification_stream api
1 parent e8bb91c commit 297c383

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

src/pg/mod.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ pub struct AsyncPgConnection {
170170
transaction_state: Arc<Mutex<AnsiTransactionManager>>,
171171
metadata_cache: Arc<Mutex<PgMetadataCache>>,
172172
connection_future: Option<broadcast::Receiver<Arc<tokio_postgres::Error>>>,
173-
notification_rx: Option<mpsc::UnboundedReceiver<diesel::pg::PgNotification>>,
173+
notification_rx: Option<mpsc::UnboundedReceiver<QueryResult<diesel::pg::PgNotification>>>,
174174
shutdown_channel: Option<oneshot::Sender<()>>,
175175
// a sync mutex is fine here as we only hold it for a really short time
176176
instrumentation: Arc<std::sync::Mutex<DynInstrumentation>>,
@@ -509,7 +509,7 @@ impl AsyncPgConnection {
509509
async fn setup(
510510
conn: tokio_postgres::Client,
511511
connection_future: Option<broadcast::Receiver<Arc<tokio_postgres::Error>>>,
512-
notification_rx: Option<mpsc::UnboundedReceiver<diesel::pg::PgNotification>>,
512+
notification_rx: Option<mpsc::UnboundedReceiver<QueryResult<diesel::pg::PgNotification>>>,
513513
shutdown_channel: Option<oneshot::Sender<()>>,
514514
instrumentation: Arc<std::sync::Mutex<DynInstrumentation>>,
515515
) -> ConnectionResult<Self> {
@@ -731,7 +731,7 @@ impl AsyncPgConnection {
731731

732732
pub fn notification_stream(
733733
&mut self,
734-
) -> impl futures_core::Stream<Item = diesel::pg::PgNotification> + '_ {
734+
) -> impl futures_core::Stream<Item = QueryResult<diesel::pg::PgNotification>> + '_ {
735735
match &mut self.notification_rx {
736736
None => Either::Left(futures_util::stream::pending()),
737737
Some(rx) => Either::Right(futures_util::stream::unfold(rx, async |rx| {
@@ -987,7 +987,7 @@ fn drive_connection<S>(
987987
mut conn: tokio_postgres::Connection<tokio_postgres::Socket, S>,
988988
) -> (
989989
broadcast::Receiver<Arc<tokio_postgres::Error>>,
990-
mpsc::UnboundedReceiver<diesel::pg::PgNotification>,
990+
mpsc::UnboundedReceiver<QueryResult<diesel::pg::PgNotification>>,
991991
oneshot::Sender<()>,
992992
)
993993
where
@@ -996,23 +996,25 @@ where
996996
let (error_tx, error_rx) = tokio::sync::broadcast::channel(1);
997997
let (notification_tx, notification_rx) = tokio::sync::mpsc::unbounded_channel();
998998
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
999+
let mut conn = futures_util::stream::poll_fn(move |cx| conn.poll_message(cx));
9991000

10001001
tokio::spawn(async move {
1001-
let mut conn = futures_util::stream::poll_fn(|cx| conn.poll_message(cx));
1002-
10031002
loop {
10041003
match futures_util::future::select(&mut shutdown_rx, conn.next()).await {
10051004
Either::Left(_) | Either::Right((None, _)) => break,
10061005
Either::Right((Some(Ok(tokio_postgres::AsyncMessage::Notification(notif))), _)) => {
1007-
let _: Result<_, _> = notification_tx.send(diesel::pg::PgNotification {
1006+
let _: Result<_, _> = notification_tx.send(Ok(diesel::pg::PgNotification {
10081007
process_id: notif.process_id(),
10091008
channel: notif.channel().to_owned(),
10101009
payload: notif.payload().to_owned(),
1011-
});
1010+
}));
10121011
}
10131012
Either::Right((Some(Ok(_)), _)) => {}
10141013
Either::Right((Some(Err(e)), _)) => {
1015-
let _ = error_tx.send(Arc::new(e));
1014+
let e = Arc::new(e);
1015+
let _: Result<_, _> = error_tx.send(e.clone());
1016+
let _: Result<_, _> =
1017+
notification_tx.send(Err(error_helper::from_tokio_postgres_error(e)));
10161018
break;
10171019
}
10181020
}

0 commit comments

Comments
 (0)