Skip to content

Commit b4c5d82

Browse files
authored
feat: arrow body format support settings geometry_output_format. (#716)
* ci: make string format of decimal same with http handler. * feat: arrow body format support settings geometry_output_format. * fix * fix * fix
1 parent 14cf166 commit b4c5d82

File tree

15 files changed

+337
-97
lines changed

15 files changed

+337
-97
lines changed

bindings/nodejs/tests/binding.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ Then("Select types should be expected native types", async function () {
216216
},
217217
]);
218218
}
219+
219220
// TimestampTz
220221
if (!(DRIVER_VERSION > [0, 30, 3] && DB_VERSION >= [1, 2, 836])) {
221222
const row = await this.conn.queryRow(`SELECT to_datetime_tz('2024-04-16 12:34:56.789 +0800'))`);

bindings/python/tests/blocking/steps/binding.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ def _(context):
185185
f"timestamp_tz: {row.values()[0]} {exp_bug}"
186186
)
187187

188+
if DRIVER_VERSION > (0, 31, 0) and DB_VERSION > (1, 2, 841):
189+
row = context.conn.query_row("SELECT st_point(60,37)")
190+
assert row.values()[0] == '{"type": "Point", "coordinates": [60,37]}', (
191+
f"geography: {row.values()}"
192+
)
193+
row = context.conn.query_row(
194+
"settings(geometry_output_format='EWKT') SELECT st_point(60,37)"
195+
)
196+
assert row.values()[0] == "POINT(60 37)", f"geography: {row.values()}"
197+
188198

189199
@then("Select numbers should iterate all rows")
190200
def _(context):

core/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod session;
3030
mod stage;
3131

3232
pub mod schema;
33+
mod settings;
3334

3435
pub use auth::SensitiveString;
3536
pub use client::APIClient;
@@ -41,4 +42,6 @@ pub use presign::presign_upload_to_stage;
4142
pub use presign::PresignedResponse;
4243
pub use response::QueryStats;
4344
pub use response::SchemaField;
45+
pub use settings::GeometryDataType;
46+
pub use settings::ResultFormatSettings;
4447
pub use stage::StageLocation;

core/src/pages.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@ use crate::client::QueryState;
1616
use crate::error::Result;
1717
use crate::response::QueryResponse;
1818
use crate::schema::Schema;
19+
use crate::settings::ResultFormatSettings;
1920
use crate::{APIClient, Error, QueryStats, SchemaField};
2021
use arrow_array::RecordBatch;
21-
use chrono_tz::Tz;
2222
use log::debug;
2323
use parking_lot::Mutex;
2424
use std::collections::BTreeMap;
2525
use std::future::Future;
2626
use std::mem;
2727
use std::pin::Pin;
28-
use std::str::FromStr;
2928
use std::sync::Arc;
3029
use std::task::{Context, Poll};
3130
use std::time::Instant;
@@ -110,7 +109,10 @@ impl Pages {
110109
self.first_page = Some(page);
111110
}
112111

113-
pub async fn wait_for_schema(mut self, need_progress: bool) -> Result<(Self, Schema, Tz)> {
112+
pub async fn wait_for_schema(
113+
mut self,
114+
need_progress: bool,
115+
) -> Result<(Self, Schema, ResultFormatSettings)> {
114116
while let Some(page) = self.next().await {
115117
let page = page?;
116118
if !page.raw_schema.is_empty()
@@ -128,13 +130,8 @@ impl Pages {
128130
s.try_into()
129131
.map_err(|e| Error::Decode(format!("fail to decode string schema: {e}")))?
130132
};
131-
let utc = "UTC".to_owned();
132-
let timezone = page
133-
.settings
134-
.as_ref()
135-
.and_then(|m| m.get("timezone"))
136-
.unwrap_or(&utc);
137-
let timezone = Tz::from_str(timezone).map_err(|e| Error::Decode(e.to_string()))?;
133+
let settings = ResultFormatSettings::from_map(&page.settings)?;
134+
138135
self.add_back(page);
139136
let last_access_time = self.last_access_time.clone();
140137
if let Some(node_id) = &self.node_id {
@@ -146,10 +143,10 @@ impl Pages {
146143
self.client
147144
.register_query_for_heartbeat(&self.query_id, state)
148145
}
149-
return Ok((self, schema, timezone));
146+
return Ok((self, schema, settings));
150147
}
151148
}
152-
Ok((self, Schema::default(), Tz::UTC))
149+
Ok((self, Schema::default(), ResultFormatSettings::default()))
153150
}
154151
}
155152

