Skip to content

Commit 2c82432

Browse files
committed
feat: use FieldInfo in encode functions
1 parent 7dbd7b9 commit 2c82432

File tree

6 files changed

+36
-21
lines changed

6 files changed

+36
-21
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ chrono = { version = "0.4", features = ["std"] }
2020
datafusion = { version = "50", default-features = false }
2121
futures = "0.3"
2222
#pgwire = { version = "0.34", default-features = false }
23-
pgwire = { git = "https://github.com/sunng87/pgwire", rev = "d89089947a56ebfe2e89631925facdd7a85c25b4", default-features = false }
23+
pgwire = { git = "https://github.com/sunng87/pgwire", rev = "37a32a05d2aed55bd013f3c6f93d786368350e0b", default-features = false }
2424
postgres-types = "0.2"
2525
rust_decimal = { version = "1.39", features = ["db-postgres"] }
2626
tokio = { version = "1", default-features = false }

arrow-pg/src/encoder.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use chrono::{NaiveDate, NaiveDateTime};
1212
use datafusion::arrow::{array::*, datatypes::*};
1313
use pgwire::api::results::DataRowEncoder;
1414
use pgwire::api::results::FieldFormat;
15+
use pgwire::api::results::FieldInfo;
1516
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
1617
use pgwire::types::ToSqlText;
1718
use postgres_types::{ToSql, Type};
@@ -291,9 +292,11 @@ pub fn encode_value<T: Encoder>(
291292
encoder: &mut T,
292293
arr: &Arc<dyn Array>,
293294
idx: usize,
294-
type_: &Type,
295-
format: FieldFormat,
295+
pg_field: &FieldInfo,
296296
) -> PgWireResult<()> {
297+
let type_ = pg_field.datatype();
298+
let format = pg_field.format();
299+
297300
match arr.data_type() {
298301
DataType::Null => encoder.encode_field_with_type_and_format(&None::<i8>, type_, format)?,
299302
DataType::Boolean => {
@@ -413,6 +416,7 @@ pub fn encode_value<T: Encoder>(
413416
let value = ts_array
414417
.value_as_datetime_with_tz(idx, tz)
415418
.map(|d| d.fixed_offset());
419+
416420
encoder.encode_field_with_type_and_format(&value, type_, format)?;
417421
} else {
418422
let value = ts_array.value_as_datetime(idx);
@@ -494,7 +498,7 @@ pub fn encode_value<T: Encoder>(
494498
return encoder.encode_field_with_type_and_format(&None::<&[i8]>, type_, format);
495499
}
496500
let array = arr.as_any().downcast_ref::<ListArray>().unwrap().value(idx);
497-
let value = encode_list(array, type_, format)?;
501+
let value = encode_list(array, pg_field)?;
498502
encoder.encode_field_with_type_and_format(&value, type_, format)?
499503
}
500504
DataType::Struct(_) => {
@@ -506,7 +510,7 @@ pub fn encode_value<T: Encoder>(
506510
))));
507511
}
508512
};
509-
let value = encode_struct(arr, idx, fields, format)?;
513+
let value = encode_struct(arr, idx, fields, pg_field)?;
510514
encoder.encode_field_with_type_and_format(&value, type_, format)?
511515
}
512516
DataType::Dictionary(_, value_type) => {
@@ -537,7 +541,7 @@ pub fn encode_value<T: Encoder>(
537541
))
538542
})?;
539543

