Skip to content

Commit e1de0c2

Browse files
committed
transaction support
1 parent b0946fa commit e1de0c2

File tree

3 files changed

+68
-3
lines changed

3 files changed

+68
-3
lines changed

postgres-tokio/src/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ use Connection;
88
pub use postgres_shared::error::*;
99

1010
#[derive(Debug)]
11-
pub enum Error {
11+
pub enum Error<C = Connection> {
1212
Io(io::Error),
13-
Db(Box<DbError>, Connection),
14-
Conversion(Box<error::Error + Sync + Send>, Connection),
13+
Db(Box<DbError>, C),
14+
Conversion(Box<error::Error + Sync + Send>, C),
1515
}
1616

1717
impl fmt::Display for Error {

postgres-tokio/src/lib.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,9 +726,44 @@ impl Row {
726726
}
727727
}
728728

729+
#[derive(Debug)]
729730
pub struct Transaction(Connection);
730731

731732
impl Transaction {
733+
pub fn batch_execute(self, query: &str) -> BoxFuture<Transaction, Error<Transaction>> {
734+
self.0.batch_execute(query)
735+
.map(Transaction)
736+
.map_err(transaction_err)
737+
.boxed()
738+
}
739+
740+
pub fn prepare(self, query: &str) -> BoxFuture<(Statement, Transaction), Error<Transaction>> {
741+
self.0.prepare(query)
742+
.map(|(s, c)| (s, Transaction(c)))
743+
.map_err(transaction_err)
744+
.boxed()
745+
}
746+
747+
pub fn execute(self,
748+
statement: &Statement,
749+
params: &[&ToSql])
750+
-> BoxFuture<(u64, Transaction), Error<Transaction>> {
751+
self.0.execute(statement, params)
752+
.map(|(n, c)| (n, Transaction(c)))
753+
.map_err(transaction_err)
754+
.boxed()
755+
}
756+
757+
pub fn query(self,
758+
statement: &Statement,
759+
params: &[&ToSql])
760+
-> BoxStateStream<Row, Transaction, Error<Transaction>> {
761+
self.0.query(statement, params)
762+
.map_state(Transaction)
763+
.map_err(transaction_err)
764+
.boxed()
765+
}
766+
732767
pub fn commit(self) -> BoxFuture<Connection, Error> {
733768
self.finish("COMMIT")
734769
}
@@ -756,3 +791,11 @@ fn bad_message<T>() -> T
756791
{
757792
io::Error::new(io::ErrorKind::InvalidInput, "unexpected message").into()
758793
}
794+
795+
fn transaction_err(e: Error) -> Error<Transaction> {
796+
match e {
797+
Error::Io(e) => Error::Io(e),
798+
Error::Db(e, c) => Error::Db(e, Transaction(c)),
799+
Error::Conversion(e, c) => Error::Conversion(e, Transaction(c))
800+
}
801+
}

postgres-tokio/src/test.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,33 @@ fn query() {
134134
.and_then(|c| c.prepare("SELECT id, name FROM foo ORDER BY id"))
135135
.and_then(|(s, c)| c.query(&s, &[]).collect())
136136
.and_then(|(r, c)| {
137+
assert_eq!(r[0].get::<i32, _>("id"), 1);
137138
assert_eq!(r[0].get::<String, _>("name"), "joe");
139+
assert_eq!(r[1].get::<i32, _>("id"), 2);
138140
assert_eq!(r[1].get::<String, _>("name"), "bob");
139141
c.prepare("")
140142
})
141143
.and_then(|(s, c)| c.query(&s, &[]).collect())
142144
.map(|(r, _)| assert!(r.is_empty()));
143145
l.run(done).unwrap();
144146
}
147+
148+
#[test]
149+
fn transaction() {
150+
let mut l = Core::new().unwrap();
151+
let done = Connection::connect("postgres://postgres@localhost", &l.handle())
152+
.then(|c| c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);"))
153+
.then(|c| c.unwrap().transaction())
154+
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('joe');"))
155+
.then(|t| t.unwrap().rollback())
156+
.then(|c| c.unwrap().transaction())
157+
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('bob');"))
158+
.then(|t| t.unwrap().commit())
159+
.then(|c| c.unwrap().prepare("SELECT name FROM foo"))
160+
.and_then(|(s, c)| c.query(&s, &[]).collect())
161+
.map(|(r, _)| {
162+
assert_eq!(r.len(), 1);
163+
assert_eq!(r[0].get::<String, _>("name"), "bob");
164+
});
165+
l.run(done).unwrap();
166+
}

0 commit comments

Comments
 (0)