Skip to content

Commit 32e09db

Browse files
committed
Change batch_execute into simple_query
Closes #413
1 parent 80ebcd1 commit 32e09db

File tree

21 files changed

+497
-243
lines changed

21 files changed

+497
-243
lines changed

postgres/src/client.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use fallible_iterator::FallibleIterator;
12
use futures::{Async, Future, Poll, Stream};
23
use std::io::{self, Read};
34
use tokio_postgres::types::{ToSql, Type};
@@ -7,7 +8,7 @@ use tokio_postgres::{MakeTlsConnect, Socket, TlsConnect};
78

89
#[cfg(feature = "runtime")]
910
use crate::Config;
10-
use crate::{CopyOutReader, Query, Statement, ToStatement, Transaction};
11+
use crate::{CopyOutReader, Query, SimpleQuery, Statement, ToStatement, Transaction};
1112

1213
pub struct Client(tokio_postgres::Client);
1314

@@ -81,12 +82,12 @@ impl Client {
8182
CopyOutReader::new(stream)
8283
}
8384

84-
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
85-
self.0.batch_execute(query).wait()
85+
pub fn simple_query(&mut self, query: &str) -> Result<SimpleQuery<'_>, Error> {
86+
Ok(SimpleQuery::new(self.0.simple_query(query)))
8687
}
8788

8889
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
89-
self.batch_execute("BEGIN")?;
90+
self.simple_query("BEGIN")?.count()?;
9091
Ok(Transaction::new(self))
9192
}
9293

postgres/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod copy_out_reader;
1010
mod portal;
1111
mod query;
1212
mod query_portal;
13+
mod simple_query;
1314
mod statement;
1415
mod to_statement;
1516
mod transaction;
@@ -25,6 +26,7 @@ pub use crate::copy_out_reader::*;
2526
pub use crate::portal::*;
2627
pub use crate::query::*;
2728
pub use crate::query_portal::*;
29+
pub use crate::simple_query::*;
2830
pub use crate::statement::*;
2931
pub use crate::to_statement::*;
3032
pub use crate::transaction::*;

postgres/src/simple_query.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use fallible_iterator::FallibleIterator;
2+
use futures::stream::{self, Stream};
3+
use std::marker::PhantomData;
4+
use tokio_postgres::impls;
5+
use tokio_postgres::{Error, SimpleQueryMessage};
6+
7+
pub struct SimpleQuery<'a> {
8+
it: stream::Wait<impls::SimpleQuery>,
9+
_p: PhantomData<&'a mut ()>,
10+
}
11+
12+
// no-op impl to extend borrow until drop
13+
impl<'a> Drop for SimpleQuery<'a> {
14+
fn drop(&mut self) {}
15+
}
16+
17+
impl<'a> SimpleQuery<'a> {
18+
pub(crate) fn new(stream: impls::SimpleQuery) -> SimpleQuery<'a> {
19+
SimpleQuery {
20+
it: stream.wait(),
21+
_p: PhantomData,
22+
}
23+
}
24+
25+
/// A convenience API which collects the resulting messages into a `Vec` and returns them.
26+
pub fn into_vec(self) -> Result<Vec<SimpleQueryMessage>, Error> {
27+
self.collect()
28+
}
29+
}
30+
31+
impl<'a> FallibleIterator for SimpleQuery<'a> {
32+
type Item = SimpleQueryMessage;
33+
type Error = Error;
34+
35+
fn next(&mut self) -> Result<Option<SimpleQueryMessage>, Error> {
36+
match self.it.next() {
37+
Some(Ok(row)) => Ok(Some(row)),
38+
Some(Err(e)) => Err(e),
39+
None => Ok(None),
40+
}
41+
}
42+
}

postgres/src/test.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use fallible_iterator::FallibleIterator;
12
use std::io::Read;
23
use tokio_postgres::types::Type;
34
use tokio_postgres::NoTls;
@@ -47,7 +48,9 @@ fn transaction_commit() {
4748
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
4849

4950
client
50-
.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY)")
51+
.simple_query("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY)")
52+
.unwrap()
53+
.count()
5154
.unwrap();
5255

5356
let mut transaction = client.transaction().unwrap();
@@ -72,7 +75,9 @@ fn transaction_rollback() {
7275
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
7376

7477
client
75-
.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY)")
78+
.simple_query("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY)")
79+
.unwrap()
80+
.count()
7681
.unwrap();
7782

7883
let mut transaction = client.transaction().unwrap();
@@ -96,7 +101,9 @@ fn transaction_drop() {
96101
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
97102

98103
client
99-
.batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY)")
104+
.simple_query("CREATE TEMPORARY TABLE foo (id SERIAL PRIMARY KEY)")
105+
.unwrap()
106+
.count()
100107
.unwrap();
101108

