Skip to content

Commit 5cefd96

Browse files
authored
fix(cubestore): Disable quoting in csv parser for Athena (#9997)
1 parent 51ec263 commit 5cefd96

File tree

6 files changed

+158
-47
lines changed

6 files changed

+158
-47
lines changed

packages/cubejs-athena-driver/src/AthenaDriver.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface {
394394
types,
395395
csvNoHeader: true,
396396
csvDelimiter: '^A',
397+
csvDisableQuoting: true,
397398
};
398399
}
399400

packages/cubejs-base-driver/src/driver.interface.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ export interface TableCSVData extends DownloadTableBase {
7171

7272
csvDelimiter?: string;
7373

74+
csvDisableQuoting?: boolean;
75+
7476
/**
7577
* The CSV file escape symbol.
7678
*/

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type CreateTableOptions = {
4949
sourceTable?: any
5050
sealAt?: string
5151
delimiter?: string
52+
disableQuoting?: boolean
5253
};
5354

5455
export class CubeStoreDriver extends BaseDriver implements DriverInterface {
@@ -110,6 +111,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
110111
if (options.delimiter) {
111112
withEntries.push(`delimiter = '${options.delimiter}'`);
112113
}
114+
if (options.disableQuoting) {
115+
withEntries.push(`disable_quoting = true`);
116+
}
113117
if (options.buildRangeEnd) {
114118
withEntries.push(`build_range_end = '${options.buildRangeEnd}'`);
115119
}
@@ -295,6 +299,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
295299
if (tableData.csvDelimiter) {
296300
options.delimiter = tableData.csvDelimiter;
297301
}
302+
if (tableData.csvDisableQuoting) {
303+
options.disableQuoting = tableData.csvDisableQuoting;
304+
}
298305
options.files = files;
299306
}
300307

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
110110
"create_table_with_csv_no_header_and_delimiter",
111111
create_table_with_csv_no_header_and_delimiter,
112112
),
113+
t(
114+
"create_table_with_csv_no_header_and_quotes",
115+
create_table_with_csv_no_header_and_quotes,
116+
),
113117
t("create_table_with_url", create_table_with_url),
114118
t("create_table_fail_and_retry", create_table_fail_and_retry),
115119
t("empty_crash", empty_crash),
@@ -2230,8 +2234,12 @@ async fn create_table_with_csv_no_header(service: Box<dyn SqlClient>) {
22302234

22312235
async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClient>) {
22322236
let file = write_tmp_file(indoc! {"
2237+
\"apple\u{0001}31
2238+
a\"pple\u{0001}32
2239+
a\"pp\"le\u{0001}12
22332240
apple\u{0001}2
22342241
banana\u{0001}3
2242+
\"orange\" orange\u{0001}4
22352243
"})
22362244
.unwrap();
22372245
let path = file.path().to_string_lossy();
@@ -2240,7 +2248,7 @@ async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClien
22402248
.await
22412249
.unwrap();
22422250
let _ = service
2243-
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A') LOCATION '{}'", path).as_str())
2251+
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A', disable_quoting = true) LOCATION '{}'", path).as_str())
22442252
.await
22452253
.unwrap();
22462254
let result = service
@@ -2250,8 +2258,72 @@ async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClien
22502258
assert_eq!(
22512259
to_rows(&result),
22522260
vec![
2261+
vec![
2262+
TableValue::String("\"apple".to_string()),
2263+
TableValue::Int(31)
2264+
],
2265+
vec![
2266+
TableValue::String("\"orange\" orange".to_string()),
2267+
TableValue::Int(4)
2268+
],
2269+
vec![
2270+
TableValue::String("a\"pp\"le".to_string()),
2271+
TableValue::Int(12)
2272+
],
2273+
vec![
2274+
TableValue::String("a\"pple".to_string()),
2275+
TableValue::Int(32)
2276+
],
22532277
vec![TableValue::String("apple".to_string()), TableValue::Int(2)],
2254-
vec![TableValue::String("banana".to_string()), TableValue::Int(3)]
2278+
vec![TableValue::String("banana".to_string()), TableValue::Int(3)],
2279+
]
2280+
);
2281+
}
2282+
2283+
async fn create_table_with_csv_no_header_and_quotes(service: Box<dyn SqlClient>) {
2284+
let file = write_tmp_file(indoc! {"
2285+
\"\"\"apple\",31
2286+
\"a\"\"pple\",32
2287+
\"a\"\"pp\"\"le\",12
2288+
apple,2
2289+
banana,3
2290+
\"\"\"orange\"\" orange\",4
2291+
"})
2292+
.unwrap();
2293+
let path = file.path().to_string_lossy();
2294+
let _ = service
2295+
.exec_query("CREATE SCHEMA IF NOT EXISTS test")
2296+
.await
2297+
.unwrap();
2298+
let _ = service
2299+
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = ',', disable_quoting = false) LOCATION '{}'", path).as_str())
2300+
.await
2301+
.unwrap();
2302+
let result = service
2303+
.exec_query("SELECT * FROM test.table")
2304+
.await
2305+
.unwrap();
2306+
assert_eq!(
2307+
to_rows(&result),
2308+
vec![
2309+
vec![
2310+
TableValue::String("\"apple".to_string()),
2311+
TableValue::Int(31)
2312+
],
2313+
vec![
2314+
TableValue::String("\"orange\" orange".to_string()),
2315+
TableValue::Int(4)
2316+
],
2317+
vec![
2318+
TableValue::String("a\"pp\"le".to_string()),
2319+
TableValue::Int(12)
2320+
],
2321+
vec![
2322+
TableValue::String("a\"pple".to_string()),
2323+
TableValue::Int(32)
2324+
],
2325+
vec![TableValue::String("apple".to_string()), TableValue::Int(2)],
2326+
vec![TableValue::String("banana".to_string()), TableValue::Int(3)],
22552327
]
22562328
);
22572329
}

rust/cubestore/cubestore/src/import/mod.rs

Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,15 @@ impl ImportFormat {
7272
{
7373
match self {
7474
ImportFormat::CSV | ImportFormat::CSVNoHeader | ImportFormat::CSVOptions { .. } => {
75+
let (delimiter, disable_quoting) = match self {
76+
ImportFormat::CSV | ImportFormat::CSVNoHeader => (',', false),
77+
ImportFormat::CSVOptions {
78+
delimiter, quote, ..
79+
} => (delimiter.unwrap_or(','), quote.is_none()),
80+
};
81+
7582
let lines_stream: Pin<Box<dyn Stream<Item = Result<String, CubeError>> + Send>> =
76-
Box::pin(CsvLineStream::new(reader));
83+
Box::pin(CsvLineStream::new(reader, disable_quoting));
7784

7885
let mut header_mapping = match self {
7986
ImportFormat::CSVNoHeader
@@ -89,11 +96,6 @@ impl ImportFormat {
8996
_ => None,
9097
};
9198

92-
let delimiter = match self {
93-
ImportFormat::CSV | ImportFormat::CSVNoHeader => ',',
94-
ImportFormat::CSVOptions { delimiter, .. } => delimiter.unwrap_or(','),
95-
};
96-
9799
if delimiter as u16 > 255 {
98100
return Err(CubeError::user(format!(
99101
"Non ASCII delimiters are unsupported: '{}'",
@@ -104,7 +106,8 @@ impl ImportFormat {
104106
let rows = lines_stream.map(move |line| -> Result<Option<Row>, CubeError> {
105107
let str = line?;
106108

107-
let mut parser = CsvLineParser::new(delimiter as u8, str.as_str());
109+
let mut parser =
110+
CsvLineParser::new(delimiter as u8, disable_quoting, str.as_str());
108111

109112
if header_mapping.is_none() {
110113
let mut mapping = Vec::new();
@@ -310,21 +313,23 @@ fn parse_binary_data(value: &str) -> Result<Vec<u8>, CubeError> {
310313

311314
struct CsvLineParser<'a> {
312315
delimiter: u8,
316+
disable_quoting: bool,
313317
line: &'a str,
314318
remaining: &'a str,
315319
}
316320

317321
impl<'a> CsvLineParser<'a> {
318-
fn new(delimiter: u8, line: &'a str) -> Self {
322+
fn new(delimiter: u8, disable_quoting: bool, line: &'a str) -> Self {
319323
Self {
320324
delimiter,
325+
disable_quoting,
321326
line,
322327
remaining: line,
323328
}
324329
}
325330

326331
fn next_value(&mut self) -> Result<MaybeOwnedStr<'_>, CubeError> {
327-
Ok(
332+
if !self.disable_quoting {
328333
if let Some(b'"') = self.remaining.as_bytes().iter().nth(0) {
329334
let mut closing_index = None;
330335
let mut seen_escapes = false;
@@ -356,19 +361,18 @@ impl<'a> CsvLineParser<'a> {
356361
res = MaybeOwnedStr::Borrowed(&self.remaining[0..closing_index])
357362
}
358363
self.remaining = self.remaining[(closing_index + 1)..].as_ref();
359-
res
360-
} else {
361-
let next_comma = self
362-
.remaining
363-
.as_bytes()
364-
.iter()
365-
.position(|c| *c == self.delimiter)
366-
.unwrap_or(self.remaining.len());
367-
let res = &self.remaining[0..next_comma];
368-
self.remaining = self.remaining[next_comma..].as_ref();
369-
MaybeOwnedStr::Borrowed(res)
370-
},
371-
)
364+
return Ok(res);
365+
}
366+
}
367+
let next_comma = self
368+
.remaining
369+
.as_bytes()
370+
.iter()
371+
.position(|c| *c == self.delimiter)
372+
.unwrap_or(self.remaining.len());
373+
let res = &self.remaining[0..next_comma];
374+
self.remaining = self.remaining[next_comma..].as_ref();
375+
Ok(MaybeOwnedStr::Borrowed(res))
372376
}
373377

374378
fn advance(&mut self) -> Result<(), CubeError> {
@@ -385,15 +389,17 @@ pin_project! {
385389
struct CsvLineStream<R: AsyncBufRead> {
386390
#[pin]
387391
reader: R,
392+
disable_quoting: bool,
388393
buf: Vec<u8>,
389394
in_quotes: bool,
390395
}
391396
}
392397

393398
impl<R: AsyncBufRead> CsvLineStream<R> {
394-
pub fn new(reader: R) -> Self {
399+
pub fn new(reader: R, disable_quoting: bool) -> Self {
395400
Self {
396401
reader,
402+
disable_quoting,
397403
buf: Vec::new(),
398404
in_quotes: false,
399405
}
@@ -417,38 +423,49 @@ impl<R: AsyncBufRead> Stream for CsvLineStream<R> {
417423
return Poll::Ready(Some(Err(CubeError::from_error(err))));
418424
}
419425
Ok(available) => {
420-
if *projected.in_quotes {
421-
let quote_pos = memchr::memchr(b'"', available);
422-
if let Some(i) = quote_pos {
423-
// It consumes every pair of quotes.
424-
// Matching for escapes is unnecessary as it's double "" sequence
425-
*projected.in_quotes = false;
426+
if *projected.disable_quoting {
427+
let new_line_pos = memchr::memchr(b'\n', available);
428+
if let Some(i) = new_line_pos {
426429
projected.buf.extend_from_slice(&available[..=i]);
427-
(false, i + 1)
430+
(true, i + 1)
428431
} else {
429432
projected.buf.extend_from_slice(available);
430433
(false, available.len())
431434
}
432435
} else {
433-
let new_line_pos = memchr::memchr(b'\n', available);
434-
let quote_pos = memchr::memchr(b'"', available);
435-
let in_quotes = quote_pos.is_some()
436-
&& (new_line_pos.is_some() && quote_pos < new_line_pos
437-
|| new_line_pos.is_none());
438-
if in_quotes {
436+
if *projected.in_quotes {
437+
let quote_pos = memchr::memchr(b'"', available);
439438
if let Some(i) = quote_pos {
439+
// It consumes every pair of quotes.
440+
// Matching for escapes is unnecessary as it's double "" sequence
441+
*projected.in_quotes = false;
440442
projected.buf.extend_from_slice(&available[..=i]);
441-
*projected.in_quotes = in_quotes;
442443
(false, i + 1)
443444
} else {
444-
unreachable!()
445+
projected.buf.extend_from_slice(available);
446+
(false, available.len())
445447
}
446-
} else if let Some(i) = new_line_pos {
447-
projected.buf.extend_from_slice(&available[..=i]);
448-
(true, i + 1)
449448
} else {
450-
projected.buf.extend_from_slice(available);
451-
(false, available.len())
449+
let new_line_pos = memchr::memchr(b'\n', available);
450+
let quote_pos = memchr::memchr(b'"', available);
451+
let in_quotes = quote_pos.is_some()
452+
&& (new_line_pos.is_some() && quote_pos < new_line_pos
453+
|| new_line_pos.is_none());
454+
if in_quotes {
455+
if let Some(i) = quote_pos {
456+
projected.buf.extend_from_slice(&available[..=i]);
457+
*projected.in_quotes = in_quotes;
458+
(false, i + 1)
459+
} else {
460+
unreachable!()
461+
}
462+
} else if let Some(i) = new_line_pos {
463+
projected.buf.extend_from_slice(&available[..=i]);
464+
(true, i + 1)
465+
} else {
466+
projected.buf.extend_from_slice(available);
467+
(false, available.len())
468+
}
452469
}
453470
}
454471
}

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,19 +738,31 @@ impl SqlService for SqlServiceImpl {
738738
_ => Err(CubeError::user(format!("Bad delimiter {}", option.value))),
739739
})?;
740740

741+
let disable_quoting = with_options
742+
.iter()
743+
.find(|&opt| opt.name.value == "disable_quoting")
744+
.map_or(Ok(false), |option| match &option.value {
745+
Value::Boolean(value) => Ok(*value),
746+
_ => Err(CubeError::user(format!(
747+
"Bad disable_quoting flag (expected boolean) {}",
748+
option.value
749+
))),
750+
})?;
751+
741752
if let Some(delimiter) = delimiter {
753+
let quote = if disable_quoting { None } else { Some('"') };
742754
import_format = match import_format {
743755
ImportFormat::CSV => ImportFormat::CSVOptions {
744756
delimiter: Some(delimiter),
745757
has_header: true,
746758
escape: None,
747-
quote: None,
759+
quote,
748760
},
749761
ImportFormat::CSVNoHeader => ImportFormat::CSVOptions {
750762
delimiter: Some(delimiter),
751763
has_header: false,
752764
escape: None,
753-
quote: None,
765+
quote,
754766
},
755767
ImportFormat::CSVOptions {
756768
has_header,

0 commit comments

Comments
 (0)