Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-pg-catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ include = [
[dependencies]
async-trait = "0.1"
datafusion = { workspace = true, features = ["sql"] }
arrow-pg = { path = "../arrow-pg", version = "0.12.2", default-features = false, features = ["datafusion"] }
futures.workspace = true
log = "0.4"
postgres-types.workspace = true
Expand Down
49 changes: 20 additions & 29 deletions datafusion-pg-catalog/src/pg_catalog/pg_attribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use datafusion::arrow::array::{
ArrayRef, BooleanArray, Int16Array, Int32Array, RecordBatch, StringArray,
ArrayRef, BooleanArray, Int16Array, Int32Array, RecordBatch, StringArray, UInt32Array,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use postgres_types::Oid;
use postgres_types::{Oid, Type};
use tokio::sync::RwLock;

use crate::pg_catalog::catalog_info::CatalogInfo;
Expand All @@ -36,7 +36,7 @@ impl<C: CatalogInfo> PgAttributeTable<C> {
let schema = Arc::new(Schema::new(vec![
Field::new("attrelid", DataType::Int32, false), // OID of the relation this column belongs to
Field::new("attname", DataType::Utf8, false), // Column name
Field::new("atttypid", DataType::Int32, false), // OID of the column data type
Field::new("atttypid", DataType::UInt32, false), // OID of the column data type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep this as int32? Because there is no native unsigned integer type in Postgres, using uint32 may introduce issues when we want to use this field with other UDFs.

Field::new("attstattarget", DataType::Int32, false), // Statistics target
Field::new("attlen", DataType::Int16, false), // Length of the type
Field::new("attnum", DataType::Int16, false), // Column number (positive for regular columns)
Expand Down Expand Up @@ -179,7 +179,7 @@ impl<C: CatalogInfo> PgAttributeTable<C> {
let arrays: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from(attrelids)),
Arc::new(StringArray::from(attnames)),
Arc::new(Int32Array::from(atttypids)),
Arc::new(UInt32Array::from(atttypids)),
Arc::new(Int32Array::from(attstattargets)),
Arc::new(Int16Array::from(attlens)),
Arc::new(Int16Array::from(attnums)),
Expand Down Expand Up @@ -211,31 +211,22 @@ impl<C: CatalogInfo> PgAttributeTable<C> {
}

/// Map DataFusion data types to PostgreSQL type information
fn datafusion_to_pg_type(data_type: &DataType) -> (i32, i16, bool, &'static str, &'static str) {
match data_type {
DataType::Boolean => (16, 1, true, "c", "p"), // bool
DataType::Int8 => (18, 1, true, "c", "p"), // char
DataType::Int16 => (21, 2, true, "s", "p"), // int2
DataType::Int32 => (23, 4, true, "i", "p"), // int4
DataType::Int64 => (20, 8, true, "d", "p"), // int8
DataType::UInt8 => (18, 2, true, "s", "p"), // char
DataType::UInt16 => (21, 4, true, "i", "p"), // int2
DataType::UInt32 => (23, 8, true, "d", "p"), // int4
DataType::UInt64 => (20, -1, false, "i", "m"), // int8
DataType::Float32 => (700, 4, true, "i", "p"), // float4
DataType::Float64 => (701, 8, true, "d", "p"), // float8
DataType::Utf8 => (25, -1, false, "i", "x"), // text
DataType::LargeUtf8 => (25, -1, false, "i", "x"), // text
DataType::Binary => (17, -1, false, "i", "x"), // bytea
DataType::LargeBinary => (17, -1, false, "i", "x"), // bytea
DataType::Date32 => (1082, 4, true, "i", "p"), // date
DataType::Date64 => (1082, 4, true, "i", "p"), // date
DataType::Time32(_) => (1083, 8, true, "d", "p"), // time
DataType::Time64(_) => (1083, 8, true, "d", "p"), // time
DataType::Timestamp(_, _) => (1114, 8, true, "d", "p"), // timestamp
DataType::Decimal128(_, _) => (1700, -1, false, "i", "m"), // numeric
DataType::Decimal256(_, _) => (1700, -1, false, "i", "m"), // numeric
_ => (25, -1, false, "i", "x"), // Default to text for unknown types
/// Translate an Arrow/DataFusion `DataType` into the PostgreSQL type details
/// reported for a column.
///
/// The type is resolved through `arrow_pg::datatypes::into_pg_type`. The
/// returned tuple is `(type OID, length, flag, code, code)`; the last three
/// elements are presumably `attbyval`, `attalign` and `attstorage` — confirm
/// against the caller. A length of `-1` is paired with `false` for the
/// variable-width types below.
///
/// When the conversion fails, or yields a PostgreSQL type not listed here,
/// the `TEXT` description is returned instead (including the `TEXT` OID, not
/// the OID of the unrecognised type).
fn datafusion_to_pg_type(data_type: &DataType) -> (u32, i16, bool, &'static str, &'static str) {
    // Single definition of the "treat it as text" answer used by both
    // fallback paths.
    let text_fallback = (Type::TEXT.oid(), -1, false, "i", "x");

    let pg_type = match arrow_pg::datatypes::into_pg_type(data_type) {
        Ok(resolved) => resolved,
        Err(_) => return text_fallback,
    };

    // Types that share the same layout description are grouped together.
    let (len, flag, code_a, code_b) = match pg_type {
        Type::BOOL | Type::CHAR => (1, true, "c", "p"),
        Type::INT2 => (2, true, "s", "p"),
        Type::INT4 | Type::FLOAT4 | Type::DATE => (4, true, "i", "p"),
        Type::INT8 | Type::FLOAT8 | Type::TIME | Type::TIMESTAMP => (8, true, "d", "p"),
        Type::TEXT | Type::BYTEA => (-1, false, "i", "x"),
        Type::NUMERIC => (-1, false, "i", "m"),
        // Default to text for unknown types
        _ => return text_fallback,
    };

    (pg_type.oid(), len, flag, code_a, code_b)
}
}
Expand Down
Loading