Skip to content

Commit e8bb91c

Browse files
committed
use either stream instead new type
1 parent 9107381 commit e8bb91c

File tree

1 file changed

+5
-16
lines changed

1 file changed

+5
-16
lines changed

src/pg/mod.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -732,22 +732,11 @@ impl AsyncPgConnection {
732732
pub fn notification_stream(
733733
&mut self,
734734
) -> impl futures_core::Stream<Item = diesel::pg::PgNotification> + '_ {
735-
NotificationStream(self.notification_rx.as_mut())
736-
}
737-
}
738-
739-
struct NotificationStream<'a>(Option<&'a mut mpsc::UnboundedReceiver<diesel::pg::PgNotification>>);
740-
741-
impl futures_core::Stream for NotificationStream<'_> {
742-
type Item = diesel::pg::PgNotification;
743-
744-
fn poll_next(
745-
mut self: std::pin::Pin<&mut Self>,
746-
cx: &mut std::task::Context<'_>,
747-
) -> std::task::Poll<Option<Self::Item>> {
748-
match &mut self.0 {
749-
Some(rx) => rx.poll_recv(cx),
750-
None => std::task::Poll::Pending,
735+
match &mut self.notification_rx {
736+
None => Either::Left(futures_util::stream::pending()),
737+
Some(rx) => Either::Right(futures_util::stream::unfold(rx, async |rx| {
738+
rx.recv().await.map(move |item| (item, rx))
739+
})),
751740
}
752741
}
753742
}

0 commit comments

Comments
 (0)