Skip to content

Commit 9eb97c3

Browse files
authored
feat: support load Avro Files. (#17548)
1 parent e577b7e commit 9eb97c3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1559
-151
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ ahash = "0.8"
220220
aho-corasick = { version = "1.0.1" } #
221221
anyerror = { version = "=0.1.13" }
222222
anyhow = { version = "1.0.65" }
223+
apache-avro = { version = "0.17.0", features = ["snappy", "zstandard", "xz", "snappy", "bzip"] }
223224
approx = "0.5.1"
224225
arrow = { version = "53" }
225226
arrow-array = { version = "53" }

src/meta/app/src/principal/file_format.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub enum FileFormatParams {
6161
Xml(XmlFileFormatParams),
6262
Parquet(ParquetFileFormatParams),
6363
Orc(OrcFileFormatParams),
64+
Avro(AvroFileFormatParams),
6465
}
6566

6667
impl FileFormatParams {
@@ -73,6 +74,7 @@ impl FileFormatParams {
7374
FileFormatParams::Xml(_) => StageFileFormatType::Xml,
7475
FileFormatParams::Parquet(_) => StageFileFormatType::Parquet,
7576
FileFormatParams::Orc(_) => StageFileFormatType::Orc,
77+
FileFormatParams::Avro(_) => StageFileFormatType::Avro,
7678
}
7779
}
7880

@@ -90,6 +92,9 @@ impl FileFormatParams {
9092
Ok(FileFormatParams::Json(JsonFileFormatParams::default()))
9193
}
9294
StageFileFormatType::Orc => Ok(FileFormatParams::Orc(OrcFileFormatParams::default())),
95+
StageFileFormatType::Avro => {
96+
Ok(FileFormatParams::Avro(AvroFileFormatParams::default()))
97+
}
9398
_ => Err(ErrorCode::IllegalFileFormat(format!(
9499
"Unsupported file format type: {:?}",
95100
format_type
@@ -106,6 +111,7 @@ impl FileFormatParams {
106111
FileFormatParams::Xml(v) => v.compression,
107112
FileFormatParams::Parquet(_) => StageFileCompression::None,
108113
FileFormatParams::Orc(_) => StageFileCompression::None,
114+
FileFormatParams::Avro(_) => StageFileCompression::None,
109115
}
110116
}
111117

@@ -169,6 +175,16 @@ impl FileFormatParams {
169175
null_if,
170176
)?)
171177
}
178+
StageFileFormatType::Avro => {
179+
let compression = reader.take_compression()?;
180+
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
181+
let null_if = parse_null_if(reader.options.remove(NULL_IF))?;
182+
FileFormatParams::Avro(AvroFileFormatParams::try_create(
183+
compression,
184+
missing_field_as.as_deref(),
185+
null_if,
186+
)?)
187+
}
172188
StageFileFormatType::Parquet => {
173189
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
174190
let null_if = parse_null_if(reader.options.remove(NULL_IF))?;
@@ -690,6 +706,52 @@ impl NdJsonFileFormatParams {
690706
}
691707
}
692708

709+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
710+
pub struct AvroFileFormatParams {
711+
pub compression: StageFileCompression,
712+
pub missing_field_as: NullAs,
713+
pub null_if: Vec<String>,
714+
}
715+
716+
impl AvroFileFormatParams {
717+
pub fn try_create(
718+
compression: StageFileCompression,
719+
missing_field_as: Option<&str>,
720+
null_if: Vec<String>,
721+
) -> Result<Self> {
722+
let missing_field_as = NullAs::parse(missing_field_as, MISSING_FIELD_AS, NullAs::Error)?;
723+
if matches!(missing_field_as, NullAs::Null) {
724+
return Err(ErrorCode::InvalidArgument(
725+
"Invalid option value for Avro: NULL_FIELD_AS is set to NULL. The valid values are ERROR | FIELD_DEFAULT.",
726+
));
727+
}
728+
Ok(Self {
729+
compression,
730+
missing_field_as,
731+
null_if,
732+
})
733+
}
734+
}
735+
736+
impl Default for crate::principal::AvroFileFormatParams {
737+
fn default() -> Self {
738+
crate::principal::AvroFileFormatParams {
739+
compression: StageFileCompression::None,
740+
missing_field_as: NullAs::Error,
741+
null_if: vec![],
742+
}
743+
}
744+
}
745+
746+
impl AvroFileFormatParams {
747+
pub fn downcast_unchecked(params: &FileFormatParams) -> &AvroFileFormatParams {
748+
match params {
749+
FileFormatParams::Avro(p) => p,
750+
_ => unreachable!(),
751+
}
752+
}
753+
}
754+
693755
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
694756
pub struct ParquetFileFormatParams {
695757
pub missing_field_as: NullAs,
@@ -776,6 +838,9 @@ impl Display for FileFormatParams {
776838
params.compression, params.missing_field_as, params.null_field_as
777839
)
778840
}
841+
FileFormatParams::Avro(params) => {
842+
write!(f, "TYPE = AVRO, NULL_FIELDS_AS = {}", params.compression,)
843+
}
779844
FileFormatParams::Parquet(params) => {
780845
write!(
781846
f,

src/meta/app/src/principal/user_stage.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,9 @@ impl FromStr for StageFileFormatType {
192192
"XML" => Ok(StageFileFormatType::Xml),
193193
"JSON" => Ok(StageFileFormatType::Json),
194194
"ORC" => Ok(StageFileFormatType::Orc),
195-
"AVRO" => Err(format!(
196-
"File format type '{s}' not implemented yet', must be one of ( CSV | TSV | NDJSON | PARQUET | ORC)"
197-
)),
195+
"AVRO" => Ok(StageFileFormatType::Avro),
198196
_ => Err(format!(
199-
"Unknown file format type '{s}', must be one of ( CSV | TSV | NDJSON | PARQUET | ORC)"
197+
"Unknown file format type '{s}', must be one of ( CSV | TSV | NDJSON | PARQUET | ORC | AVRO | JSON )"
200198
)),
201199
}
202200
}

src/meta/proto-conv/src/file_format_from_to_protobuf_impl.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,11 @@ impl FromToProto for mt::principal::FileFormatParams {
247247
mt::principal::XmlFileFormatParams::from_pb(p)?,
248248
))
249249
}
250+
Some(pb::file_format_params::Format::Avro(p)) => {
251+
Ok(mt::principal::FileFormatParams::Avro(
252+
mt::principal::AvroFileFormatParams::from_pb(p)?,
253+
))
254+
}
250255
None => Err(Incompatible::new(
251256
"FileFormatParams.format cannot be None".to_string(),
252257
)),
@@ -275,6 +280,11 @@ impl FromToProto for mt::principal::FileFormatParams {
275280
mt::principal::JsonFileFormatParams::to_pb(p)?,
276281
)),
277282
}),
283+
Self::Avro(p) => Ok(Self::PB {
284+
format: Some(pb::file_format_params::Format::Avro(
285+
mt::principal::AvroFileFormatParams::to_pb(p)?,
286+
)),
287+
}),
278288
Self::Tsv(p) => Ok(Self::PB {
279289
format: Some(pb::file_format_params::Format::Tsv(
280290
mt::principal::TsvFileFormatParams::to_pb(p)?,
@@ -557,3 +567,39 @@ impl FromToProto for mt::principal::TsvFileFormatParams {
557567
})
558568
}
559569
}
570+
571+
impl FromToProto for mt::principal::AvroFileFormatParams {
572+
type PB = pb::AvroFileFormatParams;
573+
fn get_pb_ver(p: &Self::PB) -> u64 {
574+
p.ver
575+
}
576+
577+
fn from_pb(p: pb::AvroFileFormatParams) -> Result<Self, Incompatible>
578+
where Self: Sized {
579+
reader_check_msg(p.ver, p.min_reader_ver)?;
580+
let compression = mt::principal::StageFileCompression::from_pb_enum(
581+
FromPrimitive::from_i32(p.compression).ok_or_else(|| {
582+
Incompatible::new(format!("invalid StageFileCompression: {}", p.compression))
583+
})?,
584+
)?;
585+
586+
mt::principal::AvroFileFormatParams::try_create(
587+
compression,
588+
p.missing_field_as.as_deref(),
589+
p.null_if,
590+
)
591+
.map_err(|e| Incompatible::new(format!("{e}")))
592+
}
593+
594+
fn to_pb(&self) -> Result<pb::AvroFileFormatParams, Incompatible> {
595+
let compression =
596+
mt::principal::StageFileCompression::to_pb_enum(&self.compression)? as i32;
597+
Ok(pb::AvroFileFormatParams {
598+
ver: VER,
599+
min_reader_ver: MIN_READER_VER,
600+
compression,
601+
missing_field_as: Some(self.missing_field_as.to_string()),
602+
null_if: self.null_if.clone(),
603+
})
604+
}
605+
}

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
150150
(118, "2025-01-22: Add: config.proto: add user_name in WebhdfsConfig"),
151151
(119, "2025-01-25: Add: virtual_column add alias_names and auto_generated field"),
152152
(120, "2025-02-11: Add: Add new UserPrivilege CreateWarehouse and new OwnershipObject::Warehouse"),
153+
(121, "2025-03-03: Add: Add new FileFormat AvroFileFormatParams"),
153154
// Dear developer:
154155
// If you're gonna add a new metadata version, you'll have to add a test for it.
155156
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,4 @@ mod v117_webhdfs_add_disable_list_batch;
118118
mod v118_webhdfs_add_user_name;
119119
mod v119_virtual_column;
120120
mod v120_warehouse_ownershipobject;
121+
mod v121_avro_format_params;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_meta_app::principal::AvroFileFormatParams;
16+
use databend_common_meta_app::principal::FileFormatParams;
17+
use databend_common_meta_app::principal::NullAs;
18+
use databend_common_meta_app::principal::StageFileCompression;
19+
use fastrace::func_name;
20+
21+
use crate::common;
22+
23+
// These bytes are built when a new version in introduced,
24+
25+
// and are kept for backward compatibility test.
26+
//
27+
// *************************************************************
28+
// * These messages should never be updated, *
29+
// * only be added when a new version is added, *
30+
// * or be removed when an old version is no longer supported. *
31+
// *************************************************************
32+
//
33+
#[test]
34+
fn test_decode_v121_avro_file_format_params() -> anyhow::Result<()> {
35+
let avro_file_format_params_v121 = vec![
36+
8, 4, 18, 13, 70, 73, 69, 76, 68, 95, 68, 69, 70, 65, 85, 76, 84, 26, 4, 110, 117, 108,
37+
108, 26, 4, 78, 85, 76, 76, 160, 6, 121, 168, 6, 24,
38+
];
39+
40+
let want = || AvroFileFormatParams {
41+
compression: StageFileCompression::Zstd,
42+
missing_field_as: NullAs::FieldDefault,
43+
null_if: vec!["null".to_string(), "NULL".to_string()],
44+
};
45+
common::test_load_old(
46+
func_name!(),
47+
avro_file_format_params_v121.as_slice(),
48+
121,
49+
want(),
50+
)?;
51+
common::test_pb_from_to(func_name!(), want())?;
52+
Ok(())
53+
}
54+
55+
#[test]
56+
fn test_decode_v121_file_format_params() -> anyhow::Result<()> {
57+
let file_format_params_v121 = vec![
58+
66, 35, 8, 4, 18, 13, 70, 73, 69, 76, 68, 95, 68, 69, 70, 65, 85, 76, 84, 26, 4, 110, 117,
59+
108, 108, 26, 4, 78, 85, 76, 76, 160, 6, 121, 168, 6, 24,
60+
];
61+
let want = || {
62+
FileFormatParams::Avro(AvroFileFormatParams {
63+
compression: StageFileCompression::Zstd,
64+
missing_field_as: NullAs::FieldDefault,
65+
null_if: vec!["null".to_string(), "NULL".to_string()],
66+
})
67+
};
68+
common::test_load_old(func_name!(), file_format_params_v121.as_slice(), 0, want())?;
69+
common::test_pb_from_to(func_name!(), want())?;
70+
Ok(())
71+
}

src/meta/protos/proto/file_format.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ message FileFormatParams {
8585
NdJsonFileFormatParams nd_json = 5;
8686
XmlFileFormatParams xml = 6;
8787
OrcFileFormatParams orc = 7;
88+
AvroFileFormatParams avro = 8;
8889
}
8990
}
9091

