Skip to content

Commit 3c4a0af

Browse files
committed
Add a notification API to the blocking client
This mirrors the implementation in the old 0.15 release, but is quite a bit simpler now that we're built on the nonblocking API!
1 parent fd3a99c commit 3c4a0af

File tree

6 files changed

+292
-8
lines changed

6 files changed

+292
-8
lines changed

postgres/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fallible-iterator = "0.2"
3535
futures = "0.3"
3636
tokio-postgres = { version = "0.5.3", path = "../tokio-postgres" }
3737

38-
tokio = { version = "0.2", features = ["rt-core"] }
38+
tokio = { version = "0.2", features = ["rt-core", "time"] }
3939
log = "0.4"
4040

4141
[dev-dependencies]

postgres/src/client.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::connection::Connection;
22
use crate::{
3-
CancelToken, Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction,
4-
TransactionBuilder,
3+
CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
4+
ToStatement, Transaction, TransactionBuilder,
55
};
66
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
77
use tokio_postgres::types::{ToSql, Type};
@@ -471,6 +471,13 @@ impl Client {
471471
TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
472472
}
473473

474+
/// Returns a structure providing access to asynchronous notifications.
475+
///
476+
/// Use the `LISTEN` command to register this connection for notifications.
477+
pub fn notifications(&mut self) -> Notifications<'_> {
478+
Notifications::new(self.connection.as_ref())
479+
}
480+
474481
/// Constructs a cancellation token that can later be used to request
475482
/// cancellation of a query running on this connection.
476483
///
@@ -490,7 +497,7 @@ impl Client {
490497
/// thread::spawn(move || {
491498
/// // Abort the query after 5s.
492499
/// thread::sleep(Duration::from_secs(5));
493-
/// cancel_token.cancel_query(NoTls);
500+
/// let _ = cancel_token.cancel_query(NoTls);
494501
/// });
495502
///
496503
/// match client.simple_query("SELECT long_running_query()") {

postgres/src/connection.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,30 @@ impl Connection {
3434
ConnectionRef { connection: self }
3535
}
3636

37+
pub fn enter<F, T>(&self, f: F) -> T
38+
where
39+
F: FnOnce() -> T,
40+
{
41+
self.runtime.enter(f)
42+
}
43+
3744
pub fn block_on<F, T>(&mut self, future: F) -> Result<T, Error>
3845
where
3946
F: Future<Output = Result<T, Error>>,
4047
{
4148
pin_mut!(future);
49+
self.poll_block_on(|cx, _, _| future.as_mut().poll(cx))
50+
}
51+
52+
pub fn poll_block_on<F, T>(&mut self, mut f: F) -> Result<T, Error>
53+
where
54+
F: FnMut(&mut Context<'_>, &mut VecDeque<Notification>, bool) -> Poll<Result<T, Error>>,
55+
{
4256
let connection = &mut self.connection;
4357
let notifications = &mut self.notifications;
4458
self.runtime.block_on({
4559
future::poll_fn(|cx| {
46-
loop {
60+
let done = loop {
4761
match connection.as_mut().poll_next(cx) {
4862
Poll::Ready(Some(Ok(AsyncMessage::Notification(notification)))) => {
4963
notifications.push_back(notification);
@@ -53,14 +67,23 @@ impl Connection {
5367
}
5468
Poll::Ready(Some(Ok(_))) => {}
5569
Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)),
56-
Poll::Ready(None) | Poll::Pending => break,
70+
Poll::Ready(None) => break true,
71+
Poll::Pending => break false,
5772
}
58-
}
73+
};
5974

60-
future.as_mut().poll(cx)
75+
f(cx, notifications, done)
6176
})
6277
})
6378
}
79+
80+
pub fn notifications(&self) -> &VecDeque<Notification> {
81+
&self.notifications
82+
}
83+
84+
pub fn notifications_mut(&mut self) -> &mut VecDeque<Notification> {
85+
&mut self.notifications
86+
}
6487
}
6588

6689
pub struct ConnectionRef<'a> {

postgres/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub use crate::copy_out_reader::CopyOutReader;
7777
#[doc(no_inline)]
7878
pub use crate::error::Error;
7979
pub use crate::generic_client::GenericClient;
80+
#[doc(inline)]
81+
pub use crate::notifications::Notifications;
8082
#[doc(no_inline)]
8183
pub use crate::row::{Row, SimpleQueryRow};
8284
pub use crate::row_iter::RowIter;
@@ -94,6 +96,7 @@ mod copy_in_writer;
9496
mod copy_out_reader;
9597
mod generic_client;
9698
mod lazy_pin;
99+
pub mod notifications;
97100
mod row_iter;
98101
mod transaction;
99102
mod transaction_builder;