540-
encode_value(encoder, values, idx, type_, format)?
544+
encode_value(encoder, values, idx, pg_field)?
541545
}
542546
_ => {
543547
return Err(PgWireError::ApiError(ToSqlError::from(format!(
@@ -588,7 +592,8 @@ mod tests {
588592

589593
let mut encoder = MockEncoder::default();
590594

591-
let result = encode_value(&mut encoder, &dict_arr, 2, &Type::TEXT, FieldFormat::Text);
595+
let pg_field = FieldInfo::new("x".to_string(), None, None, Type::TEXT, FieldFormat::Text);
596+
let result = encode_value(&mut encoder, &dict_arr, 2, &pg_field);
592597

593598
assert!(result.is_ok());
594599

arrow-pg/src/list_encoder.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion::arrow::{
3737

3838
use bytes::{BufMut, BytesMut};
3939
use chrono::{DateTime, TimeZone, Utc};
40-
use pgwire::api::results::FieldFormat;
40+
use pgwire::api::results::{FieldFormat, FieldInfo};
4141
use pgwire::error::{PgWireError, PgWireResult};
4242
use pgwire::types::{ToSqlText, QUOTE_ESCAPE};
4343
use postgres_types::{ToSql, Type};
@@ -106,11 +106,10 @@ fn encode_field<T: ToSql + ToSqlText>(
106106
Ok(EncodedValue { bytes })
107107
}
108108

109-
pub(crate) fn encode_list(
110-
arr: Arc<dyn Array>,
111-
type_: &Type,
112-
format: FieldFormat,
113-
) -> PgWireResult<EncodedValue> {
109+
pub(crate) fn encode_list(arr: Arc<dyn Array>, pg_field: &FieldInfo) -> PgWireResult<EncodedValue> {
110+
let type_ = pg_field.datatype();
111+
let format = pg_field.format();
112+
114113
match arr.data_type() {
115114
DataType::Null => {
116115
let mut bytes = BytesMut::new();
@@ -406,7 +405,7 @@ pub(crate) fn encode_list(
406405
.map_err(ToSqlError::from)?;
407406

408407
let values: PgWireResult<Vec<_>> = (0..arr.len())
409-
.map(|row| encode_struct(&arr, row, fields, format))
408+
.map(|row| encode_struct(&arr, row, fields, pg_field))
410409
.map(|x| {
411410
if matches!(format, FieldFormat::Text) {
412411
x.map(|opt| {

arrow-pg/src/row_encoder.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ impl RowEncoder {
3737
for col in 0..self.rb.num_columns() {
3838
let array = self.rb.column(col);
3939
let field = &self.fields[col];
40-
let type_ = field.datatype();
41-
let format = field.format();
42-
encode_value(&mut encoder, array, self.curr_idx, type_, format).unwrap();
40+
41+
encode_value(&mut encoder, array, self.curr_idx, &field).unwrap();
4342
}
4443
self.curr_idx += 1;
4544
Some(encoder.finish())

arrow-pg/src/struct_encoder.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use arrow::array::{Array, StructArray};
66
use datafusion::arrow::array::{Array, StructArray};
77

88
use bytes::{BufMut, BytesMut};
9-
use pgwire::api::results::FieldFormat;
9+
use pgwire::api::results::{FieldFormat, FieldInfo};
1010
use pgwire::error::PgWireResult;
1111
use pgwire::types::{ToSqlText, QUOTE_CHECK, QUOTE_ESCAPE};
1212
use postgres_types::{Field, IsNull, ToSql, Type};
@@ -17,7 +17,7 @@ pub(crate) fn encode_struct(
1717
arr: &Arc<dyn Array>,
1818
idx: usize,
1919
fields: &[Field],
20-
format: FieldFormat,
20+
parent_pg_field_info: &FieldInfo,
2121
) -> PgWireResult<Option<EncodedValue>> {
2222
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
2323
if arr.is_null(idx) {
@@ -27,7 +27,19 @@ pub(crate) fn encode_struct(
2727
for (i, arr) in arr.columns().iter().enumerate() {
2828
let field = &fields[i];
2929
let type_ = field.type_();
30-
encode_value(&mut row_encoder, arr, idx, type_, format).unwrap();
30+
31+
let mut pg_field = FieldInfo::new(
32+
field.name().to_string(),
33+
None,
34+
None,
35+
type_.clone(),
36+
parent_pg_field_info.format(),
37+
);
38+
if let Some(format_options) = parent_pg_field_info.format_options() {
39+
pg_field = pg_field.with_format_options(format_options);
40+
}
41+
42+
encode_value(&mut row_encoder, arr, idx, &pg_field).unwrap();
3143
}
3244
Ok(Some(EncodedValue {
3345
bytes: row_encoder.row_buffer,

0 commit comments

Comments
 (0)