Skip to content

Commit 9987c34

Browse files
youngsofunTCeason
andauthored
feat(query): load ndjson support ts with diff units. (#17239)
* chore: Split functions crate into multi crates (#17133) * chore: Split functions crate into multi crates * split new crate databend-functions-scalar-arithmetic databend-functions-scalar-arithmetic-modulo databend-functions-scalar-datetime databend-functions-scalar-decimal-utils * split numeric basic arithmetic and other bit arithmetic * fix ut * restore int/double arithmetic * fix * feat(query): load ndjson support ts with diff units. (#17203) * feat(query): load ndjson/csv/tsv support ts with diff units. * fix * refactor * refactor * fix clippy --------- Co-authored-by: TCeason <[email protected]>
1 parent 749610a commit 9987c34

File tree

42 files changed

+897
-472
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+897
-472
lines changed

Cargo.lock

Lines changed: 84 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ members = [
3636
"src/query/expression",
3737
"src/query/formats",
3838
"src/query/functions",
39+
"src/query/functions",
40+
"src/query/functions/src/scalars/mathematics",
41+
"src/query/functions/src/scalars/geographic",
42+
"src/query/functions/src/scalars/timestamp",
43+
"src/query/functions/src/scalars/numeric_basic_arithmetic",
44+
"src/query/functions/src/scalars/arithmetic",
3945
"src/query/management",
4046
"src/query/pipeline/core",
4147
"src/query/pipeline/sinks",
@@ -184,6 +190,12 @@ databend-enterprise-storage-quota = { path = "src/query/ee_features/storage_quot
184190
databend-enterprise-stream-handler = { path = "src/query/ee_features/stream_handler" }
185191
databend-enterprise-vacuum-handler = { path = "src/query/ee_features/vacuum_handler" }
186192
databend-enterprise-virtual-column = { path = "src/query/ee_features/virtual_column" }
193+
databend-functions-scalar-arithmetic = { path = "src/query/functions/src/scalars/arithmetic" }
194+
databend-functions-scalar-datetime = { path = "src/query/functions/src/scalars/timestamp" }
195+
databend-functions-scalar-decimal = { path = "src/query/functions/src/scalars/decimal" }
196+
databend-functions-scalar-geo = { path = "src/query/functions/src/scalars/geographic" }
197+
databend-functions-scalar-math = { path = "src/query/functions/src/scalars/mathematics" }
198+
databend-functions-scalar-numeric-basic-arithmetic = { path = "src/query/functions/src/scalars/numeric_basic_arithmetic" }
187199
databend-meta = { path = "src/meta/service" }
188200
databend-query = { path = "src/query/service" }
189201
databend-sqllogictests = { path = "tests/sqllogictests" }

src/query/formats/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ databend-common-expression = { workspace = true }
1919
databend-common-io = { workspace = true }
2020
databend-common-meta-app = { workspace = true }
2121
databend-common-settings = { workspace = true }
22+
databend-functions-scalar-datetime = { workspace = true }
2223
databend-storages-common-blocks = { workspace = true }
2324
databend-storages-common-table-meta = { workspace = true }
2425

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io::Cursor;
16+
17+
use bstr::ByteSlice;
18+
use databend_common_exception::ErrorCode;
19+
use databend_common_exception::Result;
20+
use databend_common_expression::types::timestamp::clamp_timestamp;
21+
use databend_common_io::cursor_ext::read_num_text_exact;
22+
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
23+
use databend_common_io::cursor_ext::DateTimeResType;
24+
use databend_common_io::cursor_ext::ReadBytesExt;
25+
use databend_functions_scalar_datetime::datetime::int64_to_timestamp;
26+
27+
use crate::InputCommonSettings;
28+
29+
pub(crate) fn read_timestamp(
30+
column: &mut Vec<i64>,
31+
data: &[u8],
32+
settings: &InputCommonSettings,
33+
) -> Result<()> {
34+
let ts = if !data.contains(&b'-') {
35+
int64_to_timestamp(read_num_text_exact(data)?)
36+
} else {
37+
let mut buffer_readr = Cursor::new(&data);
38+
let t = buffer_readr.read_timestamp_text(&settings.jiff_timezone)?;
39+
match t {
40+
DateTimeResType::Datetime(t) => {
41+
if !buffer_readr.eof() {
42+
let data = data.to_str().unwrap_or("not utf8");
43+
let msg = format!(
44+
"fail to deserialize timestamp, unexpected end at pos {} of {}",
45+
buffer_readr.position(),
46+
data
47+
);
48+
return Err(ErrorCode::BadBytes(msg));
49+
}
50+
let mut ts = t.timestamp().as_microsecond();
51+
clamp_timestamp(&mut ts);
52+
ts
53+
}
54+
_ => unreachable!(),
55+
}
56+
};
57+
column.push(ts);
58+
Ok(())
59+
}

src/query/formats/src/field_decoder/fast_values.rs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use databend_common_expression::types::decimal::DecimalSize;
3737
use databend_common_expression::types::nullable::NullableColumnBuilder;
3838
use databend_common_expression::types::number::Number;
3939
use databend_common_expression::types::string::StringColumnBuilder;
40-
use databend_common_expression::types::timestamp::clamp_timestamp;
4140
use databend_common_expression::types::AnyType;
4241
use databend_common_expression::types::MutableBitmap;
4342
use databend_common_expression::types::NumberColumnBuilder;
@@ -51,7 +50,6 @@ use databend_common_io::constants::NULL_BYTES_UPPER;
5150
use databend_common_io::constants::TRUE_BYTES_LOWER;
5251
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
5352
use databend_common_io::cursor_ext::BufferReadStringExt;
54-
use databend_common_io::cursor_ext::DateTimeResType;
5553
use databend_common_io::cursor_ext::ReadBytesExt;
5654
use databend_common_io::cursor_ext::ReadCheckPointExt;
5755
use databend_common_io::cursor_ext::ReadNumberExt;
@@ -62,9 +60,9 @@ use databend_common_io::prelude::FormatSettings;
6260
use databend_common_io::Interval;
6361
use jsonb::parse_value;
6462
use lexical_core::FromLexical;
65-
use num::cast::AsPrimitive;
6663
use num_traits::NumCast;
6764

65+
use crate::field_decoder::common::read_timestamp;
6866
use crate::FieldDecoder;
6967
use crate::InputCommonSettings;
7068

@@ -323,26 +321,7 @@ impl FastFieldDecoderValues {
323321
) -> Result<()> {
324322
let mut buf = Vec::new();
325323
self.read_string_inner(reader, &mut buf, positions)?;
326-
let mut buffer_readr = Cursor::new(&buf);
327-
let ts = buffer_readr.read_timestamp_text(&self.common_settings().jiff_timezone)?;
328-
match ts {
329-
DateTimeResType::Datetime(ts) => {
330-
if !buffer_readr.eof() {
331-
let data = buf.to_str().unwrap_or("not utf8");
332-
let msg = format!(
333-
"fail to deserialize timestamp, unexpected end at pos {} of {}",
334-
buffer_readr.position(),
335-
data
336-
);
337-
return Err(ErrorCode::BadBytes(msg));
338-
}
339-
let mut micros = ts.timestamp().as_microsecond();
340-
clamp_timestamp(&mut micros);
341-
column.push(micros.as_());
342-
}
343-
_ => unreachable!(),
344-
}
345-
Ok(())
324+
read_timestamp(column, &buf, self.common_settings())
346325
}
347326

348327
fn read_array<R: AsRef<[u8]>>(

src/query/formats/src/field_decoder/json_ast.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use databend_common_io::cursor_ext::DateTimeResType;
4040
use databend_common_io::geography::geography_from_ewkt;
4141
use databend_common_io::geometry_from_ewkt;
4242
use databend_common_io::parse_bitmap;
43+
use databend_functions_scalar_datetime::datetime::int64_to_timestamp;
4344
use jiff::tz::TimeZone;
4445
use lexical_core::FromLexical;
4546
use num::cast::AsPrimitive;
@@ -296,8 +297,8 @@ impl FieldJsonAstDecoder {
296297
Ok(())
297298
}
298299
Value::Number(number) => match number.as_i64() {
299-
Some(mut n) => {
300-
clamp_timestamp(&mut n);
300+
Some(n) => {
301+
let n = int64_to_timestamp(n);
301302
column.push(n);
302303
Ok(())
303304
}

src/query/formats/src/field_decoder/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod common;
1516
mod fast_values;
1617
mod json_ast;
1718
mod nested;

src/query/formats/src/field_decoder/nested.rs

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use databend_common_expression::types::decimal::DecimalSize;
3232
use databend_common_expression::types::nullable::NullableColumnBuilder;
3333
use databend_common_expression::types::number::Number;
3434
use databend_common_expression::types::string::StringColumnBuilder;
35-
use databend_common_expression::types::timestamp::clamp_timestamp;
3635
use databend_common_expression::types::AnyType;
3736
use databend_common_expression::types::MutableBitmap;
3837
use databend_common_expression::types::NumberColumnBuilder;
@@ -45,7 +44,6 @@ use databend_common_io::constants::NULL_BYTES_UPPER;
4544
use databend_common_io::constants::TRUE_BYTES_LOWER;
4645
use databend_common_io::cursor_ext::BufferReadDateTimeExt;
4746
use databend_common_io::cursor_ext::BufferReadStringExt;
48-
use databend_common_io::cursor_ext::DateTimeResType;
4947
use databend_common_io::cursor_ext::ReadBytesExt;
5048
use databend_common_io::cursor_ext::ReadCheckPointExt;
5149
use databend_common_io::cursor_ext::ReadNumberExt;
@@ -57,6 +55,7 @@ use jsonb::parse_value;
5755
use lexical_core::FromLexical;
5856

5957
use crate::binary::decode_binary;
58+
use crate::field_decoder::common::read_timestamp;
6059
use crate::FileFormatOptionsExt;
6160
use crate::InputCommonSettings;
6261

@@ -67,9 +66,9 @@ pub struct NestedValues {
6766

6867
impl NestedValues {
6968
/// Consider map/tuple/array as a private object format like JSON.
70-
/// Currently we assume it as a fixed format, embed it in "strings" of other formats.
71-
/// So we can used the same code to encode/decode in clients.
72-
/// It maybe need to be configurable in future,
69+
/// Currently, we assume it as a fixed format, embed it in "strings" of other formats.
70+
/// So we can use the same code to encode/decode in clients.
71+
/// It maybe needs to be configurable in the future,
7372
/// to read data from other DB which also support map/tuple/array.
7473
pub fn create(options_ext: &FileFormatOptionsExt) -> Self {
7574
NestedValues {
@@ -281,30 +280,7 @@ impl NestedValues {
281280
) -> Result<()> {
282281
let mut buf = Vec::new();
283282
self.read_string_inner(reader, &mut buf)?;
284-
let mut buffer_readr = Cursor::new(&buf);
285-
let mut ts = if !buf.contains(&b'-') {
286-
buffer_readr.read_num_text_exact()?
287-
} else {
288-
let t = buffer_readr.read_timestamp_text(&self.common_settings().jiff_timezone)?;
289-
match t {
290-
DateTimeResType::Datetime(t) => {
291-
if !buffer_readr.eof() {
292-
let data = buf.to_str().unwrap_or("not utf8");
293-
let msg = format!(
294-
"fail to deserialize timestamp, unexpected end at pos {} of {}",
295-
buffer_readr.position(),
296-
data
297-
);
298-
return Err(ErrorCode::BadBytes(msg));
299-
}
300-
t.timestamp().as_microsecond()
301-
}
302-
_ => unreachable!(),
303-
}
304-
};
305-
clamp_timestamp(&mut ts);
306-
column.push(ts);
307-
Ok(())
283+
read_timestamp(column, &buf, self.common_settings())
308284
}
309285

310286
fn read_bitmap<R: AsRef<[u8]>>(

0 commit comments

Comments
 (0)