Skip to content

Commit f8c2c88

Browse files
authored
feat(cubestore): Support custom CSV delimiters (#6597)
1 parent 0819075 commit f8c2c88

File tree

4 files changed

+109
-9
lines changed

4 files changed

+109
-9
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
103103
"create_table_with_csv_no_header",
104104
create_table_with_csv_no_header,
105105
),
106+
t(
107+
"create_table_with_csv_no_header_and_delimiter",
108+
create_table_with_csv_no_header_and_delimiter,
109+
),
106110
t("create_table_with_url", create_table_with_url),
107111
t("create_table_fail_and_retry", create_table_fail_and_retry),
108112
t("empty_crash", empty_crash),
@@ -2155,6 +2159,34 @@ async fn create_table_with_csv_no_header(service: Box<dyn SqlClient>) {
21552159
);
21562160
}
21572161

2162+
async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClient>) {
2163+
let file = write_tmp_file(indoc! {"
2164+
apple\u{0001}2
2165+
banana\u{0001}3
2166+
"})
2167+
.unwrap();
2168+
let path = file.path().to_string_lossy();
2169+
let _ = service
2170+
.exec_query("CREATE SCHEMA IF NOT EXISTS test")
2171+
.await
2172+
.unwrap();
2173+
let _ = service
2174+
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A') LOCATION '{}'", path).as_str())
2175+
.await
2176+
.unwrap();
2177+
let result = service
2178+
.exec_query("SELECT * FROM test.table")
2179+
.await
2180+
.unwrap();
2181+
assert_eq!(
2182+
to_rows(&result),
2183+
vec![
2184+
vec![TableValue::String("apple".to_string()), TableValue::Int(2)],
2185+
vec![TableValue::String("banana".to_string()), TableValue::Int(3)]
2186+
]
2187+
);
2188+
}
2189+
21582190
async fn create_table_with_url(service: Box<dyn SqlClient>) {
21592191
let url = "https://data.wprdc.org/dataset/0b584c84-7e35-4f4d-a5a2-b01697470c0f/resource/e95dd941-8e47-4460-9bd8-1e51c194370b/download/bikepghpublic.csv";
21602192

rust/cubestore/cubestore/src/import/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,40 @@ impl ImportFormat {
6767
) -> Result<Pin<Box<dyn Stream<Item = Result<Option<Row>, CubeError>> + Send + 'a>>, CubeError>
6868
{
6969
match self {
70-
ImportFormat::CSV | ImportFormat::CSVNoHeader => {
70+
ImportFormat::CSV | ImportFormat::CSVNoHeader | ImportFormat::CSVOptions { .. } => {
7171
let lines_stream: Pin<Box<dyn Stream<Item = Result<String, CubeError>> + Send>> =
7272
Box::pin(CsvLineStream::new(reader));
7373

7474
let mut header_mapping = match self {
75-
ImportFormat::CSV => None,
76-
ImportFormat::CSVNoHeader => Some(
75+
ImportFormat::CSVNoHeader
76+
| ImportFormat::CSVOptions {
77+
has_header: false, ..
78+
} => Some(
7779
columns
7880
.iter()
7981
.enumerate()
8082
.map(|(i, c)| (i, c.clone()))
8183
.collect(),
8284
),
85+
_ => None,
8386
};
8487

88+
// The built-in CSV formats are always comma-separated; `CSVOptions` may
// override the separator and falls back to a comma when none was given.
let delimiter = match self {
    ImportFormat::CSV | ImportFormat::CSVNoHeader => ',',
    ImportFormat::CSVOptions { delimiter, .. } => delimiter.unwrap_or(','),
};

// The line parser below is handed the delimiter as a single byte
// (`delimiter as u8`), so only ASCII delimiters can be represented
// faithfully. The previous guard (`delimiter as u16 > 255`) truncated the
// code point before comparing, so characters above U+FFFF could slip
// through, and it also accepted U+0080..=U+00FF, whose UTF-8 encoding is
// two bytes and therefore never equals the truncated byte.
if !delimiter.is_ascii() {
    return Err(CubeError::user(format!(
        "Non ASCII delimiters are unsupported: '{}'",
        delimiter
    )));
}
99+
85100
let rows = lines_stream.map(move |line| -> Result<Option<Row>, CubeError> {
86101
let str = line?;
87102

88-
let mut parser = CsvLineParser::new(str.as_str());
103+
let mut parser = CsvLineParser::new(delimiter as u8, str.as_str());
89104

90105
if header_mapping.is_none() {
91106
let mut mapping = Vec::new();
@@ -257,13 +272,15 @@ fn parse_binary_data(value: &str) -> Result<Vec<u8>, CubeError> {
257272
}
258273

259274
/// Cursor over a single CSV line that is consumed from the front.
struct CsvLineParser<'a> {
    /// Byte that separates fields; compared against the raw bytes of
    /// `remaining`.
    delimiter: u8,
    /// The complete line this parser was created for.
    line: &'a str,
    /// The part of `line` that has not been consumed yet.
    remaining: &'a str,
}
263279

264280
impl<'a> CsvLineParser<'a> {
265-
fn new(line: &'a str) -> Self {
281+
fn new(delimiter: u8, line: &'a str) -> Self {
266282
Self {
283+
delimiter,
267284
line,
268285
remaining: line,
269286
}
@@ -308,7 +325,7 @@ impl<'a> CsvLineParser<'a> {
308325
.remaining
309326
.as_bytes()
310327
.iter()
311-
.position(|c| *c == b',')
328+
.position(|c| *c == self.delimiter)
312329
.unwrap_or(self.remaining.len());
313330
let res = &self.remaining[0..next_comma];
314331
self.remaining = self.remaining[next_comma..].as_ref();
@@ -318,8 +335,10 @@ impl<'a> CsvLineParser<'a> {
318335
}
319336

320337
fn advance(&mut self) -> Result<(), CubeError> {
321-
if let Some(b',') = self.remaining.as_bytes().iter().nth(0) {
322-
self.remaining = self.remaining[1..].as_ref()
338+
if let Some(c) = self.remaining.as_bytes().iter().nth(0) {
339+
if *c == self.delimiter {
340+
self.remaining = self.remaining[1..].as_ref()
341+
}
323342
}
324343
Ok(())
325344
}

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,12 @@ impl fmt::Display for Column {
552552
/// Input formats that can be selected for importing table data.
pub enum ImportFormat {
    /// Comma-separated values. No column mapping is supplied up front, so the
    /// importer derives it from the data itself.
    CSV,
    /// Comma-separated values without a header line; fields are mapped to the
    /// table columns by position.
    CSVNoHeader,
    /// CSV with caller-supplied parsing options.
    CSVOptions {
        /// Field separator; a comma is used when this is `None`.
        delimiter: Option<char>,
        /// Escape character.
        /// NOTE(review): no code visible in this change reads this field —
        /// confirm where it is consumed.
        escape: Option<char>,
        /// Quote character.
        /// NOTE(review): no code visible in this change reads this field —
        /// confirm where it is consumed.
        quote: Option<char>,
        /// `false` selects positional column mapping, as with `CSVNoHeader`.
        has_header: bool,
    },
}
556562

557563
data_frame_from! {

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ impl SqlService for SqlServiceImpl {
969969
}
970970
let schema_name = &nv[0].value;
971971
let table_name = &nv[1].value;
972-
let import_format = with_options
972+
let mut import_format = with_options
973973
.iter()
974974
.find(|&opt| opt.name.value == "input_format")
975975
.map_or(Result::Ok(ImportFormat::CSV), |option| {
@@ -990,6 +990,49 @@ impl SqlService for SqlServiceImpl {
990990
))),
991991
}
992992
})?;
993+
994+
let delimiter = with_options
995+
.iter()
996+
.find(|&opt| opt.name.value == "delimiter")
997+
.map_or(Ok(None), |option| match &option.value {
998+
Value::SingleQuotedString(delimiter) => match delimiter.as_str() {
999+
"tab" => Ok(Some('\t')),
1000+
"^A" => Ok(Some('\u{0001}')),
1001+
s if s.len() != 1 => {
1002+
Err(CubeError::user(format!("Bad delimiter {}", option.value)))
1003+
}
1004+
s => Ok(Some(s.chars().next().unwrap())),
1005+
},
1006+
_ => Err(CubeError::user(format!("Bad delimiter {}", option.value))),
1007+
})?;
1008+
1009+
if let Some(delimiter) = delimiter {
1010+
import_format = match import_format {
1011+
ImportFormat::CSV => ImportFormat::CSVOptions {
1012+
delimiter: Some(delimiter),
1013+
has_header: true,
1014+
escape: None,
1015+
quote: None,
1016+
},
1017+
ImportFormat::CSVNoHeader => ImportFormat::CSVOptions {
1018+
delimiter: Some(delimiter),
1019+
has_header: false,
1020+
escape: None,
1021+
quote: None,
1022+
},
1023+
ImportFormat::CSVOptions {
1024+
has_header,
1025+
escape,
1026+
quote,
1027+
..
1028+
} => ImportFormat::CSVOptions {
1029+
delimiter: Some(delimiter),
1030+
has_header,
1031+
escape,
1032+
quote,
1033+
},
1034+
}
1035+
}
9931036
let build_range_end = with_options
9941037
.iter()
9951038
.find(|&opt| opt.name.value == "build_range_end")

0 commit comments

Comments
 (0)