Skip to content

Commit 5d08e8e

Browse files
committed
refactor: move datafusion encoding logic into arrow-pg
1 parent ccb214d commit 5d08e8e

File tree

13 files changed

+65
-27
lines changed

13 files changed

+65
-27
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/target
22
.direnv
33
.envrc
4-
.vscode
4+
.vscode
5+
.aider*

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

arrow-pg/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ readme = "../README.md"
1313
rust-version.workspace = true
1414

1515
[features]
16-
default = []
17-
datafusion = ["dep:datafusion-core"]
16+
default = ["arrow"]
17+
arrow = ["dep:arrow"]
18+
datafusion = ["dep:datafusion"]
1819

1920
[dependencies]
20-
arrow.workspace = true
21+
arrow = { workspace = true, optional = true }
2122
bytes.workspace = true
22-
chrono.workspace = true
23-
datafusion-core = { workspace = true, optional = true }
23+
chrono.workspace = true
24+
datafusion = { workspace = true, optional = true }
2425
futures.workspace = true
2526
pgwire = { workspace = true, features = ["server-api"] }
2627
postgres-types.workspace = true

arrow-pg/src/datatypes.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::sync::Arc;
22

3-
use arrow::datatypes::*;
4-
use arrow::record_batch::RecordBatch;
3+
#[cfg(not(feature = "datafusion"))]
4+
use arrow::{datatypes::*, record_batch::RecordBatch};
5+
#[cfg(feature = "datafusion")]
6+
use datafusion::arrow::{datatypes::*, record_batch::RecordBatch};
7+
58
use pgwire::api::portal::Format;
69
use pgwire::api::results::FieldInfo;
710
use pgwire::api::Type;
@@ -11,6 +14,9 @@ use postgres_types::Kind;
1114

1215
use crate::row_encoder::RowEncoder;
1316

17+
#[cfg(feature = "datafusion")]
18+
pub mod df;
19+
1420
pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult<Type> {
1521
Ok(match arrow_type {
1622
DataType::Null => Type::UNKNOWN,

datafusion-postgres/src/datatypes.rs renamed to arrow-pg/src/datatypes/df.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ use pgwire::messages::data::DataRow;
1717
use rust_decimal::prelude::ToPrimitive;
1818
use rust_decimal::Decimal;
1919

20-
use arrow_pg::datatypes::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type};
20+
use super::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type};
2121

22-
pub(crate) async fn encode_dataframe<'a>(
22+
pub async fn encode_dataframe<'a>(
2323
df: DataFrame,
2424
format: &Format,
2525
) -> PgWireResult<QueryResponse<'a>> {
@@ -51,7 +51,7 @@ pub(crate) async fn encode_dataframe<'a>(
5151
/// If the type is empty or unknown, we fall back to the datafusion-inferred type
5252
/// from `inferenced_types`.
5353
/// An error will be raised when neither source can provide type information.
54-
pub(crate) fn deserialize_parameters<S>(
54+
pub fn deserialize_parameters<S>(
5555
portal: &Portal<S>,
5656
inferenced_types: &[Option<&DataType>],
5757
) -> PgWireResult<ParamValues>

arrow-pg/src/encoder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use std::io::Write;
33
use std::str::FromStr;
44
use std::sync::Arc;
55

6-
use arrow::array::*;
7-
use arrow::datatypes::*;
6+
#[cfg(not(feature = "datafusion"))]
7+
use arrow::{array::*, datatypes::*};
88
use bytes::BufMut;
99
use bytes::BytesMut;
1010
use chrono::{NaiveDate, NaiveDateTime};
11+
#[cfg(feature = "datafusion")]
12+
use datafusion::arrow::{array::*, datatypes::*};
1113
use pgwire::api::results::DataRowEncoder;
1214
use pgwire::api::results::FieldFormat;
1315
use pgwire::error::PgWireError;

arrow-pg/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
//! Arrow data encoding and type mapping for Postgres (pgwire).
2+
3+
// #[cfg(all(feature = "arrow", feature = "datafusion"))]
4+
// compile_error!("Features `arrow` and `datafusion` cannot be enabled at the same time. Use `--no-default-features` when activating `datafusion`.");
5+
16
pub mod datatypes;
27
pub mod encoder;
38
mod error;

arrow-pg/src/list_encoder.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
11
use std::{str::FromStr, sync::Arc};
22

3-
use arrow::array::{
4-
timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
5-
LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
6-
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
7-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
8-
};
3+
#[cfg(not(feature = "datafusion"))]
94
use arrow::{
5+
array::{
6+
timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
7+
LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
8+
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
9+
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
10+
},
1011
datatypes::{
1112
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
1213
Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
1314
Time64NanosecondType, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
1415
},
1516
temporal_conversions::{as_date, as_time},
1617
};
18+
#[cfg(feature = "datafusion")]
19+
use datafusion::arrow::{
20+
array::{
21+
timezone::Tz, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
22+
LargeBinaryArray, PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
23+
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
24+
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
25+
},
26+
datatypes::{
27+
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
28+
Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
29+
Time64NanosecondType, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
30+
},
31+
temporal_conversions::{as_date, as_time},
32+
};
33+
1734
use bytes::{BufMut, BytesMut};
1835
use chrono::{DateTime, TimeZone, Utc};
1936
use pgwire::api::results::FieldFormat;

arrow-pg/src/row_encoder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
use std::sync::Arc;
22

3+
#[cfg(not(feature = "datafusion"))]
34
use arrow::array::RecordBatch;
5+
#[cfg(feature = "datafusion")]
6+
use datafusion::arrow::array::RecordBatch;
7+
48
use pgwire::{
59
api::results::{DataRowEncoder, FieldInfo},
610
error::PgWireResult,

arrow-pg/src/struct_encoder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
use std::sync::Arc;
22

3+
#[cfg(not(feature = "datafusion"))]
34
use arrow::array::{Array, StructArray};
5+
#[cfg(feature = "datafusion")]
6+
use datafusion::arrow::array::{Array, StructArray};
7+
48
use bytes::{BufMut, BytesMut};
59
use pgwire::api::results::FieldFormat;
610
use pgwire::error::PgWireResult;

0 commit comments

Comments
 (0)