Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cubejs-athena-driver/src/AthenaDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface {
types,
csvNoHeader: true,
csvDelimiter: '^A',
csvDisableQuoting: true,
};
}

Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-base-driver/src/driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ export interface TableCSVData extends DownloadTableBase {

csvDelimiter?: string;

csvDisableQuoting?: boolean;

/**
* The CSV file escape symbol.
*/
Expand Down
7 changes: 7 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type CreateTableOptions = {
sourceTable?: any
sealAt?: string
delimiter?: string
disableQuoting?: boolean
};

export class CubeStoreDriver extends BaseDriver implements DriverInterface {
Expand Down Expand Up @@ -110,6 +111,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
if (options.delimiter) {
withEntries.push(`delimiter = '${options.delimiter}'`);
}
if (options.disableQuoting) {
withEntries.push(`disable_quoting = true`);
}
if (options.buildRangeEnd) {
withEntries.push(`build_range_end = '${options.buildRangeEnd}'`);
}
Expand Down Expand Up @@ -295,6 +299,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
if (tableData.csvDelimiter) {
options.delimiter = tableData.csvDelimiter;
}
if (tableData.csvDisableQuoting) {
options.disableQuoting = tableData.csvDisableQuoting;
}
options.files = files;
}

Expand Down
76 changes: 74 additions & 2 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
"create_table_with_csv_no_header_and_delimiter",
create_table_with_csv_no_header_and_delimiter,
),
t(
"create_table_with_csv_no_header_and_quotes",
create_table_with_csv_no_header_and_quotes,
),
t("create_table_with_url", create_table_with_url),
t("create_table_fail_and_retry", create_table_fail_and_retry),
t("empty_crash", empty_crash),
Expand Down Expand Up @@ -2230,8 +2234,12 @@ async fn create_table_with_csv_no_header(service: Box<dyn SqlClient>) {

async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClient>) {
let file = write_tmp_file(indoc! {"
\"apple\u{0001}31
a\"pple\u{0001}32
a\"pp\"le\u{0001}12
apple\u{0001}2
banana\u{0001}3
\"orange\" orange\u{0001}4
"})
.unwrap();
let path = file.path().to_string_lossy();
Expand All @@ -2240,7 +2248,7 @@ async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClien
.await
.unwrap();
let _ = service
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A') LOCATION '{}'", path).as_str())
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A', disable_quoting = true) LOCATION '{}'", path).as_str())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's test without disabling quoting?

.await
.unwrap();
let result = service
Expand All @@ -2250,8 +2258,72 @@ async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClien
assert_eq!(
to_rows(&result),
vec![
vec![
TableValue::String("\"apple".to_string()),
TableValue::Int(31)
],
vec![
TableValue::String("\"orange\" orange".to_string()),
TableValue::Int(4)
],
vec![
TableValue::String("a\"pp\"le".to_string()),
TableValue::Int(12)
],
vec![
TableValue::String("a\"pple".to_string()),
TableValue::Int(32)
],
vec![TableValue::String("apple".to_string()), TableValue::Int(2)],
vec![TableValue::String("banana".to_string()), TableValue::Int(3)]
vec![TableValue::String("banana".to_string()), TableValue::Int(3)],
]
);
}

async fn create_table_with_csv_no_header_and_quotes(service: Box<dyn SqlClient>) {
let file = write_tmp_file(indoc! {"
\"\"\"apple\",31
\"a\"\"pple\",32
\"a\"\"pp\"\"le\",12
apple,2
banana,3
\"\"\"orange\"\" orange\",4
"})
.unwrap();
let path = file.path().to_string_lossy();
let _ = service
.exec_query("CREATE SCHEMA IF NOT EXISTS test")
.await
.unwrap();
let _ = service
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = ',', disable_quoting = false) LOCATION '{}'", path).as_str())
.await
.unwrap();
let result = service
.exec_query("SELECT * FROM test.table")
.await
.unwrap();
assert_eq!(
to_rows(&result),
vec![
vec![
TableValue::String("\"apple".to_string()),
TableValue::Int(31)
],
vec![
TableValue::String("\"orange\" orange".to_string()),
TableValue::Int(4)
],
vec![
TableValue::String("a\"pp\"le".to_string()),
TableValue::Int(12)
],
vec![
TableValue::String("a\"pple".to_string()),
TableValue::Int(32)
],
vec![TableValue::String("apple".to_string()), TableValue::Int(2)],
vec![TableValue::String("banana".to_string()), TableValue::Int(3)],
]
);
}
Expand Down
103 changes: 60 additions & 43 deletions rust/cubestore/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,15 @@ impl ImportFormat {
{
match self {
ImportFormat::CSV | ImportFormat::CSVNoHeader | ImportFormat::CSVOptions { .. } => {
let (delimiter, disable_quoting) = match self {
ImportFormat::CSV | ImportFormat::CSVNoHeader => (',', false),
ImportFormat::CSVOptions {
delimiter, quote, ..
} => (delimiter.unwrap_or(','), quote.is_none()),
};

let lines_stream: Pin<Box<dyn Stream<Item = Result<String, CubeError>> + Send>> =
Box::pin(CsvLineStream::new(reader));
Box::pin(CsvLineStream::new(reader, disable_quoting));

let mut header_mapping = match self {
ImportFormat::CSVNoHeader
Expand All @@ -89,11 +96,6 @@ impl ImportFormat {
_ => None,
};

let delimiter = match self {
ImportFormat::CSV | ImportFormat::CSVNoHeader => ',',
ImportFormat::CSVOptions { delimiter, .. } => delimiter.unwrap_or(','),
};

if delimiter as u16 > 255 {
return Err(CubeError::user(format!(
"Non ASCII delimiters are unsupported: '{}'",
Expand All @@ -104,7 +106,8 @@ impl ImportFormat {
let rows = lines_stream.map(move |line| -> Result<Option<Row>, CubeError> {
let str = line?;

let mut parser = CsvLineParser::new(delimiter as u8, str.as_str());
let mut parser =
CsvLineParser::new(delimiter as u8, disable_quoting, str.as_str());

if header_mapping.is_none() {
let mut mapping = Vec::new();
Expand Down Expand Up @@ -310,21 +313,23 @@ fn parse_binary_data(value: &str) -> Result<Vec<u8>, CubeError> {

struct CsvLineParser<'a> {
delimiter: u8,
disable_quoting: bool,
line: &'a str,
remaining: &'a str,
}

impl<'a> CsvLineParser<'a> {
fn new(delimiter: u8, line: &'a str) -> Self {
fn new(delimiter: u8, disable_quoting: bool, line: &'a str) -> Self {
Self {
delimiter,
disable_quoting,
line,
remaining: line,
}
}

fn next_value(&mut self) -> Result<MaybeOwnedStr<'_>, CubeError> {
Ok(
if !self.disable_quoting {
if let Some(b'"') = self.remaining.as_bytes().iter().nth(0) {
let mut closing_index = None;
let mut seen_escapes = false;
Expand Down Expand Up @@ -356,19 +361,18 @@ impl<'a> CsvLineParser<'a> {
res = MaybeOwnedStr::Borrowed(&self.remaining[0..closing_index])
}
self.remaining = self.remaining[(closing_index + 1)..].as_ref();
res
} else {
let next_comma = self
.remaining
.as_bytes()
.iter()
.position(|c| *c == self.delimiter)
.unwrap_or(self.remaining.len());
let res = &self.remaining[0..next_comma];
self.remaining = self.remaining[next_comma..].as_ref();
MaybeOwnedStr::Borrowed(res)
},
)
return Ok(res);
}
}
let next_comma = self
.remaining
.as_bytes()
.iter()
.position(|c| *c == self.delimiter)
.unwrap_or(self.remaining.len());
let res = &self.remaining[0..next_comma];
self.remaining = self.remaining[next_comma..].as_ref();
Ok(MaybeOwnedStr::Borrowed(res))
}

fn advance(&mut self) -> Result<(), CubeError> {
Expand All @@ -385,15 +389,17 @@ pin_project! {
struct CsvLineStream<R: AsyncBufRead> {
#[pin]
reader: R,
disable_quoting: bool,
buf: Vec<u8>,
in_quotes: bool,
}
}

impl<R: AsyncBufRead> CsvLineStream<R> {
pub fn new(reader: R) -> Self {
pub fn new(reader: R, disable_quoting: bool) -> Self {
Self {
reader,
disable_quoting,
buf: Vec::new(),
in_quotes: false,
}
Expand All @@ -417,38 +423,49 @@ impl<R: AsyncBufRead> Stream for CsvLineStream<R> {
return Poll::Ready(Some(Err(CubeError::from_error(err))));
}
Ok(available) => {
if *projected.in_quotes {
let quote_pos = memchr::memchr(b'"', available);
if let Some(i) = quote_pos {
// It consumes every pair of quotes.
// Matching for escapes is unnecessary as it's double "" sequence
*projected.in_quotes = false;
if *projected.disable_quoting {
let new_line_pos = memchr::memchr(b'\n', available);
if let Some(i) = new_line_pos {
projected.buf.extend_from_slice(&available[..=i]);
(false, i + 1)
(true, i + 1)
} else {
projected.buf.extend_from_slice(available);
(false, available.len())
}
} else {
let new_line_pos = memchr::memchr(b'\n', available);
let quote_pos = memchr::memchr(b'"', available);
let in_quotes = quote_pos.is_some()
&& (new_line_pos.is_some() && quote_pos < new_line_pos
|| new_line_pos.is_none());
if in_quotes {
if *projected.in_quotes {
let quote_pos = memchr::memchr(b'"', available);
if let Some(i) = quote_pos {
// It consumes every pair of quotes.
// Matching for escapes is unnecessary as it's double "" sequence
*projected.in_quotes = false;
projected.buf.extend_from_slice(&available[..=i]);
*projected.in_quotes = in_quotes;
(false, i + 1)
} else {
unreachable!()
projected.buf.extend_from_slice(available);
(false, available.len())
}
} else if let Some(i) = new_line_pos {
projected.buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
projected.buf.extend_from_slice(available);
(false, available.len())
let new_line_pos = memchr::memchr(b'\n', available);
let quote_pos = memchr::memchr(b'"', available);
let in_quotes = quote_pos.is_some()
&& (new_line_pos.is_some() && quote_pos < new_line_pos
|| new_line_pos.is_none());
if in_quotes {
if let Some(i) = quote_pos {
projected.buf.extend_from_slice(&available[..=i]);
*projected.in_quotes = in_quotes;
(false, i + 1)
} else {
unreachable!()
}
} else if let Some(i) = new_line_pos {
projected.buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
projected.buf.extend_from_slice(available);
(false, available.len())
}
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,19 +738,31 @@ impl SqlService for SqlServiceImpl {
_ => Err(CubeError::user(format!("Bad delimiter {}", option.value))),
})?;

let disable_quoting = with_options
.iter()
.find(|&opt| opt.name.value == "disable_quoting")
.map_or(Ok(false), |option| match &option.value {
Value::Boolean(value) => Ok(*value),
_ => Err(CubeError::user(format!(
"Bad disable_quoting flag (expected boolean) {}",
option.value
))),
})?;

if let Some(delimiter) = delimiter {
let quote = if disable_quoting { None } else { Some('"') };
import_format = match import_format {
ImportFormat::CSV => ImportFormat::CSVOptions {
delimiter: Some(delimiter),
has_header: true,
escape: None,
quote: None,
quote,
},
ImportFormat::CSVNoHeader => ImportFormat::CSVOptions {
delimiter: Some(delimiter),
has_header: false,
escape: None,
quote: None,
quote,
},
ImportFormat::CSVOptions {
has_header,
Expand Down
Loading