Skip to content

Commit 7810b96

Browse files
committed
Unknown type support
1 parent f6cb7ea commit 7810b96

File tree

3 files changed

+394
-22
lines changed

3 files changed

+394
-22
lines changed

postgres-tokio/src/lib.rs

Lines changed: 244 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,27 @@ use std::sync::mpsc::{self, Sender, Receiver};
3030
use tokio_core::reactor::Handle;
3131

3232
#[doc(inline)]
33-
pub use postgres_shared::{params, types, Column, RowIndex};
33+
pub use postgres_shared::{params, Column, RowIndex};
3434

35-
use error::{ConnectError, Error, DbError};
35+
use error::{ConnectError, Error, DbError, SqlState};
3636
use params::{ConnectParams, IntoConnectParams};
3737
use stream::PostgresStream;
38-
use types::{Oid, Type, ToSql, SessionInfo, IsNull, FromSql, WrongType};
38+
use types::{Oid, Type, ToSql, SessionInfo, IsNull, FromSql, WrongType, Other, Kind, Field};
3939
use tls::Handshake;
4040

4141
pub mod error;
4242
mod stream;
4343
pub mod tls;
44+
#[macro_use]
45+
pub mod types;
4446

4547
#[cfg(test)]
4648
mod test;
4749

50+
const TYPEINFO_QUERY: &'static str = "__typeinfo";
51+
const TYPEINFO_ENUM_QUERY: &'static str = "__typeinfo_enum";
52+
const TYPEINFO_COMPOSITE_QUERY: &'static str = "__typeinfo_composite";
53+
4854
pub enum TlsMode {
4955
Require(Box<Handshake>),
5056
Prefer(Box<Handshake>),
@@ -85,8 +91,12 @@ struct InnerConnection {
8591
close_receiver: Receiver<(u8, String)>,
8692
close_sender: Sender<(u8, String)>,
8793
parameters: HashMap<String, String>,
94+
types: HashMap<Oid, Other>,
8895
cancel_data: CancelData,
8996
next_stmt_id: u32,
97+
has_typeinfo_query: bool,
98+
has_typeinfo_enum_query: bool,
99+
has_typeinfo_composite_query: bool,
90100
}
91101

92102
impl InnerConnection {
@@ -176,11 +186,15 @@ impl Connection {
176186
close_sender: sender,
177187
close_receiver: receiver,
178188
parameters: HashMap::new(),
189+
types: HashMap::new(),
179190
cancel_data: CancelData {
180191
process_id: 0,
181192
secret_key: 0,
182193
},
183194
next_stmt_id: 0,
195+
has_typeinfo_query: false,
196+
has_typeinfo_enum_query: false,
197+
has_typeinfo_composite_query: false,
184198
}), params)
185199
})
186200
.and_then(|(s, params)| s.startup(params))
@@ -505,7 +519,217 @@ impl Connection {
505519
if let Some(type_) = Type::from_oid(oid) {
506520
return Ok((type_, self)).into_future().boxed();
507521
};
508-
unimplemented!()
522+
523+
let other = self.0.types.get(&oid).map(Clone::clone);
524+
if let Some(other) = other {
525+
return Ok((Type::Other(other), self)).into_future().boxed();
526+
}
527+
528+
self.get_unknown_type(oid)
529+
.map(move |(ty, mut c)| {
530+
c.0.types.insert(oid, ty.clone());
531+
(Type::Other(ty), c)
532+
})
533+
.boxed()
534+
}
535+
536+
fn get_unknown_type(self, oid: Oid) -> BoxFuture<(Other, Connection), Error> {
537+
self.setup_typeinfo_query()
538+
.and_then(move |c| c.raw_execute(TYPEINFO_QUERY, "", &[Type::Oid], &[&oid]))
539+
.and_then(|c| c.read_rows().collect())
540+
.and_then(move |(r, c)| {
541+
let get = |idx| r.get(0).and_then(|r| r.get(idx));
542+
let m = HashMap::new();
543+
let info = SessionInfo::new(&m);
544+
545+
let name = match String::from_sql_nullable(&Type::Name, get(0), &info) {
546+
Ok(v) => v,
547+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
548+
};
549+
let type_ = match i8::from_sql_nullable(&Type::Char, get(1), &info) {
550+
Ok(v) => v,
551+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
552+
};
553+
let elem_oid = match Oid::from_sql_nullable(&Type::Oid, get(2), &info) {
554+
Ok(v) => v,
555+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
556+
};
557+
let rngsubtype = match Option::<Oid>::from_sql_nullable(&Type::Oid, get(3), &info) {
558+
Ok(v) => v,
559+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
560+
};
561+
let basetype = match Oid::from_sql_nullable(&Type::Oid, get(4), &info) {
562+
Ok(v) => v,
563+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
564+
};
565+
let schema = match String::from_sql_nullable(&Type::Name, get(5), &info) {
566+
Ok(v) => v,
567+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
568+
};
569+
let relid = match Oid::from_sql_nullable(&Type::Oid, get(6), &info) {
570+
Ok(v) => v,
571+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
572+
};
573+
574+
let kind = if type_ == b'p' as i8 {
575+
Either::A(Ok((Kind::Pseudo, c)).into_future())
576+
} else if type_ == b'e' as i8 {
577+
Either::B(c.get_enum_variants(oid).map(|(v, c)| (Kind::Enum(v), c)).boxed())
578+
} else if basetype != 0 {
579+
Either::B(c.get_type(basetype).map(|(t, c)| (Kind::Domain(t), c)).boxed())
580+
} else if elem_oid != 0 {
581+
Either::B(c.get_type(elem_oid).map(|(t, c)| (Kind::Array(t), c)).boxed())
582+
} else if relid != 0 {
583+
Either::B(c.get_composite_fields(relid).map(|(f, c)| (Kind::Composite(f), c)).boxed())
584+
} else if let Some(rngsubtype) = rngsubtype {
585+
Either::B(c.get_type(rngsubtype).map(|(t, c)| (Kind::Range(t), c)).boxed())
586+
} else {
587+
Either::A(Ok((Kind::Simple, c)).into_future())
588+
};
589+
590+
Either::B(kind.map(move |(k, c)| (Other::new(name, oid, k, schema), c)))
591+
})
592+
.boxed()
593+
}
594+
595+
fn setup_typeinfo_query(self) -> BoxFuture<Connection, Error> {
596+
if self.0.has_typeinfo_query {
597+
return Ok(self).into_future().boxed();
598+
}
599+
600+
self.raw_prepare(TYPEINFO_QUERY,
601+
"SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, \
602+
t.typbasetype, n.nspname, t.typrelid \
603+
FROM pg_catalog.pg_type t \
604+
LEFT OUTER JOIN pg_catalog.pg_range r ON \
605+
r.rngtypid = t.oid \
606+
INNER JOIN pg_catalog.pg_namespace n ON \
607+
t.typnamespace = n.oid \
608+
WHERE t.oid = $1")
609+
.or_else(|e| {
610+
match e {
611+
// Range types weren't added until Postgres 9.2, so pg_range may not exist
612+
Error::Db(e, c) => {
613+
if e.code != SqlState::UndefinedTable {
614+
return Either::B(Err(Error::Db(e, c)).into_future());
615+
}
616+
617+
Either::A(c.raw_prepare(TYPEINFO_QUERY,
618+
"SELECT t.typname, t.typtype, t.typelem, \
619+
NULL::OID, t.typbasetype, n.nspname, \
620+
t.typrelid \
621+
FROM pg_catalog.pg_type t \
622+
INNER JOIN pg_catalog.pg_namespace n \
623+
ON t.typnamespace = n.oid \
624+
WHERE t.oid = $1"))
625+
}
626+
e => Either::B(Err(e).into_future()),
627+
}
628+
})
629+
.map(|(_, _, mut c)| {
630+
c.0.has_typeinfo_query = true;
631+
c
632+
})
633+
.boxed()
634+
}
635+
636+
fn get_enum_variants(self, oid: Oid) -> BoxFuture<(Vec<String>, Connection), Error> {
637+
self.setup_typeinfo_enum_query()
638+
.and_then(move |c| c.raw_execute(TYPEINFO_ENUM_QUERY, "", &[Type::Oid], &[&oid]))
639+
.and_then(|c| c.read_rows().collect())
640+
.and_then(|(r, c)| {
641+
let mut variants = vec![];
642+
let m = HashMap::new();
643+
let info = SessionInfo::new(&m);
644+
for row in r {
645+
let variant = match String::from_sql_nullable(&Type::Name, row.get(0), &info) {
646+
Ok(v) => v,
647+
Err(e) => return Err(Error::Conversion(e, c)),
648+
};
649+
variants.push(variant);
650+
}
651+
Ok((variants, c))
652+
})
653+
.boxed()
654+
}
655+
656+
fn setup_typeinfo_enum_query(self) -> BoxFuture<Connection, Error> {
657+
if self.0.has_typeinfo_enum_query {
658+
return Ok(self).into_future().boxed();
659+
}
660+
661+
self.raw_prepare(TYPEINFO_ENUM_QUERY,
662+
"SELECT enumlabel \
663+
FROM pg_catalog.pg_enum \
664+
WHERE enumtypid = $1 \
665+
ORDER BY enumsortorder")
666+
.or_else(|e| {
667+
match e {
668+
Error::Db(e, c) => {
669+
if e.code != SqlState::UndefinedColumn {
670+
return Either::B(Err(Error::Db(e, c)).into_future());
671+
}
672+
673+
Either::A(c.raw_prepare(TYPEINFO_ENUM_QUERY,
674+
"SELECT enumlabel \
675+
FROM pg_catalog.pg_enum \
676+
WHERE enumtypid = $1 \
677+
ORDER BY oid"))
678+
}
679+
e => Either::B(Err(e).into_future()),
680+
}
681+
})
682+
.map(|(_, _, mut c)| {
683+
c.0.has_typeinfo_enum_query = true;
684+
c
685+
})
686+
.boxed()
687+
}
688+
689+
fn get_composite_fields(self, oid: Oid) -> BoxFuture<(Vec<Field>, Connection), Error> {
690+
self.setup_typeinfo_composite_query()
691+
.and_then(move |c| c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[Type::Oid], &[&oid]))
692+
.and_then(|c| c.read_rows().collect())
693+
.and_then(|(r, c)| {
694+
futures::stream::iter(r.into_iter().map(Ok))
695+
.fold((vec![], c), |(mut fields, c), row| {
696+
let m = HashMap::new();
697+
let info = SessionInfo::new(&m);
698+
let name = match String::from_sql_nullable(&Type::Name, row.get(0), &info) {
699+
Ok(name) => name,
700+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
701+
};
702+
let oid = match Oid::from_sql_nullable(&Type::Oid, row.get(1), &info) {
703+
Ok(oid) => oid,
704+
Err(e) => return Either::A(Err(Error::Conversion(e, c)).into_future()),
705+
};
706+
Either::B(c.get_type(oid)
707+
.map(move |(ty, c)| {
708+
fields.push(Field::new(name, ty));
709+
(fields, c)
710+
}))
711+
})
712+
})
713+
.boxed()
714+
}
715+
716+
fn setup_typeinfo_composite_query(self) -> BoxFuture<Connection, Error> {
717+
if self.0.has_typeinfo_composite_query {
718+
return Ok(self).into_future().boxed();
719+
}
720+
721+
self.raw_prepare(TYPEINFO_COMPOSITE_QUERY,
722+
"SELECT attname, atttypid \
723+
FROM pg_catalog.pg_attribute \
724+
WHERE attrelid = $1 \
725+
AND NOT attisdropped \
726+
AND attnum > 0 \
727+
ORDER BY attnum")
728+
.map(|(_, _, mut c)| {
729+
c.0.has_typeinfo_composite_query = true;
730+
c
731+
})
732+
.boxed()
509733
}
510734

511735
fn raw_execute(self,
@@ -601,6 +825,21 @@ impl Connection {
601825
.boxed()
602826
}
603827

828+
fn read_rows(self) -> BoxStateStream<RowData, Connection, Error> {
829+
futures_state_stream::unfold(self, |c| {
830+
c.read_row()
831+
.and_then(|(r, c)| {
832+
match r {
833+
Some(data) => {
834+
let event = StreamEvent::Next((data, c));
835+
Either::A(Ok(event).into_future())
836+
},
837+
None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))),
838+
}
839+
})
840+
}).boxed()
841+
}
842+
604843
fn read_row(self) -> BoxFuture<(Option<RowData>, Connection), Error> {
605844
self.0.read()
606845
.map_err(Error::Io)
@@ -654,24 +893,7 @@ impl Connection {
654893
-> BoxStateStream<Row, Connection, Error> {
655894
let columns = statement.columns.clone();
656895
self.raw_execute(&statement.name, "", &statement.params, params)
657-
.map(|c| {
658-
futures_state_stream::unfold((c, columns), |(c, columns)| {
659-
c.read_row()
660-
.and_then(|(r, c)| {
661-
match r {
662-
Some(data) => {
663-
let row = Row {
664-
columns: columns.clone(),
665-
data: data,
666-
};
667-
let event = StreamEvent::Next((row, (c, columns)));
668-
Either::A(Ok(event).into_future())
669-
},
670-
None => Either::B(c.ready(()).map(|((), c)| StreamEvent::Done(c))),
671-
}
672-
})
673-
})
674-
})
896+
.map(|c| c.read_rows().map(move |r| Row { columns: columns.clone(), data: r }))
675897
.flatten_state_stream()
676898
.boxed()
677899
}

0 commit comments

Comments
 (0)