102109
let mut transaction = client.transaction().unwrap();
@@ -120,7 +127,9 @@ fn nested_transactions() {
120127
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
121128

122129
client
123-
.batch_execute("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)")
130+
.simple_query("CREATE TEMPORARY TABLE foo (id INT PRIMARY KEY)")
131+
.unwrap()
132+
.count()
124133
.unwrap();
125134

126135
let mut transaction = client.transaction().unwrap();
@@ -177,7 +186,9 @@ fn copy_in() {
177186
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
178187

179188
client
180-
.batch_execute("CREATE TEMPORARY TABLE foo (id INT, name TEXT)")
189+
.simple_query("CREATE TEMPORARY TABLE foo (id INT, name TEXT)")
190+
.unwrap()
191+
.count()
181192
.unwrap();
182193

183194
client
@@ -206,13 +217,12 @@ fn copy_out() {
206217
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
207218

208219
client
209-
.batch_execute(
210-
"
211-
CREATE TEMPORARY TABLE foo (id INT, name TEXT);
212-
213-
INSERT INTO foo (id, name) VALUES (1, 'steven'), (2, 'timothy');
214-
",
220+
.simple_query(
221+
"CREATE TEMPORARY TABLE foo (id INT, name TEXT);
222+
INSERT INTO foo (id, name) VALUES (1, 'steven'), (2, 'timothy');",
215223
)
224+
.unwrap()
225+
.count()
216226
.unwrap();
217227

218228
let mut reader = client
@@ -224,21 +234,20 @@ fn copy_out() {
224234

225235
assert_eq!(s, "1\tsteven\n2\ttimothy\n");
226236

227-
client.batch_execute("SELECT 1").unwrap();
237+
client.simple_query("SELECT 1").unwrap().count().unwrap();
228238
}
229239

230240
#[test]
231241
fn portal() {
232242
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
233243

234244
client
235-
.batch_execute(
236-
"
237-
CREATE TEMPORARY TABLE foo (id INT);
238-
239-
INSERT INTO foo (id) VALUES (1), (2), (3);
240-
",
245+
.simple_query(
246+
"CREATE TEMPORARY TABLE foo (id INT);
247+
INSERT INTO foo (id) VALUES (1), (2), (3);",
241248
)
249+
.unwrap()
250+
.count()
242251
.unwrap();
243252

244253
let mut transaction = client.transaction().unwrap();

postgres/src/transaction.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use fallible_iterator::FallibleIterator;
12
use futures::Future;
23
use std::io::Read;
34
use tokio_postgres::types::{ToSql, Type};
45
use tokio_postgres::Error;
56

6-
use crate::{Client, CopyOutReader, Portal, Query, QueryPortal, Statement, ToStatement};
7+
use crate::{
8+
Client, CopyOutReader, Portal, Query, QueryPortal, SimpleQuery, Statement, ToStatement,
9+
};
710

811
pub struct Transaction<'a> {
912
client: &'a mut Client,
@@ -30,12 +33,14 @@ impl<'a> Transaction<'a> {
3033

3134
pub fn commit(mut self) -> Result<(), Error> {
3235
self.done = true;
33-
if self.depth == 0 {
34-
self.client.batch_execute("COMMIT")
36+
let it = if self.depth == 0 {
37+
self.client.simple_query("COMMIT")?
3538
} else {
3639
self.client
37-
.batch_execute(&format!("RELEASE sp{}", self.depth))
38-
}
40+
.simple_query(&format!("RELEASE sp{}", self.depth))?
41+
};
42+
it.count()?;
43+
Ok(())
3944
}
4045

4146
pub fn rollback(mut self) -> Result<(), Error> {
@@ -44,12 +49,14 @@ impl<'a> Transaction<'a> {
4449
}
4550

4651
fn rollback_inner(&mut self) -> Result<(), Error> {
47-
if self.depth == 0 {
48-
self.client.batch_execute("ROLLBACK")
52+
let it = if self.depth == 0 {
53+
self.client.simple_query("ROLLBACK")?
4954
} else {
5055
self.client
51-
.batch_execute(&format!("ROLLBACK TO sp{}", self.depth))
52-
}
56+
.simple_query(&format!("ROLLBACK TO sp{}", self.depth))?
57+
};
58+
it.count()?;
59+
Ok(())
5360
}
5461

5562
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
@@ -120,14 +127,15 @@ impl<'a> Transaction<'a> {
120127
self.client.copy_out(query, params)
121128
}
122129

123-
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
124-
self.client.batch_execute(query)
130+
pub fn simple_query(&mut self, query: &str) -> Result<SimpleQuery<'_>, Error> {
131+
self.client.simple_query(query)
125132
}
126133

127134
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
128135
let depth = self.depth + 1;
129136
self.client
130-
.batch_execute(&format!("SAVEPOINT sp{}", depth))?;
137+
.simple_query(&format!("SAVEPOINT sp{}", depth))?
138+
.count()?;
131139
Ok(Transaction {
132140
client: self.client,
133141
depth,

tokio-postgres-native-tls/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,6 @@ fn runtime() {
100100
let connection = connection.map_err(|e| panic!("{}", e));
101101
runtime.spawn(connection);
102102

103-
let execute = client.batch_execute("SELECT 1");
103+
let execute = client.simple_query("SELECT 1").for_each(|_| Ok(()));
104104
runtime.block_on(execute).unwrap();
105105
}

tokio-postgres-openssl/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,6 @@ fn runtime() {
8585
let connection = connection.map_err(|e| panic!("{}", e));
8686
runtime.spawn(connection);
8787

88-
let execute = client.batch_execute("SELECT 1");
88+
let execute = client.simple_query("SELECT 1").for_each(|_| Ok(()));
8989
runtime.block_on(execute).unwrap();
9090
}

tokio-postgres/src/error/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ enum Kind {
336336
Tls,
337337
ToSql,
338338
FromSql,
339+
Column,
339340
CopyInStream,
340341
Closed,
341342
Db,
@@ -373,6 +374,7 @@ impl fmt::Display for Error {
373374
Kind::Tls => "error performing TLS handshake",
374375
Kind::ToSql => "error serializing a value",
375376
Kind::FromSql => "error deserializing a value",
377+
Kind::Column => "invalid column",
376378
Kind::CopyInStream => "error from a copy_in stream",
377379
Kind::Closed => "connection closed",
378380
Kind::Db => "db error",
@@ -451,6 +453,10 @@ impl Error {
451453
Error::new(Kind::FromSql, Some(e))
452454
}
453455

456+
pub(crate) fn column() -> Error {
457+
Error::new(Kind::Column, None)
458+
}
459+
454460
pub(crate) fn copy_in_stream<E>(e: E) -> Error
455461
where
456462
E: Into<Box<dyn error::Error + Sync + Send>>,

tokio-postgres/src/impls.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::error;
55
use tokio_io::{AsyncRead, AsyncWrite};
66

77
use crate::proto;
8-
use crate::{Client, Connection, Error, Portal, Row, Statement, TlsConnect};
8+
use crate::{Client, Connection, Error, Portal, Row, SimpleQueryMessage, Statement, TlsConnect};
99
#[cfg(feature = "runtime")]
1010
use crate::{MakeTlsConnect, Socket};
1111

@@ -187,3 +187,16 @@ impl Stream for CopyOut {
187187
self.0.poll()
188188
}
189189
}
190+
191+
/// The stream returned by `Client::simple_query`.
192+
#[must_use = "streams do nothing unless polled"]
193+
pub struct SimpleQuery(pub(crate) proto::SimpleQueryStream);
194+
195+
impl Stream for SimpleQuery {
196+
type Item = SimpleQueryMessage;
197+
type Error = Error;
198+
199+
fn poll(&mut self) -> Poll<Option<SimpleQueryMessage>, Error> {
200+
self.0.poll()
201+
}
202+
}

tokio-postgres/src/lib.rs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
#![warn(rust_2018_idioms, clippy::all)]
103103

104104
use bytes::IntoBuf;
105-
use futures::{try_ready, Async, Future, Poll, Stream};
105+
use futures::{Future, Poll, Stream};
106106
use std::error::Error as StdError;
107107
use std::sync::atomic::{AtomicUsize, Ordering};
108108
use tokio_io::{AsyncRead, AsyncWrite};
@@ -240,19 +240,21 @@ impl Client {
240240
impls::CopyOut(self.0.copy_out(&statement.0, params))
241241
}
242242

243-
/// Executes a sequence of SQL statements.
243+
/// Executes a sequence of SQL statements using the simple query protocol.
244244
///
245245
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
246-
/// point. This is intended for the execution of batches of non-dynamic statements, for example, the creation of
247-
/// a schema for a fresh database.
246+
/// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
247+
/// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a stream over the
248+
/// rows, this method returns a stream over an enum which indicates either the completion of one of the commands,
249+
/// or a row of data. This preserves the framing between the separate statements in the request.
248250
///
249251
/// # Warning
250252
///
251253
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
252254
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
253255
/// them to this method!
254-
pub fn batch_execute(&mut self, query: &str) -> BatchExecute {
255-
BatchExecute(self.0.batch_execute(query))
256+
pub fn simple_query(&mut self, query: &str) -> impls::SimpleQuery {
257+
impls::SimpleQuery(self.0.simple_query(query))
256258
}
257259

258260
/// A utility method to wrap a future in a database transaction.
@@ -445,18 +447,11 @@ where
445447
}
446448
}
447449

448-
#[must_use = "futures do nothing unless polled"]
449-
pub struct BatchExecute(proto::SimpleQueryStream);
450-
451-
impl Future for BatchExecute {
452-
type Item = ();
453-
type Error = Error;
454-
455-
fn poll(&mut self) -> Poll<(), Error> {
456-
while let Some(_) = try_ready!(self.0.poll()) {}
457-
458-
Ok(Async::Ready(()))
459-
}
450+
pub enum SimpleQueryMessage {
451+
Row(SimpleQueryRow),
452+
CommandComplete(u64),
453+
#[doc(hidden)]
454+
__NonExhaustive,
460455
}
461456

462457
/// An asynchronous notification.

0 commit comments

Comments
 (0)