Skip to content

Commit bf8f7d6

Browse files
committed
Upgrade to delta-rs 0.29.2 and pull CommitProperties through for writers
Closes #39. Signed-off-by: R. Tyler Croy <rtyler@buoyantdata.com>
1 parent 1863111 commit bf8f7d6

File tree

4 files changed

+17
-12
lines changed

4 files changed

+17
-12
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ anyhow = "=1"
2020
chrono = "0.4"
2121
aws_lambda_events = { version = "0.15", default-features = false, features = ["sns", "sqs", "s3"] }
2222
# Requiring the datafusion feature to support invariants may be in error, but it is required as of the currently released 0.18.2
23-
deltalake = { version = "0.29.0", features = ["s3", "json", "datafusion"] }
23+
deltalake = { version = "0.29.2", features = ["s3", "json", "datafusion"] }
2424
#deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3", "json", "datafusion"]}
2525
#deltalake = { path = "../../delta-io/delta-rs/crates/deltalake", features = ["s3", "json", "datafusion"]}
2626
futures = { version = "0.3" }

crates/oxbow/src/lib.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use deltalake::arrow::datatypes::Schema as ArrowSchema;
55
use deltalake::kernel::engine::arrow_conversion::TryFromArrow;
66
use deltalake::kernel::schema::{Schema, StructField};
7+
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties};
78
use deltalake::logstore::{LogStoreRef, ObjectStoreRef, StorageConfig, logstore_for};
89
use deltalake::operations::create::CreateBuilder;
910
use deltalake::parquet::arrow::async_reader::{
@@ -207,18 +208,21 @@ pub async fn create_table_with(
207208
.await
208209
}
209210

211+
/// Return the common default [CommitProperties] to be used by issuing commits in oxbow
212+
pub(crate) fn default_commit_properties() -> CommitProperties {
213+
CommitProperties::default()
214+
// Turn off cleanup of expired logs. After each commit, versions are
215+
// scanned (ListObject on s3) to clean up expired entries. This creates
216+
// significant overhead when there are many versions on each commit.
217+
.with_cleanup_expired_logs(Some(false))
218+
}
219+
210220
/// Commit the given [Action]s to the [DeltaTable]
211221
pub async fn commit_to_table(actions: &[Action], table: &DeltaTable) -> DeltaResult<i64> {
212-
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties};
213222
if actions.is_empty() {
214223
return table.version().ok_or(DeltaTableError::NotInitialized);
215224
}
216-
let commit = CommitProperties::default()
217-
// Turn off cleanup of expired logs. After each commit, versions are
218-
// scanned (ListObject on s3) to clean up expired entries. This creates
219-
// significant overhead when there are many versions on each commit.
220-
.with_cleanup_expired_logs(Some(false));
221-
let pre_commit = CommitBuilder::from(commit)
225+
let pre_commit = CommitBuilder::from(default_commit_properties())
222226
.with_actions(actions.to_vec())
223227
.build(
224228
Some(table.snapshot()?),
@@ -1226,7 +1230,6 @@ mod tests {
12261230
let table_url = Url::from_file_path(&table_path).expect("Failed to parse local path");
12271231
let store =
12281232
logstore_for(table_url, StorageConfig::default()).expect("Failed to get object store");
1229-
12301233
let files = discover_parquet_files(store.object_store(None).clone())
12311234
.await
12321235
.expect("Failed to discover parquet files");

crates/oxbow/src/write.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ pub async fn append_batches(
1717
mut table: DeltaTable,
1818
batches: impl IntoIterator<Item = Result<RecordBatch, ArrowError>>,
1919
) -> DeltaResult<DeltaTable> {
20-
let mut writer = RecordBatchWriter::for_table(&table)?;
20+
let mut writer = RecordBatchWriter::for_table(&table)?
21+
.with_commit_properties(super::default_commit_properties());
2122
let mut written = false;
2223

2324
trace!("Iterating through batches to write");
@@ -47,7 +48,8 @@ pub async fn append_values(
4748
let schema = table.snapshot()?.snapshot().arrow_schema();
4849
debug!("Attempting to append values with schema: {schema:?}");
4950

50-
let mut writer = RecordBatchWriter::for_table(&table)?;
51+
let mut writer = RecordBatchWriter::for_table(&table)?
52+
.with_commit_properties(super::default_commit_properties());
5153
let mut written = false;
5254

5355
for value in values {

lambdas/group-events/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ mod tests {
152152
let event: S3Event = serde_json::from_str(&buf).expect("Failed to parse");
153153
assert_eq!(4, event.records.len());
154154

155-
let fifos = segmented_by_prefix(&event.records, Some(4)).expect("Failed to segment");
155+
let fifos = segmented_by_prefix(&event.records, Some(8)).expect("Failed to segment");
156156
println!("fifos: {fifos:#?}");
157157
assert!(
158158
fifos.keys().len() > 2,

0 commit comments

Comments
 (0)