postgres/src/notifications.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! Asynchronous notifications.
2+
3+
use crate::connection::ConnectionRef;
4+
use crate::{Error, Notification};
5+
use fallible_iterator::FallibleIterator;
6+
use futures::{ready, FutureExt};
7+
use std::task::Poll;
8+
use std::time::Duration;
9+
use tokio::time::{self, Delay, Instant};
10+
11+
/// Notifications from a PostgreSQL backend.
12+
pub struct Notifications<'a> {
13+
connection: ConnectionRef<'a>,
14+
}
15+
16+
impl<'a> Notifications<'a> {
17+
pub(crate) fn new(connection: ConnectionRef<'a>) -> Notifications<'a> {
18+
Notifications { connection }
19+
}
20+
21+
/// Returns the number of already buffered pending notifications.
22+
pub fn len(&self) -> usize {
23+
self.connection.notifications().len()
24+
}
25+
26+
/// Determines if there are any already buffered pending notifications.
27+
pub fn is_empty(&self) -> bool {
28+
self.connection.notifications().is_empty()
29+
}
30+
31+
/// Returns a nonblocking iterator over notifications.
32+
///
33+
/// If there are no already buffered pending notifications, this iterator will poll the connection but will not
34+
/// block waiting on notifications over the network. A return value of `None` either indicates that there are no
35+
/// pending notifications or that the server has disconnected.
36+
///
37+
/// # Note
38+
///
39+
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
40+
pub fn iter(&mut self) -> Iter<'_> {
41+
Iter {
42+
connection: self.connection.as_ref(),
43+
}
44+
}
45+
46+
/// Returns a blocking iterator over notifications.
47+
///
48+
/// If there are no already buffered pending notifications, this iterator will block indefinitely waiting on the
49+
/// PostgreSQL backend server to send one. It will only return `None` if the server has disconnected.
50+
pub fn blocking_iter(&mut self) -> BlockingIter<'_> {
51+
BlockingIter {
52+
connection: self.connection.as_ref(),
53+
}
54+
}
55+
56+
/// Returns an iterator over notifications which blocks a limited amount of time.
57+
///
58+
/// If there are no already buffered pending notifications, this iterator will block waiting on the PostgreSQL
59+
/// backend server to send one up to the provided timeout. A return value of `None` either indicates that there are
60+
/// no pending notifications or that the server has disconnected.
61+
///
62+
/// # Note
63+
///
64+
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
65+
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
66+
TimeoutIter {
67+
delay: self.connection.enter(|| time::delay_for(timeout)),
68+
timeout,
69+
connection: self.connection.as_ref(),
70+
}
71+
}
72+
}
73+
74+
/// A nonblocking iterator over pending notifications.
75+
pub struct Iter<'a> {
76+
connection: ConnectionRef<'a>,
77+
}
78+
79+
impl<'a> FallibleIterator for Iter<'a> {
80+
type Item = Notification;
81+
type Error = Error;
82+
83+
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
84+
if let Some(notification) = self.connection.notifications_mut().pop_front() {
85+
return Ok(Some(notification));
86+
}
87+
88+
self.connection
89+
.poll_block_on(|_, notifications, _| Poll::Ready(Ok(notifications.pop_front())))
90+
}
91+
92+
fn size_hint(&self) -> (usize, Option<usize>) {
93+
(self.connection.notifications().len(), None)
94+
}
95+
}
96+
97+
/// A blocking iterator over pending notifications.
98+
pub struct BlockingIter<'a> {
99+
connection: ConnectionRef<'a>,
100+
}
101+
102+
impl<'a> FallibleIterator for BlockingIter<'a> {
103+
type Item = Notification;
104+
type Error = Error;
105+
106+
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
107+
if let Some(notification) = self.connection.notifications_mut().pop_front() {
108+
return Ok(Some(notification));
109+
}
110+
111+
self.connection
112+
.poll_block_on(|_, notifications, done| match notifications.pop_front() {
113+
Some(notification) => Poll::Ready(Ok(Some(notification))),
114+
None if done => Poll::Ready(Ok(None)),
115+
None => Poll::Pending,
116+
})
117+
}
118+
119+
fn size_hint(&self) -> (usize, Option<usize>) {
120+
(self.connection.notifications().len(), None)
121+
}
122+
}
123+
124+
/// A time-limited blocking iterator over pending notifications.
125+
pub struct TimeoutIter<'a> {
126+
connection: ConnectionRef<'a>,
127+
delay: Delay,
128+
timeout: Duration,
129+
}
130+
131+
impl<'a> FallibleIterator for TimeoutIter<'a> {
132+
type Item = Notification;
133+
type Error = Error;
134+
135+
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
136+
if let Some(notification) = self.connection.notifications_mut().pop_front() {
137+
self.delay.reset(Instant::now() + self.timeout);
138+
return Ok(Some(notification));
139+
}
140+
141+
let delay = &mut self.delay;
142+
let timeout = self.timeout;
143+
self.connection.poll_block_on(|cx, notifications, done| {
144+
match notifications.pop_front() {
145+
Some(notification) => {
146+
delay.reset(Instant::now() + timeout);
147+
return Poll::Ready(Ok(Some(notification)));
148+
}
149+
None if done => return Poll::Ready(Ok(None)),
150+
None => {}
151+
}
152+
153+
ready!(delay.poll_unpin(cx));
154+
Poll::Ready(Ok(None))
155+
})
156+
}
157+
158+
fn size_hint(&self) -> (usize, Option<usize>) {
159+
(self.connection.notifications().len(), None)
160+
}
161+
}

