Skip to content

Commit 616cde4

Browse files
committed
feat: pass arrow field all the way the any encoder
1 parent 315fbcf commit 616cde4

File tree

8 files changed

+206
-109
lines changed

8 files changed

+206
-109
lines changed

Cargo.lock

Lines changed: 70 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ documentation = "https://docs.rs/crate/datafusion-postgres/"
1414

1515
[workspace.dependencies]
1616
arrow = "56"
17+
arrow-schema = "56"
1718
bytes = "1.10.1"
1819
chrono = { version = "0.4", features = ["std"] }
1920
datafusion = { version = "50", default-features = false }

arrow-pg/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@ readme = "../README.md"
1313
rust-version.workspace = true
1414

1515
[features]
16-
default = ["arrow"]
16+
default = ["arrow", "geo"]
1717
arrow = ["dep:arrow"]
1818
datafusion = ["dep:datafusion"]
19+
geo = ["postgres-types/with-geo-types-0_7", "dep:geoarrow-schema"]
1920
# for testing
2021
_duckdb = []
2122
_bundled = ["duckdb/bundled"]
2223

2324

2425
[dependencies]
2526
arrow = { workspace = true, optional = true }
27+
arrow-schema = { workspace = true }
28+
geoarrow-schema = { version = "0.6", optional = true }
2629
bytes.workspace = true
2730
chrono.workspace = true
2831
datafusion = { workspace = true, optional = true }

arrow-pg/src/datatypes.rs

Lines changed: 94 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::sync::Arc;
22

33
#[cfg(not(feature = "datafusion"))]
44
use arrow::{datatypes::*, record_batch::RecordBatch};
5+
use arrow_schema::extension::ExtensionType;
56
#[cfg(feature = "datafusion")]
67
use datafusion::arrow::{datatypes::*, record_batch::RecordBatch};
78

