Skip to content

Commit 40cba12

Browse files
committed
Query!
1 parent 643602d commit 40cba12

File tree

4 files changed

+74
-1
lines changed

4 files changed

+74
-1
lines changed

postgres-shared/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl<'a> RowIndex for str {
113113
}
114114
}
115115

116-
impl<'a, T> RowIndex for &'a T
116+
impl<'a, T: ?Sized> RowIndex for &'a T
117117
where T: RowIndex
118118
{
119119
#[inline]

postgres-tokio/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ authors = ["Steven Fackler <[email protected]>"]
66
[dependencies]
77
fallible-iterator = "0.1.3"
88
futures = "0.1.7"
9+
futures-state-stream = { git = "https://github.com/sfackler/futures-state-stream" }
910
postgres-shared = { path = "../postgres-shared" }
1011
postgres-protocol = "0.2"
1112
tokio-core = "0.1"

postgres-tokio/src/lib.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
extern crate fallible_iterator;
22
extern crate futures;
3+
extern crate futures_state_stream;
34
extern crate postgres_shared;
45
extern crate postgres_protocol;
56
extern crate tokio_core;
@@ -9,6 +10,7 @@ extern crate tokio_uds;
910
use fallible_iterator::FallibleIterator;
1011
use futures::{Future, IntoFuture, BoxFuture, Stream, Sink, Poll, StartSend};
1112
use futures::future::Either;
13+
use futures_state_stream::{StreamEvent, StateStream, BoxStateStream, FutureExt};
1214
use postgres_protocol::authentication;
1315
use postgres_protocol::message::{backend, frontend};
1416
use postgres_protocol::message::backend::{ErrorResponseBody, ErrorFields};
@@ -562,6 +564,30 @@ impl Connection {
562564
.boxed()
563565
}
564566

567+
fn read_row(self) -> BoxFuture<(Option<RowData>, Connection), Error> {
568+
self.0.read()
569+
.map_err(Error::Io)
570+
.and_then(|(m, s)| {
571+
let c = Connection(s);
572+
match m {
573+
backend::Message::DataRow(body) => {
574+
Either::A(body.values()
575+
.collect()
576+
.map(|r| (Some(r), c))
577+
.map_err(Error::Io)
578+
.into_future())
579+
}
580+
backend::Message::EmptyQueryResponse |
581+
backend::Message::CommandComplete(_) => Either::A(Ok((None, c)).into_future()),
582+
backend::Message::ErrorResponse(body) => {
583+
Either::B(c.ready_err(body))
584+
}
585+
_ => Either::A(Err(bad_message()).into_future()),
586+
}
587+
})
588+
.boxed()
589+
}
590+
565591
pub fn prepare(mut self, query: &str) -> BoxFuture<(Statement, Connection), Error> {
566592
let id = self.0.next_stmt_id;
567593
self.0.next_stmt_id += 1;
@@ -585,6 +611,34 @@ impl Connection {
585611
.boxed()
586612
}
587613

614+
pub fn query(self,
615+
statement: &Statement,
616+
params: &[&ToSql])
617+
-> BoxStateStream<Row, Connection, Error> {
618+
let columns = statement.columns.clone();
619+
self.raw_execute(&statement.name, "", &statement.params, params)
620+
.map(|c| {
621+
futures_state_stream::unfold((c, columns), |(c, columns)| {
622+
c.read_row()
623+
.and_then(|(r, c)| {
624+
match r {
625+
Some(data) => {
626+
let row = Row {
627+
columns: columns.clone(),
628+
data: data,
629+
};
630+
let event = StreamEvent::Next((row, (c, columns)));
631+
Either::A(Ok(event).into_future())
632+
},
633+
None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))),
634+
}
635+
})
636+
})
637+
})
638+
.flatten_state_stream()
639+
.boxed()
640+
}
641+
588642
pub fn close(self) -> BoxFuture<(), Error> {
589643
let mut terminate = vec![];
590644
frontend::terminate(&mut terminate);

postgres-tokio/src/test.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use futures::Future;
2+
use futures_state_stream::StateStream;
23
use tokio_core::reactor::Core;
34

45
use super::*;
@@ -121,3 +122,20 @@ fn prepare_execute() {
121122
.map(|(n, _)| assert_eq!(n, 2));
122123
l.run(done).unwrap();
123124
}
125+
126+
#[test]
127+
fn query() {
128+
let mut l = Core::new().unwrap();
129+
let done = Connection::connect("postgres://postgres@localhost", &l.handle())
130+
.then(|c| {
131+
c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);
132+
INSERT INTO foo (name) VALUES ('joe'), ('bob')")
133+
})
134+
.and_then(|c| c.prepare("SELECT id, name FROM foo ORDER BY id"))
135+
.and_then(|(s, c)| c.query(&s, &[]).collect())
136+
.map(|(r, _)| {
137+
assert_eq!(r[0].get::<String, _>("name"), "joe");
138+
assert_eq!(r[1].get::<String, _>("name"), "bob");
139+
});
140+
l.run(done).unwrap();
141+
}

0 commit comments

Comments
 (0)