Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cubejs-athena-driver/src/AthenaDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface {
types,
csvNoHeader: true,
csvDelimiter: '^A',
csvDisableQouting: true,
};
}

Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-base-driver/src/driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ export interface TableCSVData extends DownloadTableBase {

csvDelimiter?: string;

csvDisableQouting?: boolean;

/**
* The CSV file escape symbol.
*/
Expand Down
7 changes: 7 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type CreateTableOptions = {
sourceTable?: any
sealAt?: string
delimiter?: string
disableQuoting?: boolean
};

export class CubeStoreDriver extends BaseDriver implements DriverInterface {
Expand Down Expand Up @@ -110,6 +111,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
if (options.delimiter) {
withEntries.push(`delimiter = '${options.delimiter}'`);
}
if (options.disableQuoting) {
withEntries.push(`disable_quoting = true`);
}
if (options.buildRangeEnd) {
withEntries.push(`build_range_end = '${options.buildRangeEnd}'`);
}
Expand Down Expand Up @@ -295,6 +299,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
if (tableData.csvDelimiter) {
options.delimiter = tableData.csvDelimiter;
}
if (tableData.csvDisableQouting) {
options.disableQuoting = tableData.csvDisableQouting;
}
options.files = files;
}

Expand Down
24 changes: 22 additions & 2 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2230,8 +2230,12 @@ async fn create_table_with_csv_no_header(service: Box<dyn SqlClient>) {

async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClient>) {
let file = write_tmp_file(indoc! {"
\"apple\u{0001}31
a\"pple\u{0001}32
a\"pp\"le\u{0001}12
apple\u{0001}2
banana\u{0001}3
\"orange\" orange\u{0001}4
"})
.unwrap();
let path = file.path().to_string_lossy();
Expand All @@ -2240,7 +2244,7 @@ async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClien
.await
.unwrap();
let _ = service
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A') LOCATION '{}'", path).as_str())
.exec_query(format!("CREATE TABLE test.table (`fruit` text, `number` int) WITH (input_format = 'csv_no_header', delimiter = '^A', disable_quoting = true) LOCATION '{}'", path).as_str())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's test without disabling quoting?

.await
.unwrap();
let result = service
Expand All @@ -2250,8 +2254,24 @@ async fn create_table_with_csv_no_header_and_delimiter(service: Box<dyn SqlClien
assert_eq!(
to_rows(&result),
vec![
vec![
TableValue::String("\"apple".to_string()),
TableValue::Int(31)
],
vec![
TableValue::String("\"orange\" orange".to_string()),
TableValue::Int(4)
],
vec![
TableValue::String("a\"pp\"le".to_string()),
TableValue::Int(12)
],
vec![
TableValue::String("a\"pple".to_string()),
TableValue::Int(32)
],
vec![TableValue::String("apple".to_string()), TableValue::Int(2)],
vec![TableValue::String("banana".to_string()), TableValue::Int(3)]
vec![TableValue::String("banana".to_string()), TableValue::Int(3)],
]
);
}
Expand Down
103 changes: 60 additions & 43 deletions rust/cubestore/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,15 @@ impl ImportFormat {
{
match self {
ImportFormat::CSV | ImportFormat::CSVNoHeader | ImportFormat::CSVOptions { .. } => {
let (delimiter, disable_quoting) = match self {
ImportFormat::CSV | ImportFormat::CSVNoHeader => (',', false),
ImportFormat::CSVOptions {
delimiter, quote, ..
} => (delimiter.unwrap_or(','), quote.is_none()),
};

let lines_stream: Pin<Box<dyn Stream<Item = Result<String, CubeError>> + Send>> =
Box::pin(CsvLineStream::new(reader));
Box::pin(CsvLineStream::new(reader, disable_quoting));

let mut header_mapping = match self {
ImportFormat::CSVNoHeader
Expand All @@ -89,11 +96,6 @@ impl ImportFormat {
_ => None,
};

let delimiter = match self {
ImportFormat::CSV | ImportFormat::CSVNoHeader => ',',
ImportFormat::CSVOptions { delimiter, .. } => delimiter.unwrap_or(','),
};

if delimiter as u16 > 255 {
return Err(CubeError::user(format!(
"Non ASCII delimiters are unsupported: '{}'",
Expand All @@ -104,7 +106,8 @@ impl ImportFormat {
let rows = lines_stream.map(move |line| -> Result<Option<Row>, CubeError> {
let str = line?;

let mut parser = CsvLineParser::new(delimiter as u8, str.as_str());
let mut parser =
CsvLineParser::new(delimiter as u8, disable_quoting, str.as_str());

if header_mapping.is_none() {
let mut mapping = Vec::new();
Expand Down Expand Up @@ -310,21 +313,23 @@ fn parse_binary_data(value: &str) -> Result<Vec<u8>, CubeError> {

struct CsvLineParser<'a> {
delimiter: u8,
disable_quoting: bool,
line: &'a str,
remaining: &'a str,
}

impl<'a> CsvLineParser<'a> {
fn new(delimiter: u8, line: &'a str) -> Self {
fn new(delimiter: u8, disable_quoting: bool, line: &'a str) -> Self {
Self {
delimiter,
disable_quoting,
line,
remaining: line,
}
}

fn next_value(&mut self) -> Result<MaybeOwnedStr<'_>, CubeError> {
Ok(
if !self.disable_quoting {
if let Some(b'"') = self.remaining.as_bytes().iter().nth(0) {
let mut closing_index = None;
let mut seen_escapes = false;
Expand Down Expand Up @@ -356,19 +361,18 @@ impl<'a> CsvLineParser<'a> {
res = MaybeOwnedStr::Borrowed(&self.remaining[0..closing_index])
}
self.remaining = self.remaining[(closing_index + 1)..].as_ref();
res
} else {
let next_comma = self
.remaining
.as_bytes()
.iter()
.position(|c| *c == self.delimiter)
.unwrap_or(self.remaining.len());
let res = &self.remaining[0..next_comma];
self.remaining = self.remaining[next_comma..].as_ref();
MaybeOwnedStr::Borrowed(res)
},
)
return Ok(res);
}
}
let next_comma = self
.remaining
.as_bytes()
.iter()
.position(|c| *c == self.delimiter)
.unwrap_or(self.remaining.len());
let res = &self.remaining[0..next_comma];
self.remaining = self.remaining[next_comma..].as_ref();
Ok(MaybeOwnedStr::Borrowed(res))
}

fn advance(&mut self) -> Result<(), CubeError> {
Expand All @@ -385,15 +389,17 @@ pin_project! {
struct CsvLineStream<R: AsyncBufRead> {
#[pin]
reader: R,
disable_quoting: bool,
buf: Vec<u8>,
in_quotes: bool,
}
}

impl<R: AsyncBufRead> CsvLineStream<R> {
pub fn new(reader: R) -> Self {
pub fn new(reader: R, disable_quoting: bool) -> Self {
Self {
reader,
disable_quoting,
buf: Vec::new(),
in_quotes: false,
}
Expand All @@ -417,38 +423,49 @@ impl<R: AsyncBufRead> Stream for CsvLineStream<R> {
return Poll::Ready(Some(Err(CubeError::from_error(err))));
}
Ok(available) => {
if *projected.in_quotes {
let quote_pos = memchr::memchr(b'"', available);
if let Some(i) = quote_pos {
// It consumes every pair of quotes.
// Matching for escapes is unnecessary as it's double "" sequence
*projected.in_quotes = false;
if *projected.disable_quoting {
let new_line_pos = memchr::memchr(b'\n', available);
if let Some(i) = new_line_pos {
projected.buf.extend_from_slice(&available[..=i]);
(false, i + 1)
(true, i + 1)
} else {
projected.buf.extend_from_slice(available);
(false, available.len())
}
} else {
let new_line_pos = memchr::memchr(b'\n', available);
let quote_pos = memchr::memchr(b'"', available);
let in_quotes = quote_pos.is_some()
&& (new_line_pos.is_some() && quote_pos < new_line_pos
|| new_line_pos.is_none());
if in_quotes {
if *projected.in_quotes {
let quote_pos = memchr::memchr(b'"', available);
if let Some(i) = quote_pos {
// It consumes every pair of quotes.
// Matching for escapes is unnecessary as it's double "" sequence
*projected.in_quotes = false;
projected.buf.extend_from_slice(&available[..=i]);
*projected.in_quotes = in_quotes;
(false, i + 1)
} else {
unreachable!()
projected.buf.extend_from_slice(available);
(false, available.len())
}
} else if let Some(i) = new_line_pos {
projected.buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
projected.buf.extend_from_slice(available);
(false, available.len())
let new_line_pos = memchr::memchr(b'\n', available);
let quote_pos = memchr::memchr(b'"', available);
let in_quotes = quote_pos.is_some()
&& (new_line_pos.is_some() && quote_pos < new_line_pos
|| new_line_pos.is_none());
if in_quotes {
if let Some(i) = quote_pos {
projected.buf.extend_from_slice(&available[..=i]);
*projected.in_quotes = in_quotes;
(false, i + 1)
} else {
unreachable!()
}
} else if let Some(i) = new_line_pos {
projected.buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
projected.buf.extend_from_slice(available);
(false, available.len())
}
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,19 +738,31 @@ impl SqlService for SqlServiceImpl {
_ => Err(CubeError::user(format!("Bad delimiter {}", option.value))),
})?;

let disable_quoting = with_options
.iter()
.find(|&opt| opt.name.value == "disable_quoting")
.map_or(Ok(false), |option| match &option.value {
Value::Boolean(value) => Ok(*value),
_ => Err(CubeError::user(format!(
"Bad disable_quoting flag (expected boolean) {}",
option.value
))),
})?;

if let Some(delimiter) = delimiter {
let quote = if disable_quoting { None } else { Some('"') };
import_format = match import_format {
ImportFormat::CSV => ImportFormat::CSVOptions {
delimiter: Some(delimiter),
has_header: true,
escape: None,
quote: None,
quote,
},
ImportFormat::CSVNoHeader => ImportFormat::CSVOptions {
delimiter: Some(delimiter),
has_header: false,
escape: None,
quote: None,
quote,
},
ImportFormat::CSVOptions {
has_header,
Expand Down
Loading