@@ -20,106 +21,110 @@ pub mod df;
2021
pub fn into_pg_type(field: &Arc<Field>) -> PgWireResult<Type> {
2122
let arrow_type = field.data_type();
2223

23-
Ok(match arrow_type {
24-
DataType::Null => Type::UNKNOWN,
25-
DataType::Boolean => Type::BOOL,
26-
DataType::Int8 | DataType::UInt8 => Type::CHAR,
27-
DataType::Int16 | DataType::UInt16 => Type::INT2,
28-
DataType::Int32 | DataType::UInt32 => Type::INT4,
29-
DataType::Int64 | DataType::UInt64 => Type::INT8,
30-
DataType::Timestamp(_, tz) => {
31-
if tz.is_some() {
32-
Type::TIMESTAMPTZ
33-
} else {
34-
Type::TIMESTAMP
35-
}
36-
}
37-
DataType::Time32(_) | DataType::Time64(_) => Type::TIME,
38-
DataType::Date32 | DataType::Date64 => Type::DATE,
39-
DataType::Interval(_) => Type::INTERVAL,
40-
DataType::Binary
41-
| DataType::FixedSizeBinary(_)
42-
| DataType::LargeBinary
43-
| DataType::BinaryView => Type::BYTEA,
44-
DataType::Float16 | DataType::Float32 => Type::FLOAT4,
45-
DataType::Float64 => Type::FLOAT8,
46-
DataType::Decimal128(_, _) => Type::NUMERIC,
47-
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT,
48-
DataType::List(field)
49-
| DataType::FixedSizeList(field, _)
50-
| DataType::LargeList(field)
51-
| DataType::ListView(field)
52-
| DataType::LargeListView(field) => match field.data_type() {
53-
DataType::Boolean => Type::BOOL_ARRAY,
54-
DataType::Int8 | DataType::UInt8 => Type::CHAR_ARRAY,
55-
DataType::Int16 | DataType::UInt16 => Type::INT2_ARRAY,
56-
DataType::Int32 | DataType::UInt32 => Type::INT4_ARRAY,
57-
DataType::Int64 | DataType::UInt64 => Type::INT8_ARRAY,
24+
match field.extension_type_name() {
25+
#[cfg(feature = "geo")]
26+
Some(geoarrow_schema::PointType::NAME) => Ok(Type::POINT),
27+
_ => Ok(match arrow_type {
28+
DataType::Null => Type::UNKNOWN,
29+
DataType::Boolean => Type::BOOL,
30+
DataType::Int8 | DataType::UInt8 => Type::CHAR,
31+
DataType::Int16 | DataType::UInt16 => Type::INT2,
32+
DataType::Int32 | DataType::UInt32 => Type::INT4,
33+
DataType::Int64 | DataType::UInt64 => Type::INT8,
5834
DataType::Timestamp(_, tz) => {
5935
if tz.is_some() {
60-
Type::TIMESTAMPTZ_ARRAY
36+
Type::TIMESTAMPTZ
6137
} else {
62-
Type::TIMESTAMP_ARRAY
38+
Type::TIMESTAMP
6339
}
6440
}
65-
DataType::Time32(_) | DataType::Time64(_) => Type::TIME_ARRAY,
66-
DataType::Date32 | DataType::Date64 => Type::DATE_ARRAY,
67-
DataType::Interval(_) => Type::INTERVAL_ARRAY,
68-
DataType::FixedSizeBinary(_)
69-
| DataType::Binary
41+
DataType::Time32(_) | DataType::Time64(_) => Type::TIME,
42+
DataType::Date32 | DataType::Date64 => Type::DATE,
43+
DataType::Interval(_) => Type::INTERVAL,
44+
DataType::Binary
45+
| DataType::FixedSizeBinary(_)
7046
| DataType::LargeBinary
71-
| DataType::BinaryView => Type::BYTEA_ARRAY,
72-
DataType::Float16 | DataType::Float32 => Type::FLOAT4_ARRAY,
73-
DataType::Float64 => Type::FLOAT8_ARRAY,
74-
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT_ARRAY,
75-
DataType::Struct(_) => Type::new(
76-
Type::RECORD_ARRAY.name().into(),
77-
Type::RECORD_ARRAY.oid(),
78-
Kind::Array(into_pg_type(field)?),
79-
Type::RECORD_ARRAY.schema().into(),
80-
),
81-
list_type => {
47+
| DataType::BinaryView => Type::BYTEA,
48+
DataType::Float16 | DataType::Float32 => Type::FLOAT4,
49+
DataType::Float64 => Type::FLOAT8,
50+
DataType::Decimal128(_, _) => Type::NUMERIC,
51+
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT,
52+
DataType::List(field)
53+
| DataType::FixedSizeList(field, _)
54+
| DataType::LargeList(field)
55+
| DataType::ListView(field)
56+
| DataType::LargeListView(field) => match field.data_type() {
57+
DataType::Boolean => Type::BOOL_ARRAY,
58+
DataType::Int8 | DataType::UInt8 => Type::CHAR_ARRAY,
59+
DataType::Int16 | DataType::UInt16 => Type::INT2_ARRAY,
60+
DataType::Int32 | DataType::UInt32 => Type::INT4_ARRAY,
61+
DataType::Int64 | DataType::UInt64 => Type::INT8_ARRAY,
62+
DataType::Timestamp(_, tz) => {
63+
if tz.is_some() {
64+
Type::TIMESTAMPTZ_ARRAY
65+
} else {
66+
Type::TIMESTAMP_ARRAY
67+
}
68+
}
69+
DataType::Time32(_) | DataType::Time64(_) => Type::TIME_ARRAY,
70+
DataType::Date32 | DataType::Date64 => Type::DATE_ARRAY,
71+
DataType::Interval(_) => Type::INTERVAL_ARRAY,
72+
DataType::FixedSizeBinary(_)
73+
| DataType::Binary
74+
| DataType::LargeBinary
75+
| DataType::BinaryView => Type::BYTEA_ARRAY,
76+
DataType::Float16 | DataType::Float32 => Type::FLOAT4_ARRAY,
77+
DataType::Float64 => Type::FLOAT8_ARRAY,
78+
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT_ARRAY,
79+
DataType::Struct(_) => Type::new(
80+
Type::RECORD_ARRAY.name().into(),
81+
Type::RECORD_ARRAY.oid(),
82+
Kind::Array(into_pg_type(field)?),
83+
Type::RECORD_ARRAY.schema().into(),
84+
),
85+
list_type => {
86+
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
87+
"ERROR".to_owned(),
88+
"XX000".to_owned(),
89+
format!("Unsupported List Datatype {list_type}"),
90+
))));
91+
}
92+
},
93+
DataType::Dictionary(_, value_type) => {
94+
let field = Arc::new(Field::new(
95+
Field::LIST_FIELD_DEFAULT_NAME,
96+
*value_type.clone(),
97+
true,
98+
));
99+
into_pg_type(&field)?
100+
}
101+
DataType::Struct(fields) => {
102+
let name: String = fields
103+
.iter()
104+
.map(|x| x.name().clone())
105+
.reduce(|a, b| a + ", " + &b)
106+
.map(|x| format!("({x})"))
107+
.unwrap_or("()".to_string());
108+
let kind = Kind::Composite(
109+
fields
110+
.iter()
111+
.map(|x| {
112+
into_pg_type(x)
113+
.map(|_type| postgres_types::Field::new(x.name().clone(), _type))
114+
})
115+
.collect::<Result<Vec<_>, PgWireError>>()?,
116+
);
117+
Type::new(name, Type::RECORD.oid(), kind, Type::RECORD.schema().into())
118+
}
119+
_ => {
82120
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
83121
"ERROR".to_owned(),
84122
"XX000".to_owned(),
85-
format!("Unsupported List Datatype {list_type}"),
123+
format!("Unsupported Datatype {arrow_type}"),
86124
))));
87125
}
88-
},
89-
DataType::Dictionary(_, value_type) => {
90-
let field = Arc::new(Field::new(
91-
Field::LIST_FIELD_DEFAULT_NAME,
92-
*value_type.clone(),
93-
true,
94-
));
95-
into_pg_type(&field)?
96-
}
97-
DataType::Struct(fields) => {
98-
let name: String = fields
99-
.iter()
100-
.map(|x| x.name().clone())
101-
.reduce(|a, b| a + ", " + &b)
102-
.map(|x| format!("({x})"))
103-
.unwrap_or("()".to_string());
104-
let kind = Kind::Composite(
105-
fields
106-
.iter()
107-
.map(|x| {
108-
into_pg_type(x)
109-
.map(|_type| postgres_types::Field::new(x.name().clone(), _type))
110-
})
111-
.collect::<Result<Vec<_>, PgWireError>>()?,
112-
);
113-
Type::new(name, Type::RECORD.oid(), kind, Type::RECORD.schema().into())
114-
}
115-
_ => {
116-
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
117-
"ERROR".to_owned(),
118-
"XX000".to_owned(),
119-
format!("Unsupported Datatype {arrow_type}"),
120-
))));
121-
}
122-
})
126+
}),
127+
}
123128
}
124129

125130
pub fn arrow_schema_to_pg_fields(schema: &Schema, format: &Format) -> PgWireResult<Vec<FieldInfo>> {

0 commit comments

Comments
 (0)