postgres/src/test.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,3 +309,93 @@ fn cancel_query() {
309309

310310
cancel_thread.join().unwrap();
311311
}
312+
313+
#[test]
314+
fn notifications_iter() {
315+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
316+
317+
client
318+
.batch_execute(
319+
"\
320+
LISTEN notifications_iter;
321+
NOTIFY notifications_iter, 'hello';
322+
NOTIFY notifications_iter, 'world';
323+
",
324+
)
325+
.unwrap();
326+
327+
let notifications = client.notifications().iter().collect::<Vec<_>>().unwrap();
328+
assert_eq!(notifications.len(), 2);
329+
assert_eq!(notifications[0].payload(), "hello");
330+
assert_eq!(notifications[1].payload(), "world");
331+
}
332+
333+
#[test]
334+
fn notifications_blocking_iter() {
335+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
336+
337+
client
338+
.batch_execute(
339+
"\
340+
LISTEN notifications_blocking_iter;
341+
NOTIFY notifications_blocking_iter, 'hello';
342+
",
343+
)
344+
.unwrap();
345+
346+
thread::spawn(|| {
347+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
348+
349+
thread::sleep(Duration::from_secs(1));
350+
client
351+
.batch_execute("NOTIFY notifications_blocking_iter, 'world'")
352+
.unwrap();
353+
});
354+
355+
let notifications = client
356+
.notifications()
357+
.blocking_iter()
358+
.take(2)
359+
.collect::<Vec<_>>()
360+
.unwrap();
361+
assert_eq!(notifications.len(), 2);
362+
assert_eq!(notifications[0].payload(), "hello");
363+
assert_eq!(notifications[1].payload(), "world");
364+
}
365+
366+
#[test]
367+
fn notifications_timeout_iter() {
368+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
369+
370+
client
371+
.batch_execute(
372+
"\
373+
LISTEN notifications_timeout_iter;
374+
NOTIFY notifications_timeout_iter, 'hello';
375+
",
376+
)
377+
.unwrap();
378+
379+
thread::spawn(|| {
380+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
381+
382+
thread::sleep(Duration::from_secs(1));
383+
client
384+
.batch_execute("NOTIFY notifications_timeout_iter, 'world'")
385+
.unwrap();
386+
387+
thread::sleep(Duration::from_secs(10));
388+
client
389+
.batch_execute("NOTIFY notifications_timeout_iter, '!'")
390+
.unwrap();
391+
});
392+
393+
let notifications = client
394+
.notifications()
395+
.timeout_iter(Duration::from_secs(2))
396+
.collect::<Vec<_>>()
397+
.unwrap();
398+
assert_eq!(notifications.len(), 2);
399+
assert_eq!(notifications[0].payload(), "hello");
400+
assert_eq!(notifications[1].payload(), "world");
401+
}

0 commit comments

Comments
 (0)