
Commit 8b7d289

Produce a small completion marker file when cdf-to-csv completes
For larger data sets the write_csv() call in DataFusion can produce multiple CSV files. To let downstream event-driven processes know when the entire "action" has completed, this change now writes `cdf-completion.json`, which contains some rudimentary statistics.
1 parent 8c9de85 commit 8b7d289
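The marker is simply the JSON serialization of the new Completion struct introduced below, so its payload is a pair of row counts. As an illustration only (the field names come from that struct; the numbers here are made up), the file might contain:

{"inserts": 1024, "deletes": 17}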

16 files changed: +128 −55 lines changed


Cargo.toml

Lines changed: 2 additions & 2 deletions

@@ -7,8 +7,8 @@ members = [
 resolver = "3"
 
 [workspace.package]
-version = "1.5.2"
-edition = "2021"
+version = "1.6.0"
+edition = "2024"
 keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
 homepage = "https://github.com/buoyant-data/oxbow"
 repository = "https://github.com/buoyant-data/oxbow"

crates/oxbow-lambda-shared/src/trigger.rs

Lines changed: 3 additions & 1 deletion

@@ -234,7 +234,9 @@ mod tests {
             assert_eq!(ChangeType::TransactionLog { version }, change);
         }
 
-        let (change, _) = ChangeType::from_key("/mytable/_change_data/cdc-00000-924d9ac7-21a9-4121-b067-a0a6517aa8ed.c000.snappy.parquet");
+        let (change, _) = ChangeType::from_key(
+            "/mytable/_change_data/cdc-00000-924d9ac7-21a9-4121-b067-a0a6517aa8ed.c000.snappy.parquet",
+        );
         assert_eq!(ChangeType::ChangeDataFeed, change);
 
         let (change, _) = ChangeType::from_key(

crates/oxbow-sqs/src/lib.rs

Lines changed: 3 additions & 1 deletion

@@ -192,7 +192,9 @@ impl TimedConsumer {
 impl std::ops::Drop for TimedConsumer {
     fn drop(&mut self) {
         if !self.receive_handles.is_empty() {
-            error!("The TimedConsumer was not flushed before being dropped! This causes data duplication, you have to flush!");
+            error!(
+                "The TimedConsumer was not flushed before being dropped! This causes data duplication, you have to flush!"
+            );
         }
     }
 }

crates/oxbow/Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -7,7 +7,7 @@ homepage.workspace = true
 
 [dependencies]
 chrono = { workspace = true }
-deltalake = { workspace = true, default-features = false }
+deltalake = { workspace = true }
 tracing = { workspace = true }
 url = { workspace = true }

crates/oxbow/src/lib.rs

Lines changed: 32 additions & 27 deletions

@@ -1,18 +1,18 @@
+use deltalake::ObjectStore;
 ///
 /// The lib module contains the business logic of oxbow, regardless of the interface implementation
 ///
 use deltalake::arrow::datatypes::Schema as ArrowSchema;
 use deltalake::kernel::models::{Schema, StructField};
 use deltalake::kernel::*;
 use deltalake::logstore::ObjectStoreRef;
-use deltalake::logstore::{logstore_for, LogStoreRef};
+use deltalake::logstore::{LogStoreRef, logstore_for};
 use deltalake::operations::create::CreateBuilder;
 use deltalake::parquet::arrow::async_reader::{
     ParquetObjectReader, ParquetRecordBatchStreamBuilder,
 };
 use deltalake::parquet::file::metadata::ParquetMetaData;
 use deltalake::protocol::*;
-use deltalake::ObjectStore;
 use deltalake::{DeltaResult, DeltaTable, DeltaTableError, ObjectMeta};
 use futures::StreamExt;
 use tracing::log::*;
@@ -138,7 +138,9 @@ pub async fn discover_parquet_files(
                 debug!("Discovered file: {:?}", meta);
                 result.push(meta);
             } else {
-                warn!("Was asked to discover parquet files on what appears to already be a table, and found checkpoint files: {filename}");
+                warn!(
+                    "Was asked to discover parquet files on what appears to already be a table, and found checkpoint files: {filename}"
+                );
             }
         }
     }
@@ -444,34 +446,37 @@ fn coerce_field(
 ) -> deltalake::arrow::datatypes::FieldRef {
     use deltalake::arrow::datatypes::*;
     match field.data_type() {
-        DataType::Timestamp(unit, tz) => {
-            match unit {
-                TimeUnit::Nanosecond => {
-                    warn!("Given a nanosecond precision which we will cowardly pretend is microseconds");
-                    let field = Field::new(
-                        field.name(),
-                        DataType::Timestamp(TimeUnit::Microsecond, tz.clone()),
-                        field.is_nullable(),
-                    );
-                    return Arc::new(field);
-                }
-                TimeUnit::Millisecond => {
-                    warn!("I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)", field.name());
-                    let field = Field::new(
-                        field.name(),
-                        DataType::Timestamp(TimeUnit::Microsecond, tz.clone()),
-                        field.is_nullable(),
-                    );
-                    return Arc::new(field);
-                }
-                _ => {}
+        DataType::Timestamp(unit, tz) => match unit {
+            TimeUnit::Nanosecond => {
+                warn!(
+                    "Given a nanosecond precision which we will cowardly pretend is microseconds"
+                );
+                let field = Field::new(
+                    field.name(),
+                    DataType::Timestamp(TimeUnit::Microsecond, tz.clone()),
+                    field.is_nullable(),
+                );
+                return Arc::new(field);
             }
-        }
+            TimeUnit::Millisecond => {
+                warn!(
+                    "I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)",
+                    field.name()
+                );
+                let field = Field::new(
+                    field.name(),
+                    DataType::Timestamp(TimeUnit::Microsecond, tz.clone()),
+                    field.is_nullable(),
+                );
+                return Arc::new(field);
+            }
+            _ => {}
+        },
         DataType::List(field) => {
             let coerced = coerce_field(field.clone());
             let list_field = Field::new(field.name(), DataType::List(coerced), field.is_nullable());
             return Arc::new(list_field);
-        }
+        },
         DataType::Struct(fields) => {
             let coerced: Vec<deltalake::arrow::datatypes::FieldRef> =
                 fields.iter().map(|f| coerce_field(f.clone())).collect();
@@ -481,7 +486,7 @@ fn coerce_field(
             field.is_nullable(),
         );
         return Arc::new(struct_field);
-        }
+        },
         _ => {}
     };
     field.clone()

crates/oxbow/src/write.rs

Lines changed: 1 addition & 1 deletion

@@ -3,7 +3,7 @@ use deltalake::arrow::array::RecordBatch;
 use deltalake::arrow::datatypes::Schema as ArrowSchema;
 use deltalake::arrow::error::ArrowError;
 use deltalake::arrow::json::reader::ReaderBuilder;
-use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
+use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
 use deltalake::{DeltaResult, DeltaTable};
 
 use std::io::Cursor;

lambdas/auto-tag/src/main.rs

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 use aws_lambda_events::event::s3::S3EventRecord;
 use aws_lambda_events::event::sqs::SqsEvent;
 use aws_sdk_s3::types::{Tag, Tagging};
-use lambda_runtime::{run, service_fn, Error, LambdaEvent};
+use lambda_runtime::{Error, LambdaEvent, run, service_fn};
 use tracing::log::*;
 
 use std::collections::HashMap;

lambdas/cdf-to-csv/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -11,6 +11,7 @@ chrono = { workspace = true }
 lambda_runtime = "0.14.2"
 deltalake = { workspace = true, features = ["s3", "json", "datafusion"] }
 object_store = { version = "0.12.1", features = ["cloud"]}
+serde = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }

lambdas/cdf-to-csv/src/main.rs

Lines changed: 62 additions & 7 deletions

@@ -7,10 +7,13 @@ use deltalake::datafusion::dataframe::DataFrameWriteOptions;
 use deltalake::datafusion::prelude::*;
 use deltalake::delta_datafusion::DeltaCdfTableProvider;
 use deltalake::{DeltaOps, DeltaResult};
-use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
-use object_store::prefix::PrefixStore;
+use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
 use object_store::ObjectStore;
+use object_store::PutPayload;
+use object_store::path::Path;
+use object_store::prefix::PrefixStore;
 use oxbow_lambda_shared::*;
+use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use tracing::log::*;
 use url::Url;
@@ -98,13 +101,22 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> DeltaResult<(), Error
 
         let inserts = retrieve_inserts(&ctx).await?;
         let deletes = retrieve_deletes(&ctx).await?;
-        inserts
+
+        // write_csv will return a Vec<RecordBatch> which we can use for some rudimentary
+        // statistics
+        let inserts = inserts
             .write_csv("cdfo://inserts", DataFrameWriteOptions::default(), None)
             .await?;
-
-        deletes
+        let deletes = deletes
            .write_csv("cdfo://deletes", DataFrameWriteOptions::default(), None)
            .await?;
+
+        let completion = Completion {
+            inserts: inserts.iter().map(|rb| rb.num_rows()).sum(),
+            deletes: deletes.iter().map(|rb| rb.num_rows()).sum(),
+        };
+
+        mark_complete(store.clone(), &completion).await?;
     } else {
         warn!("Invoked but didn't find min/max trigger versions, something is fishy!");
     }
@@ -131,6 +143,7 @@ async fn retrieve_inserts(ctx: &SessionContext) -> DeltaResult<DataFrame> {
     ])?)
 }
 
+/// Compute the deletes from the change data feed associated with the [SessionContext]
 async fn retrieve_deletes(ctx: &SessionContext) -> DeltaResult<DataFrame> {
     let df = ctx
         .sql("SELECT * FROM cdf WHERE _change_type IN ('delete')")
@@ -143,12 +156,33 @@ async fn retrieve_deletes(ctx: &SessionContext) -> DeltaResult<DataFrame> {
     ])?)
 }
 
+/// Write a completion file to the given object store.
+///
+/// This is expected to be the prefix store associated with a werite
+async fn mark_complete(store: Arc<dyn ObjectStore>, completion: &Completion) -> DeltaResult<()> {
+    // Write a sentinel file once the writes have completed successfully
+    store
+        .put(
+            &Path::from("cdf-completion.json"),
+            serde_json::to_string(completion)
+                .expect("Failed to serialize Completion")
+                .into(),
+        )
+        .await?;
+    Ok(())
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+struct Completion {
+    inserts: usize,
+    deletes: usize,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use futures::StreamExt;
-    use object_store::path::Path;
-    use object_store::ObjectStore;
+    use object_store::{GetResultPayload, ObjectStore};
 
     use deltalake::datafusion::{
         common::assert_batches_sorted_eq, dataframe::DataFrameWriteOptions,
@@ -161,6 +195,27 @@ mod tests {
         let ctx = SessionContext::new();
         Ok((ctx, cdf))
     }
+
+    #[tokio::test]
+    async fn test_mark_complete() -> DeltaResult<()> {
+        let store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
+        let completion = Completion {
+            inserts: 1,
+            deletes: 0,
+        };
+        mark_complete(store.clone(), &completion).await?;
+        let _ = store.head(&Path::from("cdf-completion.json")).await?;
+
+        let result = store.get(&Path::from("cdf-completion.json")).await?;
+        let bytes = result.bytes().await?;
+        let s = String::from_utf8(bytes.to_vec()).expect("Failed to convert buffer");
+        let received: Completion = serde_json::from_str(&s)?;
+
+        assert_eq!(completion, received);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_read_cdf_deletes() -> DeltaResult<()> {
         let (ctx, cdf) = cdf_test_setup().await?;
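For reference, a downstream consumer could read the marker back with the same object_store calls the new test_mark_complete test exercises. The following is a rough sketch, not part of this commit: the store setup and error handling are assumed, and the Completion struct is re-declared locally to mirror the one defined above.

use std::sync::Arc;

use object_store::ObjectStore;
use object_store::path::Path;
use serde::Deserialize;

// Local mirror of the Completion struct written by cdf-to-csv.
#[derive(Debug, Deserialize)]
struct Completion {
    inserts: usize,
    deletes: usize,
}

// Fetch and decode `cdf-completion.json`; assumes `store` is already scoped
// to the prefix where cdf-to-csv wrote its CSV output.
async fn read_completion(store: Arc<dyn ObjectStore>) -> Result<Completion, Box<dyn std::error::Error>> {
    let result = store.get(&Path::from("cdf-completion.json")).await?;
    let bytes = result.bytes().await?;
    Ok(serde_json::from_slice(&bytes)?)
}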

lambdas/file-loader/src/main.rs

Lines changed: 9 additions & 5 deletions

@@ -5,12 +5,12 @@
 use aws_lambda_events::event::sqs::SqsEvent;
 use aws_lambda_events::s3::S3EventRecord;
 use aws_lambda_events::sqs::SqsMessage;
+use deltalake::DeltaResult;
 use deltalake::arrow::datatypes::Schema as ArrowSchema;
 use deltalake::arrow::json::reader::ReaderBuilder;
-use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
-use deltalake::DeltaResult;
+use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
 use lambda_runtime::tracing::{debug, error, info, trace};
-use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
+use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
 
 use oxbow_lambda_shared::*;
 use oxbow_sqs::{ConsumerConfig, TimedConsumer};
@@ -138,7 +138,9 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
                 }
             }
             RecordType::Unknown => {
-                error!("file-loader was invoked for a file with an unknown suffix! Ignoring: {file_record:?}");
+                error!(
+                    "file-loader was invoked for a file with an unknown suffix! Ignoring: {file_record:?}"
+                );
             }
         }
     }
@@ -147,7 +149,9 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
         let mbytes_to_consume: usize = str::parse(&bytes_to_consume)
             .expect("BUFFER_MORE_BYTES_ALLOWED must be parseable as a uint64");
 
-        info!("Allocated {bytes_consumed} bytes thus far... I can only have {mbytes_to_consume}MB");
+        info!(
+            "Allocated {bytes_consumed} bytes thus far... I can only have {mbytes_to_consume}MB"
+        );
         if bytes_consumed >= (mbytes_to_consume * 1024 * 1024) {
             info!("Finalizing after consuming {bytes_consumed} bytes of memory");
             break;
