Skip to content

Commit f103481

Browse files
authored
refactor(format): add FileFormatParams. (#10931)
* refactor(FlightSQL): add FileFormatParams. fix headers * refactor(meta): move some "impl FromToProto". * feat(meta): add message FileFormatParams * ci(format): test_pb_from_to(FileFormatParams).
1 parent a78ef6a commit f103481

File tree

9 files changed

+803
-130
lines changed

9 files changed

+803
-130
lines changed
src/meta/app/src/principal/file_format.rs

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
use std::fmt::Debug;
17+
use std::str::FromStr;
18+
19+
use common_exception::ErrorCode;
20+
use common_exception::Result;
21+
use serde::Deserialize;
22+
use serde::Serialize;
23+
24+
use crate::principal::StageFileCompression;
25+
use crate::principal::StageFileFormatType;
26+
27+
// Keys looked up in `FileFormatOptionsAst::options` when building `FileFormatParams`.
//
// NOTE(review): `FILED` and `RECORDE` look like misspellings of `FIELD` and `RECORD`.
// The identifiers are kept as they are because other items in this file refer to them
// by these exact names; only the string values are visible at runtime.

/// Option key for the field delimiter.
const OPT_FILED_DELIMITER: &str = "field_delimiter";

/// Option key for the record delimiter.
const OPT_RECORDE_DELIMITER: &str = "record_delimiter";

/// Option key for the header count; its value is parsed as a `u64`.
const OPT_SKIP_HEADER: &str = "skip_header";

/// Option key for the `nan_display` setting.
const OPT_NAN_DISPLAY: &str = "nan_display";

/// Option key for the escape string.
const OPT_ESCAPE: &str = "escape";

/// Option key for the quote string.
const OPT_QUOTE: &str = "quote";

/// Option key for the row tag used by the XML format.
const OPT_ROW_TAG: &str = "row_tag";
34+
35+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
36+
pub struct FileFormatOptionsAst {
37+
pub options: BTreeMap<String, String>,
38+
}
39+
40+
impl FileFormatOptionsAst {
41+
fn take_string(&mut self, key: &str, default: &str) -> String {
42+
self.options
43+
.remove(key)
44+
.unwrap_or_else(|| default.to_string())
45+
}
46+
47+
fn take_type(&mut self) -> Result<StageFileFormatType> {
48+
let typ = self.options.remove("type").ok_or_else(|| {
49+
ErrorCode::IllegalFileFormat("Missing type in file format options".to_string())
50+
})?;
51+
StageFileFormatType::from_str(&typ).map_err(ErrorCode::IllegalFileFormat)
52+
}
53+
54+
fn take_compression(&mut self) -> Result<StageFileCompression> {
55+
let compression = self.options.remove("compression").ok_or_else(|| {
56+
ErrorCode::IllegalFileFormat("Missing compression in file format options".to_string())
57+
})?;
58+
StageFileCompression::from_str(&compression).map_err(ErrorCode::IllegalFileFormat)
59+
}
60+
61+
fn take_u64(&mut self, key: &str, default: u64) -> Result<u64> {
62+
match self.options.remove(key) {
63+
Some(v) => Ok(u64::from_str(&v)?),
64+
None => Ok(default),
65+
}
66+
}
67+
}
68+
69+
/// File format parameters after checking and parsing.
70+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
71+
#[serde(tag = "type")]
72+
pub enum FileFormatParams {
73+
Csv(CsvFileFormatParams),
74+
Tsv(TsvFileFormatParams),
75+
NdJson(NdJsonFileFormatParams),
76+
Json(JsonFileFormatParams),
77+
Xml(XmlFileFormatParams),
78+
Parquet(ParquetFileFormatParams),
79+
}
80+
81+
impl Default for FileFormatParams {
82+
fn default() -> Self {
83+
FileFormatParams::Parquet(ParquetFileFormatParams {})
84+
}
85+
}
86+
87+
impl TryFrom<FileFormatOptionsAst> for FileFormatParams {
88+
type Error = ErrorCode;
89+
fn try_from(ast: FileFormatOptionsAst) -> Result<Self> {
90+
let mut ast = ast;
91+
let typ = ast.take_type()?;
92+
let params = match typ {
93+
StageFileFormatType::Xml => {
94+
let row_tag = ast.take_string(OPT_ROW_TAG, "row");
95+
let compression = ast.take_compression()?;
96+
FileFormatParams::Xml(XmlFileFormatParams {
97+
compression,
98+
row_tag,
99+
})
100+
}
101+
StageFileFormatType::Json => {
102+
let compression = ast.take_compression()?;
103+
FileFormatParams::Json(JsonFileFormatParams { compression })
104+
}
105+
StageFileFormatType::NdJson => {
106+
let compression = ast.take_compression()?;
107+
FileFormatParams::NdJson(NdJsonFileFormatParams { compression })
108+
}
109+
StageFileFormatType::Parquet => FileFormatParams::Parquet(ParquetFileFormatParams {}),
110+
StageFileFormatType::Csv => {
111+
let compression = ast.take_compression()?;
112+
let headers = ast.take_u64(OPT_SKIP_HEADER, 0)?;
113+
let field_delimiter = ast.take_string(OPT_FILED_DELIMITER, ",");
114+
let record_delimiter = ast.take_string(OPT_RECORDE_DELIMITER, "\n");
115+
let nan_display = ast.take_string(OPT_NAN_DISPLAY, "NaN");
116+
let escape = ast.take_string(OPT_ESCAPE, "");
117+
let quote = ast.take_string(OPT_QUOTE, "\"");
118+
FileFormatParams::Csv(CsvFileFormatParams {
119+
compression,
120+
headers,
121+
field_delimiter,
122+
record_delimiter,
123+
nan_display,
124+
escape,
125+
quote,
126+
})
127+
}
128+
StageFileFormatType::Tsv => {
129+
let compression = ast.take_compression()?;
130+
let headers = ast.take_u64(OPT_SKIP_HEADER, 0)?;
131+
let field_delimiter = ast.take_string(OPT_FILED_DELIMITER, "\t");
132+
let record_delimiter = ast.take_string(OPT_RECORDE_DELIMITER, "\n");
133+
let nan_display = ast.take_string(OPT_NAN_DISPLAY, "nan");
134+
let escape = ast.take_string(OPT_ESCAPE, "\\");
135+
let quote = ast.take_string(OPT_QUOTE, "\'");
136+
FileFormatParams::Tsv(TsvFileFormatParams {
137+
compression,
138+
headers,
139+
field_delimiter,
140+
record_delimiter,
141+
nan_display,
142+
quote,
143+
escape,
144+
})
145+
}
146+
_ => {
147+
return Err(ErrorCode::IllegalFileFormat(format!(
148+
"Unsupported file format {typ:?}"
149+
)));
150+
}
151+
};
152+
153+
if ast.options.is_empty() {
154+
Ok(params)
155+
} else {
156+
Err(ErrorCode::IllegalFileFormat(format!(
157+
"Unsupported options for {:?} {:?}",
158+
typ, ast.options
159+
)))
160+
}
161+
}
162+
}
163+
164+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
165+
pub struct CsvFileFormatParams {
166+
pub compression: StageFileCompression,
167+
pub headers: u64,
168+
pub field_delimiter: String,
169+
pub record_delimiter: String,
170+
pub nan_display: String,
171+
pub escape: String,
172+
pub quote: String,
173+
}
174+
175+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
176+
pub struct TsvFileFormatParams {
177+
pub compression: StageFileCompression,
178+
pub headers: u64,
179+
pub field_delimiter: String,
180+
pub record_delimiter: String,
181+
pub nan_display: String,
182+
pub escape: String,
183+
pub quote: String,
184+
}
185+
186+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
187+
pub struct XmlFileFormatParams {
188+
pub compression: StageFileCompression,
189+
pub row_tag: String,
190+
}
191+
192+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
193+
pub struct JsonFileFormatParams {
194+
pub compression: StageFileCompression,
195+
}
196+
197+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
198+
pub struct NdJsonFileFormatParams {
199+
pub compression: StageFileCompression,
200+
}
201+
202+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
203+
pub struct ParquetFileFormatParams {}

src/meta/app/src/principal/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
//! Principal is a user or role that accesses an entity.
1616
17+
mod file_format;
1718
mod principal_identity;
1819
mod role_info;
1920
mod user_auth;
@@ -27,6 +28,7 @@ mod user_quota;
2728
mod user_setting;
2829
mod user_stage;
2930

31+
pub use file_format::*;
3032
pub use principal_identity::PrincipalIdentity;
3133
pub use role_info::RoleInfo;
3234
pub use role_info::RoleInfoSerdeError;

0 commit comments

Comments
 (0)