Skip to content

Commit 01153f7

Browse files
committed
Add a notifications iterator that waits with a timeout
Closes #105
1 parent c4becf6 commit 01153f7

File tree

3 files changed

+97
-2
lines changed

3 files changed

+97
-2
lines changed

src/lib.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use std::io::prelude::*;
6868
use std::marker::Sync as StdSync;
6969
use std::mem;
7070
use std::result;
71+
use std::time::Duration;
7172
#[cfg(feature = "unix_socket")]
7273
use std::path::PathBuf;
7374

@@ -509,6 +510,24 @@ impl InnerConnection {
509510
}
510511
}
511512

513+
fn read_message_with_notification_timeout(&mut self, timeout: Duration)
514+
-> std::io::Result<Option<BackendMessage>> {
515+
debug_assert!(!self.desynchronized);
516+
loop {
517+
match try_desync!(self, self.stream.read_message_timeout(timeout)) {
518+
Some(NoticeResponse { fields }) => {
519+
if let Ok(err) = DbError::new_raw(fields) {
520+
self.notice_handler.handle_notice(err);
521+
}
522+
}
523+
Some(ParameterStatus { parameter, value }) => {
524+
self.parameters.insert(parameter, value);
525+
}
526+
val => return Ok(val)
527+
}
528+
}
529+
}
530+
512531
fn read_message(&mut self) -> std_io::Result<BackendMessage> {
513532
loop {
514533
match try!(self.read_message_with_notification()) {

src/notification.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Asynchronous notifications.
22
33
use std::fmt;
4+
use std::time::Duration;
45

56
use {desynchronized, Result, Connection, NotificationsNew};
67
use message::BackendMessage::NotificationResponse;
@@ -48,15 +49,29 @@ impl<'conn> Notifications<'conn> {
4849
}
4950
}
5051

51-
/// Returns an iterator over notifications, blocking until one is received
52-
/// if none are pending.
52+
/// Returns an iterator over notifications that blocks until one is
53+
/// received if none are pending.
5354
///
5455
/// The iterator will never return `None`.
5556
pub fn blocking_iter<'a>(&'a self) -> BlockingIter<'a> {
5657
BlockingIter {
5758
conn: self.conn,
5859
}
5960
}
61+
62+
/// Returns an iterator over notifications that blocks for a limited time
63+
/// waiting to receive one if none are pending.
64+
///
65+
/// # Note
66+
///
67+
/// THis iterator may start returning `Some` after previously returning
68+
/// `None` if more notifications are received.
69+
pub fn timeout_iter<'a>(&'a self, timeout: Duration) -> TimeoutIter<'a> {
70+
TimeoutIter {
71+
conn: self.conn,
72+
timeout: timeout,
73+
}
74+
}
6075
}
6176

6277
impl<'a, 'conn> IntoIterator for &'a Notifications<'conn> {
@@ -121,3 +136,39 @@ impl<'a> Iterator for BlockingIter<'a> {
121136
}
122137
}
123138
}
139+
140+
/// An iterator over notifications which will block for a period of time if
141+
/// none are pending.
142+
pub struct TimeoutIter<'a> {
143+
conn: &'a Connection,
144+
timeout: Duration,
145+
}
146+
147+
impl<'a> Iterator for TimeoutIter<'a> {
148+
type Item = Result<Notification>;
149+
150+
fn next(&mut self) -> Option<Result<Notification>> {
151+
let mut conn = self.conn.conn.borrow_mut();
152+
153+
if let Some(notification) = conn.notifications.pop_front() {
154+
return Some(Ok(notification));
155+
}
156+
157+
if conn.is_desynchronized() {
158+
return Some(Err(Error::IoError(desynchronized())));
159+
}
160+
161+
match conn.read_message_with_notification_timeout(self.timeout) {
162+
Ok(Some(NotificationResponse { pid, channel, payload })) => {
163+
Some(Ok(Notification {
164+
pid: pid,
165+
channel: channel,
166+
payload: payload
167+
}))
168+
}
169+
Ok(None) => None,
170+
Err(err) => Some(Err(Error::IoError(err))),
171+
_ => unreachable!()
172+
}
173+
}
174+
}

tests/test.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use openssl::ssl::{SslContext, SslMethod};
99
use std::thread;
1010
use std::io;
1111
use std::io::prelude::*;
12+
use std::time::Duration;
1213

1314
use postgres::{HandleNotice,
1415
Connection,
@@ -609,6 +610,30 @@ fn test_notifications_next_block() {
609610
}, or_panic!(notifications.blocking_iter().next().unwrap()));
610611
}
611612

613+
#[test]
614+
fn test_notification_next_timeout() {
615+
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
616+
or_panic!(conn.execute("LISTEN test_notifications_next_timeout", &[]));
617+
618+
let _t = thread::spawn(|| {
619+
let conn = or_panic!(Connection::connect("postgres://postgres@localhost", &SslMode::None));
620+
thread::sleep_ms(500);
621+
or_panic!(conn.execute("NOTIFY test_notifications_next_timeout, 'foo'", &[]));
622+
thread::sleep_ms(1500);
623+
or_panic!(conn.execute("NOTIFY test_notifications_next_timeout, 'foo'", &[]));
624+
});
625+
626+
let notifications = conn.notifications();
627+
let mut it = notifications.timeout_iter(Duration::from_secs(1));
628+
check_notification(Notification {
629+
pid: 0,
630+
channel: "test_notifications_next_timeout".to_string(),
631+
payload: "foo".to_string()
632+
}, or_panic!(it.next().unwrap()));
633+
634+
assert!(it.next().is_none());
635+
}
636+
612637
#[test]
613638
// This test is pretty sad, but I don't think there's a better way :(
614639
fn test_cancel_query() {

0 commit comments

Comments
 (0)