Skip to content

Commit 3356713

Browse files
committed
Add raw_prepare
1 parent d08dc13 commit 3356713

File tree

1 file changed

+121
-17
lines changed

1 file changed

+121
-17
lines changed

postgres-tokio/src/lib.rs

Lines changed: 121 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ use std::io;
1919
use tokio_core::reactor::Handle;
2020

2121
#[doc(inline)]
22-
pub use postgres_shared::params;
22+
pub use postgres_shared::{params, types};
2323

2424
use error::{ConnectError, Error, DbError};
2525
use params::{ConnectParams, IntoConnectParams};
2626
use stream::PostgresStream;
27+
use types::{Oid, Type};
2728

2829
pub mod error;
2930
mod stream;
@@ -44,23 +45,19 @@ struct InnerConnection {
4445
}
4546

4647
impl InnerConnection {
47-
fn read(self) -> BoxFuture<(backend::Message<Vec<u8>>, InnerConnection), (io::Error, InnerConnection)> {
48+
fn read(self) -> BoxFuture<(backend::Message<Vec<u8>>, InnerConnection), io::Error> {
4849
self.into_future()
49-
.then(|r| {
50-
let (m, mut s) = match r {
51-
Ok((m, s)) => (m, s),
52-
Err((e, s)) => return Either::A(Err((e, s)).into_future()),
53-
};
54-
50+
.map_err(|e| e.0)
51+
.and_then(|(m, mut s)| {
5552
match m {
5653
Some(backend::Message::ParameterStatus(body)) => {
5754
let name = match body.name() {
5855
Ok(name) => name.to_owned(),
59-
Err(e) => return Either::A(Err((e, s)).into_future()),
56+
Err(e) => return Either::A(Err(e).into_future()),
6057
};
6158
let value = match body.value() {
6259
Ok(value) => value.to_owned(),
63-
Err(e) => return Either::A(Err((e, s)).into_future()),
60+
Err(e) => return Either::A(Err(e).into_future()),
6461
};
6562
s.parameters.insert(name, value);
6663
Either::B(s.read())
@@ -72,7 +69,7 @@ impl InnerConnection {
7269
Some(m) => Either::A(Ok((m, s)).into_future()),
7370
None => {
7471
let err = io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF");
75-
Either::A(Err((err, s)).into_future())
72+
Either::A(Err(err).into_future())
7673
}
7774
}
7875
})
@@ -162,7 +159,7 @@ impl Connection {
162159

163160
fn handle_auth(self, params: ConnectParams) -> BoxFuture<Connection, ConnectError> {
164161
self.0.read()
165-
.map_err(|e| e.0.into())
162+
.map_err(ConnectError::Io)
166163
.and_then(move |(m, s)| {
167164
let response = match m {
168165
backend::Message::AuthenticationOk => Ok(None),
@@ -215,7 +212,7 @@ impl Connection {
215212
fn handle_auth_response(self, message: Vec<u8>) -> BoxFuture<Connection, ConnectError> {
216213
self.0.send(message)
217214
.and_then(|s| s.flush())
218-
.and_then(|s| s.read().map_err(|e| e.0))
215+
.and_then(|s| s.read())
219216
.map_err(ConnectError::Io)
220217
.and_then(|(m, s)| {
221218
match m {
@@ -229,7 +226,7 @@ impl Connection {
229226

230227
fn finish_startup(self) -> BoxFuture<Connection, ConnectError> {
231228
self.0.read()
232-
.map_err(|e| ConnectError::Io(e.0))
229+
.map_err(ConnectError::Io)
233230
.and_then(|(m, mut s)| {
234231
match m {
235232
backend::Message::BackendKeyData(body) => {
@@ -262,7 +259,7 @@ impl Connection {
262259
// This has its own read_rows since it will need to handle multiple query completions
263260
fn simple_read_rows(self, mut rows: Vec<RowData>) -> BoxFuture<(Vec<RowData>, Connection), Error> {
264261
self.0.read()
265-
.map_err(|e| Error::Io(e.0))
262+
.map_err(Error::Io)
266263
.and_then(|(m, s)| {
267264
match m {
268265
backend::Message::ReadyForQuery(_) => {
@@ -287,9 +284,10 @@ impl Connection {
287284
.boxed()
288285
}
289286

287+
#[allow(dead_code)]
290288
fn read_rows(self, mut rows: Vec<RowData>) -> BoxFuture<(Vec<RowData>, Connection), Error> {
291289
self.0.read()
292-
.map_err(|e| Error::Io(e.0))
290+
.map_err(Error::Io)
293291
.and_then(|(m, s)| {
294292
match m {
295293
backend::Message::EmptyQueryResponse |
@@ -314,7 +312,7 @@ impl Connection {
314312
where T: 'static + Send
315313
{
316314
self.0.read()
317-
.map_err(|e| Error::Io(e.0))
315+
.map_err(Error::Io)
318316
.and_then(|(m, s)| {
319317
match m {
320318
backend::Message::ReadyForQuery(_) => Ok((t, Connection(s))),
@@ -339,11 +337,117 @@ impl Connection {
339337
self.simple_query(query).map(|r| r.1).boxed()
340338
}
341339

340+
fn raw_prepare(self,
341+
name: &str,
342+
query: &str)
343+
-> BoxFuture<(Vec<Type>, Vec<Column>, Connection), Error> {
344+
let mut parse = vec![];
345+
let mut describe = vec![];
346+
let mut sync = vec![];
347+
frontend::parse(name, query, None, &mut parse)
348+
.and_then(|()| frontend::describe(b'S', name, &mut describe))
349+
.and_then(|()| Ok(frontend::sync(&mut sync)))
350+
.into_future()
351+
.and_then(move |()| self.0.send(parse))
352+
.and_then(|s| s.send(describe))
353+
.and_then(|s| s.send(sync))
354+
.and_then(|s| s.flush())
355+
.and_then(|s| s.read())
356+
.map_err(Error::Io)
357+
.boxed() // work around nonlinear trans blowup
358+
.and_then(|(m, s)| {
359+
match m {
360+
backend::Message::ParseComplete => Either::A(Ok(s).into_future()),
361+
backend::Message::ErrorResponse(body) => {
362+
Either::B(Connection(s).ready_err(body))
363+
}
364+
_ => Either::A(Err(bad_message()).into_future()),
365+
}
366+
})
367+
.and_then(|s| s.read().map_err(Error::Io))
368+
.and_then(|(m, s)| {
369+
match m {
370+
backend::Message::ParameterDescription(body) => {
371+
body.parameters().collect::<Vec<_>>()
372+
.map(|p| (p, s))
373+
.map_err(Error::Io)
374+
}
375+
_ => Err(bad_message()),
376+
}
377+
})
378+
.and_then(|(p, s)| s.read().map(|(m, s)| (p, m, s)).map_err(Error::Io))
379+
.boxed() // work around nonlinear trans blowup
380+
.and_then(|(p, m, s)| {
381+
match m {
382+
backend::Message::RowDescription(body) => {
383+
body.fields()
384+
.map(|f| (f.name().to_owned(), f.type_oid()))
385+
.collect::<Vec<_>>()
386+
.map(|d| (p, d, s))
387+
.map_err(Error::Io)
388+
}
389+
backend::Message::NoData => Ok((p, vec![], s)),
390+
_ => Err(bad_message()),
391+
}
392+
})
393+
.and_then(|(p, r, s)| Connection(s).ready((p, r)))
394+
.and_then(|((p, r), s)| {
395+
s.get_types(p.into_iter(), vec![], |&p| p, |_, t| t)
396+
.map(|(p, s)| (p, r, s))
397+
})
398+
.and_then(|(p, r, s)| {
399+
s.get_types(r.into_iter(),
400+
vec![],
401+
|f| f.1,
402+
|f, t| Column { name: f.0, type_: t })
403+
.map(|(r, s)| (p, r, s))
404+
})
405+
.boxed()
406+
}
407+
408+
fn get_types<T, U, I, F, G>(self,
409+
mut raw: I,
410+
mut out: Vec<U>,
411+
mut get_oid: F,
412+
mut build: G)
413+
-> BoxFuture<(Vec<U>, Connection), Error>
414+
where T: 'static + Send,
415+
U: 'static + Send,
416+
I: 'static + Send + Iterator<Item = T>,
417+
F: 'static + Send + FnMut(&T) -> Oid,
418+
G: 'static + Send + FnMut(T, Type) -> U
419+
{
420+
match raw.next() {
421+
Some(v) => {
422+
let oid = get_oid(&v);
423+
self.get_type(oid)
424+
.and_then(move |(ty, s)| {
425+
out.push(build(v, ty));
426+
s.get_types(raw, out, get_oid, build)
427+
})
428+
.boxed()
429+
}
430+
None => Ok((out, self)).into_future().boxed(),
431+
}
432+
}
433+
434+
fn get_type(self, oid: Oid) -> BoxFuture<(Type, Connection), Error> {
435+
if let Some(type_) = Type::from_oid(oid) {
436+
return Ok((type_, self)).into_future().boxed();
437+
};
438+
unimplemented!()
439+
}
440+
342441
pub fn cancel_data(&self) -> CancelData {
343442
self.0.cancel_data
344443
}
345444
}
346445

446+
struct Column {
447+
name: String,
448+
type_: Type,
449+
}
450+
347451
fn connect_err(fields: &mut ErrorFields) -> ConnectError {
348452
match DbError::new(fields) {
349453
Ok(err) => ConnectError::Db(Box::new(err)),

0 commit comments

Comments
 (0)