Skip to content

Commit e005a79

Browse files
committed
Convert struct type to record
1 parent 4b8ea1c commit e005a79

File tree

4 files changed

+28
-0
lines changed

4 files changed

+28
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = ["datafusion-postgres", "datafusion-postgres-cli"]
44

55
[workspace.dependencies]
66
pgwire = "0.28"
7+
postgres-types = "0.2"
78
datafusion = { version = "46", default-features = false }
89
tokio = { version = "1", default-features = false }
910

datafusion-postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ readme = "../README.md"
1717

1818
[dependencies]
1919
pgwire = { workspace = true }
20+
postgres-types = { workspace = true }
2021
datafusion = { workspace = true }
2122
futures = "0.3"
2223
async-trait = "0.1"

datafusion-postgres/src/datatypes.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use pgwire::api::portal::{Format, Portal};
1515
use pgwire::api::results::{DataRowEncoder, FieldInfo, QueryResponse};
1616
use pgwire::api::Type;
1717
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
18+
use postgres_types::Kind;
1819
use rust_decimal::prelude::ToPrimitive;
1920
use rust_decimal::{Decimal, Error};
2021
use timezone::Tz;
@@ -65,6 +66,12 @@ pub(crate) fn into_pg_type(df_type: &DataType) -> PgWireResult<Type> {
6566
DataType::Float64 => Type::FLOAT8_ARRAY,
6667
DataType::Utf8 => Type::VARCHAR_ARRAY,
6768
DataType::LargeUtf8 => Type::TEXT_ARRAY,
69+
struct_type @ DataType::Struct(_) => Type::new(
70+
Type::RECORD_ARRAY.name().into(),
71+
Type::RECORD_ARRAY.oid(),
72+
Kind::Array(into_pg_type(struct_type)?),
73+
Type::RECORD_ARRAY.schema().into(),
74+
),
6875
list_type => {
6976
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
7077
"ERROR".to_owned(),
@@ -76,6 +83,24 @@ pub(crate) fn into_pg_type(df_type: &DataType) -> PgWireResult<Type> {
7683
}
7784
DataType::Utf8View => Type::TEXT,
7885
DataType::Dictionary(_, value_type) => into_pg_type(value_type)?,
86+
DataType::Struct(fields) => {
87+
let name: String = fields
88+
.iter()
89+
.map(|x| x.name().clone())
90+
.reduce(|a, b| a + ", " + &b)
91+
.map(|x| format!("({x})"))
92+
.unwrap_or("()".to_string());
93+
let kind = Kind::Composite(
94+
fields
95+
.iter()
96+
.map(|x| {
97+
into_pg_type(x.data_type())
98+
.map(|_type| postgres_types::Field::new(x.name().clone(), _type))
99+
})
100+
.collect::<Result<Vec<_>, PgWireError>>()?,
101+
);
102+
Type::new(name, Type::RECORD.oid(), kind, Type::RECORD.schema().into())
103+
}
79104
_ => {
80105
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
81106
"ERROR".to_owned(),

0 commit comments

Comments
 (0)