Skip to content

Commit 486bd03

Browse files
committed
fix ltree binary copyout parsing
1 parent 9ed55c8 commit 486bd03

File tree

4 files changed

+35
-44
lines changed

4 files changed

+35
-44
lines changed

connectorx-python/connectorx/tests/test_postgres.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ def test_postgres_with_index_col(postgres_url: str) -> None:
450450

451451

452452
def test_postgres_types_binary(postgres_url: str) -> None:
453-
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext FROM test_types"
453+
query = "SELECT test_date, test_timestamp, test_timestamptz, test_int16, test_int64, test_float32, test_numeric, test_bpchar, test_char, test_varchar, test_uuid, test_time, test_json, test_jsonb, test_bytea, test_enum, test_f4array, test_f8array, test_narray, test_i2array, test_i4array, test_i8array, test_citext, test_ltree FROM test_types"
454454
df = read_sql(postgres_url, query)
455455
expected = pd.DataFrame(
456456
index=range(4),
@@ -549,7 +549,7 @@ def test_postgres_types_binary(postgres_url: str) -> None:
549549
dtype="object",
550550
),
551551
"test_citext": pd.Series(["str_citext", "", "s", None], dtype="object"),
552-
# "test_ltree": pd.Series(["A.B.C.D", "A.B.E", "A", None], dtype="object"), # waiting for https://github.com/sfackler/rust-postgres/issues/960
552+
"test_ltree": pd.Series(["A.B.C.D", "A.B.E", "A", None], dtype="object"),
553553
},
554554
)
555555
assert_frame_equal(df, expected, check_names=True)

connectorx/src/sources/postgres/mod.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod typesystem;
66

77
pub use self::errors::PostgresSourceError;
88
pub use connection::rewrite_tls_args;
9-
pub use typesystem::PostgresTypeSystem;
9+
pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
1010

1111
use crate::constants::DB_BUFFER_SIZE;
1212
use crate::{
@@ -88,6 +88,7 @@ where
8888
queries: Vec<CXQuery<String>>,
8989
names: Vec<String>,
9090
schema: Vec<PostgresTypeSystem>,
91+
pg_schema: Vec<postgres::types::Type>,
9192
_protocol: PhantomData<P>,
9293
}
9394

@@ -109,6 +110,7 @@ where
109110
queries: vec![],
110111
names: vec![],
111112
schema: vec![],
113+
pg_schema: vec![],
112114
_protocol: PhantomData,
113115
}
114116
}
@@ -153,19 +155,23 @@ where
153155

154156
let stmt = conn.prepare(first_query.as_str())?;
155157

156-
let (names, types) = stmt
158+
let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
157159
.columns()
158160
.iter()
159-
.map(|col| {
160-
(
161-
col.name().to_string(),
162-
PostgresTypeSystem::from(col.type_()),
163-
)
164-
})
161+
.map(|col| (col.name().to_string(), col.type_().clone()))
165162
.unzip();
166163

167164
self.names = names;
168-
self.schema = types;
165+
self.schema = pg_types
166+
.iter()
167+
.map(|t| PostgresTypeSystem::from(t))
168+
.collect();
169+
self.pg_schema = self
170+
.schema
171+
.iter()
172+
.zip(pg_types.iter())
173+
.map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
174+
.collect();
169175
}
170176