core/src/settings.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::error::Result;
16+
use crate::Error;
17+
use chrono_tz::Tz;
18+
use serde::Deserialize;
19+
use std::collections::BTreeMap;
20+
use std::str::FromStr;
21+
22+
#[derive(Debug, Clone, Default, Copy)]
23+
pub struct ResultFormatSettings {
24+
pub geometry_output_format: GeometryDataType,
25+
pub timezone: Tz,
26+
}
27+
28+
impl ResultFormatSettings {
29+
pub fn from_map(settings: &Option<BTreeMap<String, String>>) -> Result<Self> {
30+
match settings {
31+
None => Ok(Default::default()),
32+
Some(settings) => {
33+
let timezone = match settings.get("timezone") {
34+
None => Tz::default(),
35+
Some(t) => Tz::from_str(t).map_err(|e| Error::Decode(e.to_string()))?,
36+
};
37+
38+
let geometry_output_format = match settings.get("geometry_output_format") {
39+
None => GeometryDataType::default(),
40+
Some(t) => {
41+
GeometryDataType::from_str(t).map_err(|e| Error::Decode(e.to_string()))?
42+
}
43+
};
44+
45+
Ok(Self {
46+
timezone,
47+
geometry_output_format,
48+
})
49+
}
50+
}
51+
}
52+
}
53+
54+
#[derive(Debug, Clone, Copy, Default, Deserialize)]
55+
pub enum GeometryDataType {
56+
WKB,
57+
WKT,
58+
EWKB,
59+
EWKT,
60+
#[default]
61+
GEOJSON,
62+
}
63+
64+
impl FromStr for GeometryDataType {
65+
type Err = Error;
66+
67+
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
68+
match s.to_uppercase().as_str() {
69+
"WKB" => Ok(GeometryDataType::WKB),
70+
"WKT" => Ok(GeometryDataType::WKT),
71+
"EWKB" => Ok(GeometryDataType::EWKB),
72+
"EWKT" => Ok(GeometryDataType::EWKT),
73+
"GEOJSON" => Ok(GeometryDataType::GEOJSON),
74+
_ => Err(Error::Decode("Invalid geometry type format".to_string())),
75+
}
76+
}
77+
}

driver/src/flight_sql.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use arrow_flight::sql::client::FlightSqlServiceClient;
2525
use arrow_flight::utils::flight_data_to_arrow_batch;
2626
use arrow_schema::SchemaRef as ArrowSchemaRef;
2727
use async_trait::async_trait;
28-
use chrono_tz::Tz;
2928
use percent_encoding::percent_decode_str;
3029
use tokio::sync::Mutex;
3130
use tokio_stream::{Stream, StreamExt};
@@ -34,9 +33,9 @@ use url::Url;
3433

3534
use crate::client::LoadMethod;
3635
use crate::conn::{ConnectionInfo, IConnection, Reader};
37-
use databend_client::presign_upload_to_stage;
3836
use databend_client::schema::Schema;
3937
use databend_client::SensitiveString;
38+
use databend_client::{presign_upload_to_stage, ResultFormatSettings};
4039
use databend_driver_core::error::{Error, Result};
4140
use databend_driver_core::rows::{
4241
Row, RowIterator, RowStatsIterator, RowWithStats, Rows, ServerStats,
@@ -373,7 +372,7 @@ impl Stream for FlightSQLRows {
373372
self.schema.clone(),
374373
&dicitionaries_by_id,
375374
)?;
376-
let rows = Rows::try_from((batch, Tz::UTC))?;
375+
let rows = Rows::try_from((batch, ResultFormatSettings::default()))?;
377376
self.rows.extend(rows);
378377
self.poll_next(cx)
379378
}

