Skip to content

Commit 171269a

Browse files
authored
feat: support select from AVRO files. (#17953)
* feat: support select from avro. * add unit tests.
1 parent e0b608d commit 171269a

File tree

5 files changed

+289
-45
lines changed

5 files changed

+289
-45
lines changed

src/query/service/src/sessions/query_ctx.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,7 +1671,7 @@ impl TableContext for QueryContext {
16711671
};
16721672
OrcTable::try_create(info).await
16731673
}
1674-
FileFormatParams::NdJson(..) => {
1674+
FileFormatParams::NdJson(..) | FileFormatParams::Avro(..) => {
16751675
let schema = Arc::new(TableSchema::new(vec![TableField::new(
16761676
"_$1", // TODO: this name should be in visible
16771677
TableDataType::Variant,
@@ -1731,7 +1731,7 @@ impl TableContext for QueryContext {
17311731
}
17321732
_ => {
17331733
return Err(ErrorCode::Unimplemented(format!(
1734-
"The file format in the query stage is not supported. Currently supported formats are: Parquet, NDJson, CSV, and TSV. Provided format: '{}'.",
1734+
"The file format in the query stage is not supported. Currently supported formats are: Parquet, NDJson, AVRO, CSV, and TSV. Provided format: '{}'.",
17351735
stage_info.file_format_params
17361736
)));
17371737
}

src/query/storages/stage/src/read/avro/avro_to_jsonb.rs

Lines changed: 212 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,54 +15,233 @@
1515
use std::collections::BTreeMap;
1616

1717
use apache_avro::types::Value;
18+
use apache_avro::Schema;
19+
use databend_common_expression::types::i256;
20+
use databend_common_expression::types::Decimal;
21+
use databend_common_expression::types::MAX_DECIMAL128_PRECISION;
22+
use databend_common_expression::types::MAX_DECIMAL256_PRECISION;
23+
use num_bigint::BigInt;
1824

19-
pub(super) fn to_jsonb(value: &Value) -> Result<jsonb::Value, String> {
20-
let jvalue = match value {
21-
Value::Null => jsonb::Value::Null,
22-
Value::Boolean(v) => jsonb::Value::Bool(*v),
23-
Value::Int(v) => jsonb::Value::from(*v),
24-
Value::Long(v) => jsonb::Value::from(*v),
25-
Value::Float(v) => jsonb::Value::from(*v),
26-
Value::Double(v) => jsonb::Value::from(*v),
27-
Value::String(v) => jsonb::Value::from(v.as_str()),
28-
Value::Enum(_, v) => jsonb::Value::from(v.as_str()),
29-
Value::Uuid(v) => jsonb::Value::from(v.to_string()),
30-
Value::Union(_, v) => to_jsonb(v)?,
31-
Value::Array(v) => {
25+
pub(super) fn to_jsonb<'a>(value: &'a Value, schema: &Schema) -> Result<jsonb::Value<'a>, String> {
26+
let jvalue = match (value, schema) {
27+
// primaries
28+
(Value::Null, Schema::Null) => jsonb::Value::Null,
29+
(Value::Boolean(v), Schema::Boolean) => jsonb::Value::Bool(*v),
30+
(Value::Int(v), Schema::Int) => jsonb::Value::from(*v),
31+
(Value::Long(v), Schema::Long) => jsonb::Value::from(*v),
32+
(Value::Float(v), Schema::Float) => jsonb::Value::from(*v),
33+
(Value::Double(v), Schema::Double) => jsonb::Value::from(*v),
34+
(Value::String(v), Schema::String) => jsonb::Value::from(v.as_str()),
35+
(Value::Enum(_, v), Schema::Enum(_)) => jsonb::Value::from(v.as_str()),
36+
(Value::Uuid(v), Schema::Uuid) => jsonb::Value::from(v.to_string()),
37+
(Value::Bytes(v), Schema::Bytes) | (Value::Fixed(_, v), Schema::Fixed(_)) => {
38+
jsonb::Value::Binary(v)
39+
}
40+
(Value::Decimal(d), Schema::Decimal(schema)) => {
41+
let big_int = <BigInt>::from(d.to_owned());
42+
jsonb::Value::Number(convert_decimal(schema.precision, schema.scale, big_int)?)
43+
}
44+
(Value::BigDecimal(d), Schema::BigDecimal) => {
45+
let precision = d.digits() as usize;
46+
let (big_int, scale) = d.clone().into_bigint_and_exponent();
47+
jsonb::Value::Number(convert_decimal(precision, scale as usize, big_int)?)
48+
}
49+
(Value::Date(d), Schema::Date) => jsonb::Value::Date(jsonb::Date { value: *d }),
50+
(Value::TimeMillis(t), Schema::TimeMillis) => {
51+
jsonb::Value::Number(jsonb::Number::Int64(*t as i64))
52+
}
53+
(Value::TimeMicros(t), Schema::TimeMicros) => {
54+
jsonb::Value::Number(jsonb::Number::Int64(*t))
55+
}
56+
(Value::TimestampMillis(v), Schema::TimestampMillis)
57+
| (Value::LocalTimestampMillis(v), Schema::LocalTimestampMillis) => {
58+
jsonb::Value::Timestamp(jsonb::Timestamp {
59+
value: (*v) * 1_000_000,
60+
})
61+
}
62+
(Value::LocalTimestampMicros(v), Schema::LocalTimestampMicros) => {
63+
jsonb::Value::Timestamp(jsonb::Timestamp {
64+
value: (*v) * 1_000,
65+
})
66+
}
67+
(Value::LocalTimestampNanos(v), Schema::LocalTimestampNanos) => {
68+
jsonb::Value::Timestamp(jsonb::Timestamp { value: (*v) })
69+
}
70+
(Value::Duration(d), Schema::Duration) => {
71+
let months: u32 = d.months().into();
72+
let days: u32 = d.days().into();
73+
let millis: u32 = d.millis().into();
74+
jsonb::Value::Interval(jsonb::Interval {
75+
months: months as i32,
76+
days: days as i32,
77+
micros: (millis * 1000) as i64,
78+
})
79+
}
80+
81+
// container
82+
(Value::Union(i, v), Schema::Union(union_schema)) => {
83+
to_jsonb(v, &union_schema.variants()[(*i) as usize])?
84+
}
85+
(Value::Array(v), Schema::Array(array_schema)) => {
3286
let mut array = Vec::with_capacity(v.len());
3387
for v in v {
34-
array.push(to_jsonb(v)?)
88+
array.push(to_jsonb(v, &array_schema.items)?)
3589
}
3690
jsonb::Value::Array(array)
3791
}
38-
Value::Map(v) => {
92+
(Value::Map(v), Schema::Map(map_schema)) => {
3993
let mut array = Vec::with_capacity(v.len());
4094
for (k, v) in v {
41-
array.push((k.clone(), to_jsonb(v)?));
95+
array.push((k.clone(), to_jsonb(v, &map_schema.types)?));
4296
}
4397
jsonb::Value::Object(BTreeMap::from_iter(array))
4498
}
45-
Value::Record(v) => {
99+
(Value::Record(v), Schema::Record(record_schema)) => {
46100
let mut array = Vec::with_capacity(v.len());
47-
for (k, v) in v {
48-
array.push((k.clone(), to_jsonb(v)?));
101+
for (i, (k, v)) in v.iter().enumerate() {
102+
array.push((k.clone(), to_jsonb(v, &record_schema.fields[i].schema)?));
49103
}
50104
jsonb::Value::Object(BTreeMap::from_iter(array))
51105
}
52-
_ => return Err(format!("Cannot convert {:?} to JSONB", value)),
53-
// Value::Bytes(_v) | Value::Fixed(_, _v) => {}
54-
// Value::Date(_) => {}
55-
// Value::Decimal(_) => {}
56-
// Value::BigDecimal(_) => {}
57-
// Value::TimeMillis(_) => {}
58-
// Value::TimeMicros(_) => {}
59-
// Value::TimestampMillis(_) => {}
60-
// Value::TimestampMicros(_) => {}
61-
// Value::TimestampNanos(_) => {}
62-
// Value::LocalTimestampMillis(_) => {}
63-
// Value::LocalTimestampMicros(_) => {}
64-
// Value::LocalTimestampNanos(_) => {}
65-
// Value::Duration(_) => {}
106+
_ => {
107+
return Err(format!(
108+
"bug: avro schema and value not match: schema = {:?}, value = {:?}",
109+
schema, value
110+
))
111+
}
66112
};
67113
Ok(jvalue)
68114
}
115+
116+
fn convert_decimal(
117+
precision: usize,
118+
scale: usize,
119+
big_int: BigInt,
120+
) -> Result<jsonb::Number, String> {
121+
let max_128 = MAX_DECIMAL128_PRECISION as usize;
122+
let max_256 = MAX_DECIMAL256_PRECISION as usize;
123+
if precision <= max_128 {
124+
Ok(jsonb::Number::Decimal128(jsonb::Decimal128 {
125+
precision: precision as u8,
126+
scale: scale as u8,
127+
value: i128::from_bigint(big_int).ok_or("too many bits for i128".to_string())?,
128+
}))
129+
} else if precision <= max_256 {
130+
Ok(jsonb::Number::Decimal256(jsonb::Decimal256 {
131+
precision: precision as u8,
132+
scale: scale as u8,
133+
value: i256::from_bigint(big_int)
134+
.ok_or("too many bits for i256".to_string())?
135+
.0,
136+
}))
137+
} else {
138+
return Err(format!("Decimal precision too large: {}", precision));
139+
}
140+
}
141+
142+
#[cfg(test)]
143+
mod tests {
144+
use std::ops::Mul;
145+
use std::str::FromStr;
146+
147+
use apache_avro::schema::DecimalSchema;
148+
use apache_avro::Schema;
149+
use databend_common_expression::types::i256;
150+
use databend_common_expression::types::Decimal;
151+
use num_bigint::BigInt;
152+
153+
use crate::read::avro::avro_to_jsonb::to_jsonb;
154+
155+
fn create_avro_decimal(
156+
bigint: &str,
157+
precision: usize,
158+
scale: usize,
159+
) -> (apache_avro::types::Value, Schema) {
160+
let avro_schema = Schema::Decimal(DecimalSchema {
161+
precision,
162+
scale,
163+
inner: Box::new(Schema::Null),
164+
});
165+
let big_int = BigInt::from_str(bigint).unwrap();
166+
let value = apache_avro::types::Value::Decimal(apache_avro::Decimal::from(
167+
big_int.to_signed_bytes_be(),
168+
));
169+
(value, avro_schema)
170+
}
171+
172+
fn create_avro_big_decimal(bigint: &str, scale: usize) -> (apache_avro::types::Value, Schema) {
173+
let avro_schema = Schema::BigDecimal;
174+
let big_int = BigInt::from_str(bigint).unwrap();
175+
let value = apache_avro::types::Value::BigDecimal(apache_avro::BigDecimal::new(
176+
big_int,
177+
scale as i64,
178+
));
179+
(value, avro_schema)
180+
}
181+
182+
#[test]
183+
fn test_decimal_128_ok() {
184+
let cases = vec![
185+
(7, 4, 1234567i128, 7),
186+
(7, 4, 123456i128, 6),
187+
(38, 10, i128::MAX, 38),
188+
];
189+
for (p, s, v, digits) in cases {
190+
let (value, schema) = create_avro_decimal(&v.to_string(), p, s);
191+
let jsonb_value = to_jsonb(&value, &schema).unwrap();
192+
let expected = jsonb::Value::Number(jsonb::Number::Decimal128(jsonb::Decimal128 {
193+
precision: p as u8,
194+
scale: s as u8,
195+
value: v,
196+
}));
197+
assert_eq!(jsonb_value, expected);
198+
199+
let (value, schema) = create_avro_big_decimal(&v.to_string(), s);
200+
let jsonb_value = to_jsonb(&value, &schema).unwrap();
201+
let expected = jsonb::Value::Number(jsonb::Number::Decimal128(jsonb::Decimal128 {
202+
precision: digits as u8,
203+
scale: s as u8,
204+
value: v,
205+
}));
206+
assert_eq!(jsonb_value, expected);
207+
}
208+
}
209+
210+
#[test]
211+
fn test_decimal_256_ok() {
212+
let cases = vec![
213+
(
214+
39,
215+
10,
216+
i256::from_i128(i128::MAX).mul(i256::from_i128(10)),
217+
39,
218+
),
219+
(
220+
72,
221+
10,
222+
i256::from_i128(i128::MAX).mul(i256::from_i128(10)),
223+
39,
224+
),
225+
(72, 10, i256::MAX, 72),
226+
];
227+
for (p, s, v, digits) in cases {
228+
let (value, schema) = create_avro_decimal(&v.to_string(), p, s);
229+
let jsonb_value = to_jsonb(&value, &schema).unwrap();
230+
let expected = jsonb::Value::Number(jsonb::Number::Decimal256(jsonb::Decimal256 {
231+
precision: p as u8,
232+
scale: s as u8,
233+
value: v.0,
234+
}));
235+
assert_eq!(jsonb_value, expected);
236+
237+
let (value, schema) = create_avro_big_decimal(&v.to_string(), s);
238+
let jsonb_value = to_jsonb(&value, &schema).unwrap();
239+
let expected = jsonb::Value::Number(jsonb::Number::Decimal256(jsonb::Decimal256 {
240+
precision: digits as u8,
241+
scale: s as u8,
242+
value: v.0,
243+
}));
244+
assert_eq!(jsonb_value, expected);
245+
}
246+
}
247+
}

src/query/storages/stage/src/read/avro/block_builder_processor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,19 @@ impl BlockBuilderProcessor {
4545
}
4646

4747
impl AccumulatingTransform for BlockBuilderProcessor {
48-
const NAME: &'static str = "BlockBuilder";
48+
const NAME: &'static str = "AvroBlockBuilder";
4949

5050
fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
5151
let data = data
5252
.get_owned_meta()
5353
.and_then(WholeFileData::downcast_from)
5454
.unwrap();
5555
self.state.file_path = data.path.clone();
56+
let num_rows = self.state.num_rows;
5657
self.state.file_full_path = format!("{}{}", self.ctx.stage_root, data.path);
5758
self.decoder.add(&mut self.state, data)?;
59+
self.state
60+
.add_internals_columns_batch(self.state.num_rows - num_rows);
5861

5962
self.state.flush_status(&self.ctx.table_context)?;
6063
let blocks = self.state.try_flush_block_by_memory(&self.ctx)?;

0 commit comments

Comments
 (0)