171177
#[throws(PostgresSourceError)]
@@ -199,6 +205,7 @@ where
199205
conn,
200206
&query,
201207
&self.schema,
208+
&self.pg_schema,
202209
));
203210
}
204211
ret
@@ -215,6 +222,7 @@ where
215222
conn: PgConn<C>,
216223
query: CXQuery<String>,
217224
schema: Vec<PostgresTypeSystem>,
225+
pg_schema: Vec<postgres::types::Type>,
218226
nrows: usize,
219227
ncols: usize,
220228
_protocol: PhantomData<P>,
@@ -227,11 +235,17 @@ where
227235
C::Stream: Send,
228236
<C::TlsConnect as TlsConnect<Socket>>::Future: Send,
229237
{
230-
pub fn new(conn: PgConn<C>, query: &CXQuery<String>, schema: &[PostgresTypeSystem]) -> Self {
238+
pub fn new(
239+
conn: PgConn<C>,
240+
query: &CXQuery<String>,
241+
schema: &[PostgresTypeSystem],
242+
pg_schema: &[postgres::types::Type],
243+
) -> Self {
231244
Self {
232245
conn,
233246
query: query.clone(),
234247
schema: schema.to_vec(),
248+
pg_schema: pg_schema.to_vec(),
235249
nrows: 0,
236250
ncols: schema.len(),
237251
_protocol: PhantomData,
@@ -259,8 +273,7 @@ where
259273
fn parser(&mut self) -> Self::Parser<'_> {
260274
let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
261275
let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
262-
let pg_schema: Vec<_> = self.schema.iter().map(|&dt| dt.into()).collect();
263-
let iter = BinaryCopyOutIter::new(reader, &pg_schema);
276+
let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
264277

265278
PostgresBinarySourcePartitionParser::new(iter, &self.schema)
266279
}

connectorx/src/sources/postgres/typesystem.rs

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -103,38 +103,16 @@ impl<'a> From<&'a Type> for PostgresTypeSystem {
103103
}
104104
}
105105

106-
// Link PostgresDTypes back to the one defined by the postgres crate.
107-
impl<'a> From<PostgresTypeSystem> for Type {
108-
fn from(ty: PostgresTypeSystem) -> Type {
106+
pub struct PostgresTypePairs<'a>(pub &'a Type, pub &'a PostgresTypeSystem);
107+
108+
// Link (postgres::types::Type, connectorx::PostgresTypeSystem) pairs back to the type defined by the postgres crate.
109+
impl<'a> From<PostgresTypePairs<'a>> for Type {
110+
fn from(ty: PostgresTypePairs) -> Type {
109111
use PostgresTypeSystem::*;
110-
match ty {
111-
Int2(_) => Type::INT2,
112-
Int4(_) => Type::INT4,
113-
Int8(_) => Type::INT8,
114-
Float4(_) => Type::FLOAT4,
115-
Float8(_) => Type::FLOAT8,
116-
Numeric(_) => Type::NUMERIC,
117-
Int2Array(_) => Type::INT2_ARRAY,
118-
Int4Array(_) => Type::INT4_ARRAY,
119-
Int8Array(_) => Type::INT8_ARRAY,
120-
Float4Array(_) => Type::FLOAT4_ARRAY,
121-
Float8Array(_) => Type::FLOAT8_ARRAY,
122-
NumericArray(_) => Type::NUMERIC_ARRAY,
123-
Bool(_) => Type::BOOL,
124-
Text(_) => Type::TEXT,
125-
BpChar(_) => Type::BPCHAR,
126-
VarChar(_) => Type::VARCHAR,
127-
Char(_) => Type::CHAR,
128-
ByteA(_) => Type::BYTEA,
129-
Date(_) => Type::DATE,
130-
Time(_) => Type::TIME,
131-
Timestamp(_) => Type::TIMESTAMP,
132-
TimestampTz(_) => Type::TIMESTAMPTZ,
133-
UUID(_) => Type::UUID,
134-
JSON(_) => Type::JSON,
135-
JSONB(_) => Type::JSONB,
112+
match ty.1 {
136113
Enum(_) => Type::TEXT,
137114
HSTORE(_) => Type::TEXT, // hstore is not supported in binary protocol (since no corresponding inner TYPE)
115+
_ => ty.0.clone(),
138116
}
139117
}
140118
}

docs/databases/postgres.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ cx.read_sql(conn, query) # read data from
4141
| JSON | object | |
4242
| JSONB | object | |
4343
| ENUM | object | need to convert enum column to text manually (`::text`) when using `csv` and `cursor` protocol |
44-
| ltree | object | binary protocol returns with a hex char prefix. Check https://github.com/sfu-db/connector-x/pull/382 and https://github.com/sfackler/rust-postgres/issues/960 for status |
44+
| ltree | object | |
4545
| INT2[] | object | list of i64 |
4646
| INT4[] | object | list of i64 |
4747
| INT8[] | object | list of i64 |

0 commit comments

Comments
 (0)