@@ -155,4 +156,12 @@ message OrcFileFormatParams {
155156
uint64 ver = 100;
156157
uint64 min_reader_ver = 101;
157158
optional string missing_field_as = 1;
159+
}
160+
161+
message AvroFileFormatParams {
162+
uint64 ver = 100;
163+
uint64 min_reader_ver = 101;
164+
StageFileCompression compression = 1;
165+
optional string missing_field_as = 2;
166+
repeated string null_if = 3;
158167
}

src/query/ast/src/parser/copy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ pub fn copy_into(i: Input) -> IResult<Statement> {
120120
#copy_into_location:"`COPY
121121
INTO { internalStage | externalStage | externalLocation }
122122
FROM { [<database_name>.]<table_name> | ( <query> ) }
123-
[ FILE_FORMAT = ( { TYPE = { CSV | JSON | PARQUET | TSV } [ formatTypeOptions ] } ) ]
123+
[ FILE_FORMAT = ( { TYPE = { CSV | NDJSON | PARQUET | TSV | AVRO } [ formatTypeOptions ] } ) ]
124124
[ copyOptions ]`"
125125
| #copy_into_table: "`COPY
126126
INTO { [<database_name>.]<table_name> { ( <columns> ) } }
127127
FROM { internalStage | externalStage | externalLocation | ( <query> ) }
128-
[ FILE_FORMAT = ( { TYPE = { CSV | JSON | PARQUET | TSV } [ formatTypeOptions ] } ) ]
128+
[ FILE_FORMAT = ( { TYPE = { CSV | NDJSON | PARQUET | TSV | AVRO } [ formatTypeOptions ] } ) ]
129129
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
130130
[ PATTERN = '<regex_pattern>' ]
131131
[ VALIDATION_MODE = RETURN_ROWS ]

0 commit comments

Comments
 (0)