Skip to content

Commit 315fbcf

Browse files
committed
feat: update type conversion to use field instead of datatype
1 parent edcfc50 commit 315fbcf

File tree

3 files changed

+55
-43
lines changed

3 files changed

+55
-43
lines changed

arrow-pg/src/datatypes.rs

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use crate::row_encoder::RowEncoder;
1717
#[cfg(feature = "datafusion")]
1818
pub mod df;
1919

20-
pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult<Type> {
20+
pub fn into_pg_type(field: &Arc<Field>) -> PgWireResult<Type> {
21+
let arrow_type = field.data_type();
22+
2123
Ok(match arrow_type {
2224
DataType::Null => Type::UNKNOWN,
2325
DataType::Boolean => Type::BOOL,
@@ -43,46 +45,55 @@ pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult<Type> {
4345
DataType::Float64 => Type::FLOAT8,
4446
DataType::Decimal128(_, _) => Type::NUMERIC,
4547
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT,
46-
DataType::List(field) | DataType::FixedSizeList(field, _) | DataType::LargeList(field) => {
47-
match field.data_type() {
48-
DataType::Boolean => Type::BOOL_ARRAY,
49-
DataType::Int8 | DataType::UInt8 => Type::CHAR_ARRAY,
50-
DataType::Int16 | DataType::UInt16 => Type::INT2_ARRAY,
51-
DataType::Int32 | DataType::UInt32 => Type::INT4_ARRAY,
52-
DataType::Int64 | DataType::UInt64 => Type::INT8_ARRAY,
53-
DataType::Timestamp(_, tz) => {
54-
if tz.is_some() {
55-
Type::TIMESTAMPTZ_ARRAY
56-
} else {
57-
Type::TIMESTAMP_ARRAY
58-
}
59-
}
60-
DataType::Time32(_) | DataType::Time64(_) => Type::TIME_ARRAY,
61-
DataType::Date32 | DataType::Date64 => Type::DATE_ARRAY,
62-
DataType::Interval(_) => Type::INTERVAL_ARRAY,
63-
DataType::FixedSizeBinary(_)
64-
| DataType::Binary
65-
| DataType::LargeBinary
66-
| DataType::BinaryView => Type::BYTEA_ARRAY,
67-
DataType::Float16 | DataType::Float32 => Type::FLOAT4_ARRAY,
68-
DataType::Float64 => Type::FLOAT8_ARRAY,
69-
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT_ARRAY,
70-
struct_type @ DataType::Struct(_) => Type::new(
71-
Type::RECORD_ARRAY.name().into(),
72-
Type::RECORD_ARRAY.oid(),
73-
Kind::Array(into_pg_type(struct_type)?),
74-
Type::RECORD_ARRAY.schema().into(),
75-
),
76-
list_type => {
77-
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
78-
"ERROR".to_owned(),
79-
"XX000".to_owned(),
80-
format!("Unsupported List Datatype {list_type}"),
81-
))));
48+
DataType::List(field)
49+
| DataType::FixedSizeList(field, _)
50+
| DataType::LargeList(field)
51+
| DataType::ListView(field)
52+
| DataType::LargeListView(field) => match field.data_type() {
53+
DataType::Boolean => Type::BOOL_ARRAY,
54+
DataType::Int8 | DataType::UInt8 => Type::CHAR_ARRAY,
55+
DataType::Int16 | DataType::UInt16 => Type::INT2_ARRAY,
56+
DataType::Int32 | DataType::UInt32 => Type::INT4_ARRAY,
57+
DataType::Int64 | DataType::UInt64 => Type::INT8_ARRAY,
58+
DataType::Timestamp(_, tz) => {
59+
if tz.is_some() {
60+
Type::TIMESTAMPTZ_ARRAY
61+
} else {
62+
Type::TIMESTAMP_ARRAY
8263
}
8364
}
65+
DataType::Time32(_) | DataType::Time64(_) => Type::TIME_ARRAY,
66+
DataType::Date32 | DataType::Date64 => Type::DATE_ARRAY,
67+
DataType::Interval(_) => Type::INTERVAL_ARRAY,
68+
DataType::FixedSizeBinary(_)
69+
| DataType::Binary
70+
| DataType::LargeBinary
71+
| DataType::BinaryView => Type::BYTEA_ARRAY,
72+
DataType::Float16 | DataType::Float32 => Type::FLOAT4_ARRAY,
73+
DataType::Float64 => Type::FLOAT8_ARRAY,
74+
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::TEXT_ARRAY,
75+
DataType::Struct(_) => Type::new(
76+
Type::RECORD_ARRAY.name().into(),
77+
Type::RECORD_ARRAY.oid(),
78+
Kind::Array(into_pg_type(field)?),
79+
Type::RECORD_ARRAY.schema().into(),
80+
),
81+
list_type => {
82+
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
83+
"ERROR".to_owned(),
84+
"XX000".to_owned(),
85+
format!("Unsupported List Datatype {list_type}"),
86+
))));
87+
}
88+
},
89+
DataType::Dictionary(_, value_type) => {
90+
let field = Arc::new(Field::new(
91+
Field::LIST_FIELD_DEFAULT_NAME,
92+
*value_type.clone(),
93+
true,
94+
));
95+
into_pg_type(&field)?
8496
}
85-
DataType::Dictionary(_, value_type) => into_pg_type(value_type)?,
8697
DataType::Struct(fields) => {
8798
let name: String = fields
8899
.iter()
@@ -94,7 +105,7 @@ pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult<Type> {
94105
fields
95106
.iter()
96107
.map(|x| {
97-
into_pg_type(x.data_type())
108+
into_pg_type(x)
98109
.map(|_type| postgres_types::Field::new(x.name().clone(), _type))
99110
})
100111
.collect::<Result<Vec<_>, PgWireError>>()?,
@@ -117,7 +128,7 @@ pub fn arrow_schema_to_pg_fields(schema: &Schema, format: &Format) -> PgWireResu
117128
.iter()
118129
.enumerate()
119130
.map(|(idx, f)| {
120-
let pg_type = into_pg_type(f.data_type())?;
131+
let pg_type = into_pg_type(f)?;
121132
Ok(FieldInfo::new(
122133
f.name().into(),
123134
None,

arrow-pg/src/datatypes/df.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::iter;
22
use std::sync::Arc;
33

44
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
5-
use datafusion::arrow::datatypes::{DataType, Date32Type};
5+
use datafusion::arrow::datatypes::{DataType, Date32Type, Field};
66
use datafusion::arrow::record_batch::RecordBatch;
77
use datafusion::common::ParamValues;
88
use datafusion::prelude::*;
@@ -61,7 +61,7 @@ where
6161
if let Some(ty) = pg_type_hint {
6262
Ok(ty.clone())
6363
} else if let Some(infer_type) = inferenced_type {
64-
into_pg_type(infer_type)
64+
into_pg_type(&Arc::new(Field::new("item", infer_type.clone(), true)))
6565
} else {
6666
Ok(Type::UNKNOWN)
6767
}

datafusion-postgres/src/handlers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ impl ExtendedQueryHandler for DfSessionService {
391391
for param_type in ordered_param_types(&params).iter() {
392392
// Fixed: Use &params
393393
if let Some(datatype) = param_type {
394-
let pgtype = into_pg_type(datatype)?;
394+
let pgtype =
395+
into_pg_type(&Arc::new(Field::new("item", (*datatype).clone(), true)))?;
395396
param_types.push(pgtype);
396397
} else {
397398
param_types.push(Type::UNKNOWN);

0 commit comments

Comments
 (0)