Skip to content

Commit b1a851e

Browse files
committed
Expose some configuration for cdf-to-csv to customize output CSVs
Some database engines need specific formattings to handle the conversion of nulls or timestamps
1 parent 6dac8e8 commit b1a851e

File tree

3 files changed

+47
-7
lines changed

3 files changed

+47
-7
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "3"
88

99
[workspace.package]
10-
version = "1.8.5"
10+
version = "1.8.6"
1111
edition = "2024"
1212
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
1313
homepage = "https://github.com/buoyant-data/oxbow"

lambdas/cdf-to-csv/README.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ s3://csvs/
6262
|
6363
| Prefix for output CSV files with deleted rows (requires primary key set)
6464

65+
| `CSV_NULL_CHARACTER`
66+
|
67+
| Character to use as the null character in CSV output
68+
69+
| `CSV_TIMESTAMP_FORMAT`
70+
| `%Y-%m-%dT%H:%M:%S%.f`
71+
| Format string for `timestamp` fields
72+
73+
| `CSV_TIMESTAMP_TZ_FORMAT`
74+
|
75+
| Timestamp format for timestamp with timezone arrays. When empty ISO 8601 format is used.
76+
6577
| `RUST_LOG`
6678
| `error`
6779
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, i.e. `oxbow=debug`

lambdas/cdf-to-csv/src/main.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
//! produce CSV files to ingest into Aurora for Change Data Feeds
44
55
use aws_lambda_events::event::sqs::SqsEvent;
6+
use deltalake::datafusion::config::CsvOptions;
67
use deltalake::datafusion::dataframe::DataFrameWriteOptions;
8+
use deltalake::datafusion::datasource::physical_plan::CsvOpener;
79
use deltalake::datafusion::prelude::*;
810
use deltalake::delta_datafusion::DeltaCdfTableProvider;
911
use deltalake::{DeltaOps, DeltaResult};
@@ -105,10 +107,18 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> DeltaResult<(), Error
105107
// write_csv will return a Vec,RecordBatch> which we can use for some rudimentary
106108
// statistics
107109
let inserts = inserts
108-
.write_csv("cdfo://inserts", DataFrameWriteOptions::default(), None)
110+
.write_csv(
111+
"cdfo://inserts",
112+
DataFrameWriteOptions::default(),
113+
Some(csv_options()),
114+
)
109115
.await?;
110116
let deletes = deletes
111-
.write_csv("cdfo://deletes", DataFrameWriteOptions::default(), None)
117+
.write_csv(
118+
"cdfo://deletes",
119+
DataFrameWriteOptions::default(),
120+
Some(csv_options()),
121+
)
112122
.await?;
113123

114124
let completion = Completion {
@@ -196,6 +206,16 @@ fn escape_dataframe(input: DataFrame) -> DeltaResult<DataFrame> {
196206
Ok(df)
197207
}
198208

209+
/// Return the computed [CsvOptions] for writing output files
210+
fn csv_options() -> CsvOptions {
211+
CsvOptions {
212+
null_value: std::env::var("CSV_NULL_CHARACTER").ok(),
213+
timestamp_format: std::env::var("CSV_TIMESTAMP_FORMAT").ok(),
214+
timestamp_tz_format: std::env::var("CSV_TIMESTAMP_TZ_FORMAT").ok(),
215+
..Default::default()
216+
}
217+
}
218+
199219
#[cfg(test)]
200220
mod tests {
201221
use std::fs::File;
@@ -301,7 +321,7 @@ mod tests {
301321
ctx.register_object_store(&Url::parse("cdfo://deletes").unwrap(), delete_store.clone());
302322

303323
let mut stream = store.list(None);
304-
while let Some(Ok(_entry)) = stream.next().await {
324+
if let Some(Ok(_entry)) = stream.next().await {
305325
unreachable!("The InMemory object store should be empty before the test begins");
306326
}
307327

@@ -310,10 +330,18 @@ mod tests {
310330
let change_data = retrieve_inserts(&ctx).await?;
311331
let deletes = retrieve_deletes(&ctx).await?;
312332
change_data
313-
.write_csv("cdfo://inserts", DataFrameWriteOptions::default(), None)
333+
.write_csv(
334+
"cdfo://inserts",
335+
DataFrameWriteOptions::default(),
336+
Some(csv_options()),
337+
)
314338
.await?;
315339
deletes
316-
.write_csv("cdfo://deletes", DataFrameWriteOptions::default(), None)
340+
.write_csv(
341+
"cdfo://deletes",
342+
DataFrameWriteOptions::default(),
343+
Some(csv_options()),
344+
)
317345
.await?;
318346

319347
let mut stream = store.list(None);
@@ -356,7 +384,7 @@ mod tests {
356384
df.write_csv(
357385
tempfile,
358386
DataFrameWriteOptions::default(),
359-
Some(CsvOptions::default()),
387+
Some(csv_options()),
360388
)
361389
.await?;
362390

0 commit comments

Comments
 (0)