driver/src/rest_api.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use tokio_stream::Stream;
2929
use crate::client::LoadMethod;
3030
use crate::conn::{ConnectionInfo, IConnection, Reader};
3131
use databend_client::schema::{Schema, SchemaRef};
32-
use databend_client::APIClient;
3332
use databend_client::Pages;
33+
use databend_client::{APIClient, ResultFormatSettings};
3434
use databend_driver_core::error::{Error, Result};
3535
use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats};
3636
use databend_driver_core::rows::{
@@ -296,7 +296,7 @@ pub struct RestAPIRows<T> {
296296
pages: Pages,
297297

298298
schema: SchemaRef,
299-
timezone: Tz,
299+
settings: ResultFormatSettings,
300300

301301
data: VecDeque<Vec<Option<String>>>,
302302
rows: VecDeque<Row>,
@@ -308,11 +308,11 @@ pub struct RestAPIRows<T> {
308308

309309
impl<T> RestAPIRows<T> {
310310
async fn from_pages(pages: Pages) -> Result<(Schema, Self)> {
311-
let (pages, schema, timezone) = pages.wait_for_schema(true).await?;
311+
let (pages, schema, settings) = pages.wait_for_schema(true).await?;
312312
let rows = Self {
313313
pages,
314314
schema: Arc::new(schema.clone()),
315-
timezone,
315+
settings,
316316
data: Default::default(),
317317
rows: Default::default(),
318318
stats: None,
@@ -333,7 +333,7 @@ impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
333333
// Therefore, we could guarantee the `/final` called before the last row.
334334
if self.data.len() > 1 {
335335
if let Some(row) = self.data.pop_front() {
336-
let row = T::try_from_raw_row(row, self.schema.clone(), self.timezone)?;
336+
let row = T::try_from_raw_row(row, self.schema.clone(), self.settings.timezone)?;
337337
return Poll::Ready(Some(Ok(row)));
338338
}
339339
} else if self.rows.len() > 1 {
@@ -357,7 +357,7 @@ impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
357357
self.data.append(&mut new_data);
358358
} else {
359359
for batch in page.batches.into_iter() {
360-
let rows = Rows::try_from((batch, self.timezone))?;
360+
let rows = Rows::try_from((batch, self.settings))?;
361361
self.rows.extend(rows);
362362
}
363363
}
@@ -369,7 +369,8 @@ impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
369369
let row = T::from_row(row);
370370
Poll::Ready(Some(Ok(row)))
371371
} else if let Some(row) = self.data.pop_front() {
372-
let row = T::try_from_raw_row(row, self.schema.clone(), self.timezone)?;
372+
let row =
373+
T::try_from_raw_row(row, self.schema.clone(), self.settings.timezone)?;
373374
Poll::Ready(Some(Ok(row)))
374375
} else {
375376
Poll::Ready(None)

sql/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@ flight-sql = ["dep:tonic"]
1616
[dependencies]
1717
arrow = { workspace = true }
1818
arrow-array = { workspace = true }
19-
arrow-buffer = { workspace = true }
2019
arrow-schema = { workspace = true }
2120
chrono = { workspace = true }
2221
chrono-tz = { workspace = true }
22+
ethnum = "1.5.1"
2323
databend-client = { workspace = true }
2424
jsonb = { workspace = true }
2525
tokio-stream = { workspace = true }
2626
tonic = { workspace = true, optional = true }
2727

28-
geozero = { version = "0.14.0", features = ["with-wkb"] }
28+
geo = { version = "0.28.0", features = ["use-serde"] }
29+
geozero = { version = "0.14.0", features = ["with-geo", "with-geojson", "with-wkb", "with-wkt"] }
2930
glob = "0.3"
3031
hex = "0.4.3"
3132
itertools = "0.14"

sql/src/rows.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::value::Value;
2424
use arrow::record_batch::RecordBatch;
2525
use chrono_tz::Tz;
2626
use databend_client::schema::SchemaRef;
27+
use databend_client::ResultFormatSettings;
2728

2829
#[derive(Clone, Debug)]
2930
pub enum RowWithStats {
@@ -182,9 +183,9 @@ impl Rows {
182183
}
183184
}
184185

185-
impl TryFrom<(RecordBatch, Tz)> for Rows {
186+
impl TryFrom<(RecordBatch, ResultFormatSettings)> for Rows {
186187
type Error = Error;
187-
fn try_from((batch, ltz): (RecordBatch, Tz)) -> Result<Self> {
188+
fn try_from((batch, settings): (RecordBatch, ResultFormatSettings)) -> Result<Self> {
188189
let batch_schema = batch.schema();
189190
let schema = SchemaRef::new(batch_schema.clone().try_into()?);
190191
let mut rows: Vec<Row> = Vec::new();
@@ -193,7 +194,7 @@ impl TryFrom<(RecordBatch, Tz)> for Rows {
193194
for j in 0..batch_schema.fields().len() {
194195
let v = batch.column(j);
195196
let field = batch_schema.field(j);
196-
let value = Value::try_from((field, v, i, ltz))?;
197+
let value = Value::try_from((field, v, i, settings))?;
197198
values.push(value);
198199
}
199200
rows.push(Row::new(schema.clone(), values));

0 commit comments

Comments
 (0)