Skip to content

Commit 48b0dd1

Browse files
committed
Enforce string column escaping when generating CSVs
I had assumed that the DataFusion CSV writers would escape string columns themselves, but embedded newlines can break the output CSV: input strings that contain both newlines and commas appear to corrupt the generated rows. Signed-off-by: R. Tyler Croy <rtyler@buoyantdata.com>
1 parent 13caf2d commit 48b0dd1

File tree

4 files changed

+66
-2
lines changed

4 files changed

+66
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "3"
88

99
[workspace.package]
10-
version = "1.8.3"
10+
version = "1.8.4"
1111
edition = "2024"
1212
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
1313
homepage = "https://github.com/buoyant-data/oxbow"

lambdas/cdf-to-csv/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
2424

2525
[dev-dependencies]
2626
futures = "*"
27+
tempfile = "3.27.0"

lambdas/cdf-to-csv/src/main.rs

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async fn retrieve_inserts(ctx: &SessionContext) -> DeltaResult<DataFrame> {
135135
let df = ctx
136136
.sql("SELECT * FROM cdf WHERE _change_type IN ('insert', 'update_postimage')")
137137
.await?;
138-
Ok(df.drop_columns(&[
138+
Ok(escape_dataframe(df)?.drop_columns(&[
139139
"_change_type",
140140
"_commit_version",
141141
"_commit_timestamp",
@@ -178,8 +178,29 @@ struct Completion {
178178
deletes: usize,
179179
}
180180

181+
/// Escape the string columns for newlines in the input [DataFrame] to avoid any issues with CSV
182+
/// serialization
183+
fn escape_dataframe(input: DataFrame) -> DeltaResult<DataFrame> {
184+
use deltalake::arrow::datatypes::DataType;
185+
186+
let mut df = input.clone();
187+
let schema = input.schema();
188+
for field in schema.fields() {
189+
if field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8 {
190+
df = df.with_column(
191+
field.name(),
192+
replace(col(field.name()), lit("\n"), lit("\\n")),
193+
)?
194+
}
195+
}
196+
Ok(df)
197+
}
198+
181199
#[cfg(test)]
182200
mod tests {
201+
use std::fs::File;
202+
use std::io::{BufRead, BufReader};
203+
183204
use super::*;
184205
use futures::StreamExt;
185206
use object_store::ObjectStore;
@@ -312,4 +333,44 @@ mod tests {
312333
assert!(insert_found, "No insert was found to be written");
313334
Ok(())
314335
}
336+
337+
#[tokio::test]
338+
async fn test_writing_with_newlines() -> DeltaResult<()> {
339+
use deltalake::datafusion::config::CsvOptions;
340+
341+
let ctx = SessionContext::new();
342+
let temp = tempfile::tempdir()?;
343+
let tempfile = temp.path().join("some.csv");
344+
let tempfile = tempfile.as_os_str().to_str().unwrap();
345+
346+
let mut df = ctx
347+
.read_json(
348+
"./tests/data/row.json",
349+
NdJsonReadOptions::default().schema_infer_max_records(1),
350+
)
351+
.await?;
352+
let written = df.clone();
353+
let written_schema = written.schema();
354+
df = escape_dataframe(df)?;
355+
356+
df.write_csv(
357+
tempfile,
358+
DataFrameWriteOptions::default(),
359+
Some(CsvOptions::default()),
360+
)
361+
.await?;
362+
363+
let fd = File::open(tempfile)?;
364+
let reader = BufReader::new(fd);
365+
let lines: Vec<_> = reader.lines().collect();
366+
assert_eq!(
367+
3,
368+
lines.len(),
369+
"Should have only written three lines for the sample input data"
370+
);
371+
372+
let df = ctx.read_csv(tempfile, CsvReadOptions::default()).await?;
373+
assert_eq!(written_schema, df.schema());
374+
Ok(())
375+
}
315376
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"id": 209, "other": 149, "rating" : 1.00, "r" : "TRUST ME BRO\the sources are \"my friend once...\" \"my friend's daughter...\"\nbook full of chicken! as a pilot, break dancing was simply awfull\n\nper quanto" }
2+
{"id": 209, "other": 149, "rating" : 1.00, "r" : "TRUST ME BRO\the sources are \"my friend once...\" \"my friend's daughter...\"\nbook full of chicken! as a pilot, break dancing was simply awfull\n\nper quanto" }

0 commit comments

Comments
 (0)