Skip to content

Commit a2609f6

Browse files
committed
Refactor notification API
1 parent 008f14b commit a2609f6

File tree

2 files changed

+73
-27
lines changed

2 files changed

+73
-27
lines changed

src/notification.rs

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
use debug_builders::DebugStruct;
44
use std::fmt;
55

6-
use {Result, Connection, NotificationsNew};
6+
use {desynchronized, Result, Connection, NotificationsNew};
77
use message::BackendMessage::NotificationResponse;
8+
use error::Error;
89

910
/// An asynchronous notification.
1011
#[derive(Clone, Debug)]
@@ -25,11 +26,49 @@ pub struct Notifications<'conn> {
2526
impl<'a> fmt::Debug for Notifications<'a> {
2627
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
2728
DebugStruct::new(fmt, "Notifications")
28-
.field("pending", &self.conn.conn.borrow().notifications.len())
29+
.field("pending", &self.len())
2930
.finish()
3031
}
3132
}
3233

34+
impl<'conn> Notifications<'conn> {
35+
/// Returns the number of pending notifications.
36+
pub fn len(&self) -> usize {
37+
self.conn.conn.borrow().notifications.len()
38+
}
39+
40+
/// Returns an iterator over pending notifications.
41+
///
42+
/// # Note
43+
///
44+
/// This iterator may start returning `Some` after previously returning
45+
/// `None` if more notifications are received.
46+
pub fn iter<'a>(&'a self) -> Iter<'a> {
47+
Iter {
48+
conn: self.conn,
49+
}
50+
}
51+
52+
/// Returns an iterator over notifications, blocking until one is received
53+
/// if none are pending.
54+
///
55+
/// The iterator will never return `None`.
56+
pub fn blocking_iter<'a>(&'a self) -> BlockingIter<'a> {
57+
BlockingIter {
58+
conn: self.conn,
59+
}
60+
}
61+
}
62+
63+
impl<'a, 'conn> IntoIterator for &'a Notifications<'conn> {
64+
type Item = Notification;
65+
type IntoIter = Iter<'a>;
66+
67+
fn into_iter(self) -> Iter<'a> {
68+
self.iter()
69+
}
70+
}
71+
3372
impl<'conn> NotificationsNew<'conn> for Notifications<'conn> {
3473
fn new(conn: &'conn Connection) -> Notifications<'conn> {
3574
Notifications {
@@ -38,41 +77,47 @@ impl<'conn> NotificationsNew<'conn> for Notifications<'conn> {
3877
}
3978
}
4079

41-
impl<'conn> Iterator for Notifications<'conn> {
80+
/// An iterator over pending notifications.
81+
pub struct Iter<'a> {
82+
conn: &'a Connection,
83+
}
84+
85+
impl<'a> Iterator for Iter<'a> {
4286
type Item = Notification;
4387

44-
/// Returns the oldest pending notification or `None` if there are none.
45-
///
46-
/// ## Note
47-
///
48-
/// `next` may return `Some` notification after returning `None` if a new
49-
/// notification was received.
5088
fn next(&mut self) -> Option<Notification> {
5189
self.conn.conn.borrow_mut().notifications.pop_front()
5290
}
5391
}
5492

55-
impl<'conn> Notifications<'conn> {
56-
/// Returns the oldest pending notification.
57-
///
58-
/// If no notifications are pending, blocks until one arrives.
59-
pub fn next_block(&mut self) -> Result<Notification> {
60-
if let Some(notification) = self.next() {
61-
return Ok(notification);
62-
}
93+
/// An iterator over notifications which will block if none are pending.
94+
pub struct BlockingIter<'a> {
95+
conn: &'a Connection,
96+
}
6397

98+
impl<'a> Iterator for BlockingIter<'a> {
99+
type Item = Result<Notification>;
100+
101+
fn next(&mut self) -> Option<Result<Notification>> {
64102
let mut conn = self.conn.conn.borrow_mut();
65-
check_desync!(conn);
66-
match try!(conn.read_message_with_notification()) {
67-
NotificationResponse { pid, channel, payload } => {
68-
Ok(Notification {
103+
if conn.is_desynchronized() {
104+
return Some(Err(Error::IoError(desynchronized())));
105+
}
106+
107+
if let Some(notification) = conn.notifications.pop_front() {
108+
return Some(Ok(notification));
109+
}
110+
111+
match conn.read_message_with_notification() {
112+
Ok(NotificationResponse { pid, channel, payload }) => {
113+
Some(Ok(Notification {
69114
pid: pid,
70115
channel: channel,
71116
payload: payload
72-
})
117+
}))
73118
}
119+
Err(err) => Some(Err(Error::IoError(err))),
74120
_ => unreachable!()
75121
}
76122
}
77123
}
78-

tests/test.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ fn test_custom_notice_handler() {
551551
#[test]
552552
fn test_notification_iterator_none() {
553553
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
554-
assert!(conn.notifications().next().is_none());
554+
assert!(conn.notifications().iter().next().is_none());
555555
}
556556

557557
fn check_notification(expected: Notification, actual: Notification) {
@@ -562,7 +562,8 @@ fn check_notification(expected: Notification, actual: Notification) {
562562
#[test]
563563
fn test_notification_iterator_some() {
564564
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
565-
let mut it = conn.notifications();
565+
let notifications = conn.notifications();
566+
let mut it = notifications.iter();
566567
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel", &[]));
567568
or_panic!(conn.execute("LISTEN test_notification_iterator_one_channel2", &[]));
568569
or_panic!(conn.execute("NOTIFY test_notification_iterator_one_channel, 'hello'", &[]));
@@ -600,12 +601,12 @@ fn test_notifications_next_block() {
600601
or_panic!(conn.execute("NOTIFY test_notifications_next_block, 'foo'", &[]));
601602
});
602603

603-
let mut notifications = conn.notifications();
604+
let notifications = conn.notifications();
604605
check_notification(Notification {
605606
pid: 0,
606607
channel: "test_notifications_next_block".to_string(),
607608
payload: "foo".to_string()
608-
}, or_panic!(notifications.next_block()));
609+
}, or_panic!(notifications.blocking_iter().next().unwrap()));
609610
}
610611

611612
#[test]

0 commit comments

Comments
 (0)