Skip to content

Commit e605227

Browse files
authored
feat: include optional FormatOptions in arrow encode (#229)
* feat: include optional FormatOptions in arrow encode * chore: remove unused imports/exports * test: update duckdb example * feat: use FieldInfo in encode functions * feat: simplify Encoder trait function * fix: lint fix * feat: include FormatOptions in encoder api * chore: update pgwire to a released version
1 parent edcfc50 commit e605227

File tree

11 files changed

+216
-242
lines changed

11 files changed

+216
-242
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ bytes = "1.10.1"
1818
chrono = { version = "0.4", features = ["std"] }
1919
datafusion = { version = "50", default-features = false }
2020
futures = "0.3"
21-
pgwire = { version = "0.34", default-features = false }
21+
pgwire = { version = "0.35", default-features = false }
2222
postgres-types = "0.2"
2323
rust_decimal = { version = "1.39", features = ["db-postgres"] }
2424
tokio = { version = "1", default-features = false }

arrow-pg/examples/duckdb.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use pgwire::api::stmt::{NoopQueryParser, StoredStatement};
1919
use pgwire::api::{ClientInfo, PgWireServerHandlers, Type};
2020
use pgwire::error::{PgWireError, PgWireResult};
2121
use pgwire::tokio::process_socket;
22+
use pgwire::types::format::FormatOptions;
2223
use tokio::net::TcpListener;
2324

2425
pub struct DuckDBBackend {
@@ -45,7 +46,7 @@ impl AuthSource for DummyAuthSource {
4546

4647
#[async_trait]
4748
impl SimpleQueryHandler for DuckDBBackend {
48-
async fn do_query<C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response>>
49+
async fn do_query<C>(&self, client: &mut C, query: &str) -> PgWireResult<Vec<Response>>
4950
where
5051
C: ClientInfo + Unpin + Send + Sync,
5152
{
@@ -59,9 +60,12 @@ impl SimpleQueryHandler for DuckDBBackend {
5960
.query_arrow(params![])
6061
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
6162
let schema = ret.get_schema();
63+
let format_options = FormatOptions::from_client_metadata(client.metadata());
64+
6265
let header = Arc::new(arrow_schema_to_pg_fields(
6366
schema.as_ref(),
6467
&Format::UnifiedText,
68+
Some(Arc::new(format_options)),
6569
)?);
6670

6771
let header_ref = header.clone();
@@ -155,7 +159,7 @@ impl ExtendedQueryHandler for DuckDBBackend {
155159

156160
async fn do_query<C>(
157161
&self,
158-
_client: &mut C,
162+
client: &mut C,
159163
portal: &Portal<Self::Statement>,
160164
_max_rows: usize,
161165
) -> PgWireResult<Response>
@@ -178,9 +182,11 @@ impl ExtendedQueryHandler for DuckDBBackend {
178182
.query_arrow(params![])
179183
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
180184
let schema = ret.get_schema();
185+
let format_options = FormatOptions::from_client_metadata(client.metadata());
181186
let header = Arc::new(arrow_schema_to_pg_fields(
182187
schema.as_ref(),
183188
&Format::UnifiedText,
189+
Some(Arc::new(format_options)),
184190
)?);
185191

186192
let header_ref = header.clone();

arrow-pg/src/datatypes.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use pgwire::api::results::FieldInfo;
1010
use pgwire::api::Type;
1111
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
1212
use pgwire::messages::data::DataRow;
13+
use pgwire::types::format::FormatOptions;
1314
use postgres_types::Kind;
1415

1516
use crate::row_encoder::RowEncoder;
@@ -111,20 +112,25 @@ pub fn into_pg_type(arrow_type: &DataType) -> PgWireResult<Type> {
111112
})
112113
}
113114

114-
pub fn arrow_schema_to_pg_fields(schema: &Schema, format: &Format) -> PgWireResult<Vec<FieldInfo>> {
115+
pub fn arrow_schema_to_pg_fields(
116+
schema: &Schema,
117+
format: &Format,
118+
data_format_options: Option<Arc<FormatOptions>>,
119+
) -> PgWireResult<Vec<FieldInfo>> {
120+
let _ = data_format_options;
115121
schema
116122
.fields()
117123
.iter()
118124
.enumerate()
119125
.map(|(idx, f)| {
120126
let pg_type = into_pg_type(f.data_type())?;
121-
Ok(FieldInfo::new(
122-
f.name().into(),
123-
None,
124-
None,
125-
pg_type,
126-
format.format_for(idx),
127-
))
127+
let mut field_info =
128+
FieldInfo::new(f.name().into(), None, None, pg_type, format.format_for(idx));
129+
if let Some(data_format_options) = &data_format_options {
130+
field_info = field_info.with_format_options(data_format_options.clone());
131+
}
132+
133+
Ok(field_info)
128134
})
129135
.collect::<PgWireResult<Vec<FieldInfo>>>()
130136
}

arrow-pg/src/datatypes/df.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,22 @@ use pgwire::api::results::QueryResponse;
1313
use pgwire::api::Type;
1414
use pgwire::error::{PgWireError, PgWireResult};
1515
use pgwire::messages::data::DataRow;
16+
use pgwire::types::format::FormatOptions;
1617
use rust_decimal::prelude::ToPrimitive;
1718
use rust_decimal::Decimal;
1819

1920
use super::{arrow_schema_to_pg_fields, encode_recordbatch, into_pg_type};
2021

21-
pub async fn encode_dataframe(df: DataFrame, format: &Format) -> PgWireResult<QueryResponse> {
22-
let fields = Arc::new(arrow_schema_to_pg_fields(df.schema().as_arrow(), format)?);
22+
pub async fn encode_dataframe(
23+
df: DataFrame,
24+
format: &Format,
25+
data_format_options: Option<Arc<FormatOptions>>,
26+
) -> PgWireResult<QueryResponse> {
27+
let fields = Arc::new(arrow_schema_to_pg_fields(
28+
df.schema().as_arrow(),
29+
format,
30+
data_format_options,
31+
)?);
2332

2433
let recordbatch_stream = df
2534
.execute_stream()

0 commit comments

Comments
 (0)