Skip to content

Commit 1863111

Browse files
committed
Upgrade the entire oxbow suite to delta-rs 0.29.x
This release has newer performance improvements on log replay and should noticeably reduce memory consumption of the lambdas with larger tables. Signed-off-by: R. Tyler Croy <rtyler@buoyantdata.com>
1 parent 96b347f commit 1863111

File tree

11 files changed

+115
-109
lines changed

11 files changed

+115
-109
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "3"
88

99
[workspace.package]
10-
version = "1.7.2"
10+
version = "1.8.0"
1111
edition = "2024"
1212
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
1313
homepage = "https://github.com/buoyant-data/oxbow"
@@ -20,7 +20,7 @@ anyhow = "=1"
2020
chrono = "0.4"
2121
aws_lambda_events = { version = "0.15", default-features = false, features = ["sns", "sqs", "s3"] }
2222
# The datafusion feature is required to support invariants which may be in error, but is required as of currently released 0.18.2
23-
deltalake = { version = "0.26.2", features = ["s3", "json", "datafusion"] }
23+
deltalake = { version = "0.29.0", features = ["s3", "json", "datafusion"] }
2424
#deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3", "json", "datafusion"]}
2525
#deltalake = { path = "../../delta-io/delta-rs/crates/deltalake", features = ["s3", "json", "datafusion"]}
2626
futures = { version = "0.3" }

cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ anyhow = { workspace = true }
1010
tokio = { workspace = true, features = ["full"] }
1111
tracing = { workspace = true }
1212
tracing-subscriber = { workspace = true }
13+
url = { workspace = true }
1314

1415
gumdrop = "=0.8"
1516
oxbow = { path = "../crates/oxbow" }

cli/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use gumdrop::Options;
66
use tracing::log::*;
7+
use url::Url;
78

89
/*
910
* Flags is a structure for managing command line parameters
@@ -39,7 +40,7 @@ async fn main() -> Result<(), anyhow::Error> {
3940
info!("Starting oxbow");
4041
let flags = Flags::parse_args_default_or_exit();
4142
debug!("Options as read: {:?}", flags);
42-
let location = table_location(&flags)?;
43+
let location = Url::parse(&table_location(&flags)?)?;
4344
info!("Using the table location of: {:?}", location);
4445

4546
oxbow::convert(&location, None)

crates/oxbow/src/lib.rs

Lines changed: 65 additions & 71 deletions
Large diffs are not rendered by default.

crates/oxbow/src/lock.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@
44
///
55
use dynamodb_lock::Region;
66
use tracing::log::*;
7+
use url::Url;
78

89
use std::collections::HashMap;
910

1011
///
1112
///Wrapper around [deltalake::open_table] which will open the table with the appropriate storage
1213
///options needed for locking
1314
pub async fn open_table(table_uri: &str) -> deltalake::DeltaResult<deltalake::DeltaTable> {
14-
deltalake::open_table_with_storage_options(&table_uri, storage_options(table_uri)).await
15+
deltalake::open_table_with_storage_options(
16+
Url::parse(table_uri).expect("Fail"),
17+
storage_options(table_uri),
18+
)
19+
.await
1520
}
1621

1722
/// Default storage options for using with `deltalake` calls

crates/oxbow/src/write.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use chrono::prelude::*;
22
use deltalake::arrow::array::RecordBatch;
3-
use deltalake::arrow::datatypes::Schema as ArrowSchema;
43
use deltalake::arrow::error::ArrowError;
54
use deltalake::arrow::json::reader::ReaderBuilder;
65
use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
@@ -45,18 +44,15 @@ pub async fn append_values(
4544
mut table: DeltaTable,
4645
values: impl IntoIterator<Item: AsRef<str>>,
4746
) -> DeltaResult<DeltaTable> {
48-
let schema = table.get_schema()?;
47+
let schema = table.snapshot()?.snapshot().arrow_schema();
4948
debug!("Attempting to append values with schema: {schema:?}");
50-
let schema = ArrowSchema::try_from(schema)?;
5149

5250
let mut writer = RecordBatchWriter::for_table(&table)?;
5351
let mut written = false;
5452

5553
for value in values {
5654
let cursor: Cursor<&str> = Cursor::new(value.as_ref());
57-
let reader = ReaderBuilder::new(schema.clone().into())
58-
.build(cursor)
59-
.unwrap();
55+
let reader = ReaderBuilder::new(schema.clone()).build(cursor).unwrap();
6056

6157
for res in reader {
6258
match res {
@@ -131,8 +127,7 @@ mod tests {
131127
use deltalake::*;
132128

133129
async fn setup_test_table() -> DeltaResult<DeltaTable> {
134-
DeltaOps::try_from_uri("memory://")
135-
.await?
130+
DeltaOps::new_in_memory()
136131
.create()
137132
.with_table_name("test")
138133
.with_column("id", DataType::INTEGER, true, None)
@@ -146,8 +141,7 @@ mod tests {
146141
use std::fs::File;
147142
use std::io::BufReader;
148143

149-
let table = DeltaOps::try_from_uri("memory://")
150-
.await?
144+
let table = DeltaOps::new_in_memory()
151145
.create()
152146
.with_table_name("test")
153147
.with_column("current", DataType::BOOLEAN, true, None)
@@ -157,12 +151,12 @@ mod tests {
157151

158152
let buf = File::open("../../tests/data/senators.jsonl")?;
159153
let reader = BufReader::new(buf);
160-
let schema = ArrowSchema::try_from(table.get_schema()?)?;
154+
let schema = table.snapshot()?.snapshot().arrow_schema();
161155

162-
let json = deltalake::arrow::json::ReaderBuilder::new(schema.into()).build(reader)?;
156+
let json = deltalake::arrow::json::ReaderBuilder::new(schema).build(reader)?;
163157

164158
let table = append_batches(table, json).await?;
165-
assert_eq!(table.version(), 1);
159+
assert_eq!(table.version(), Some(1));
166160
Ok(())
167161
}
168162

@@ -178,7 +172,7 @@ mod tests {
178172
.await
179173
.expect("Failed to do nothing");
180174

181-
assert_eq!(table.version(), 1);
175+
assert_eq!(table.version(), Some(1));
182176
Ok(())
183177
}
184178

@@ -191,9 +185,8 @@ mod tests {
191185
"#;
192186

193187
let cursor = Cursor::new(jsonl);
194-
let schema = table.get_schema()?;
195-
let schema = ArrowSchema::try_from(schema)?;
196-
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
188+
let schema = table.snapshot()?.snapshot().arrow_schema();
189+
let mut reader = ReaderBuilder::new(schema).build(cursor).unwrap();
197190

198191
while let Some(Ok(batch)) = reader.next() {
199192
let batch = augment_with_ds(batch)?;
@@ -215,12 +208,13 @@ mod tests {
215208
.await
216209
.expect("Failed to do nothing");
217210

218-
assert_eq!(table.version(), 1);
211+
assert_eq!(table.version(), Some(1));
219212
Ok(())
220213
}
221214

222215
#[tokio::test]
223216
async fn test_append_values_checkpoint() -> DeltaResult<()> {
217+
use deltalake::table::config::TablePropertiesExt;
224218
let mut table = setup_test_table().await?;
225219

226220
for _ in 0..101 {
@@ -235,7 +229,7 @@ mod tests {
235229

236230
if let Some(state) = table.state.as_ref() {
237231
// The default is expected to be 100
238-
assert_eq!(100, state.table_config().checkpoint_interval());
232+
assert_eq!(100, state.table_config().checkpoint_interval().get());
239233
}
240234

241235
use deltalake::Path;
@@ -245,7 +239,7 @@ mod tests {
245239
.await?;
246240

247241
assert_ne!(0, checkpoint.size);
248-
assert_eq!(table.version(), 101);
242+
assert_eq!(table.version(), Some(101));
249243
Ok(())
250244
}
251245
}

lambdas/cdf-to-csv/src/main.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use deltalake::delta_datafusion::DeltaCdfTableProvider;
99
use deltalake::{DeltaOps, DeltaResult};
1010
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
1111
use object_store::ObjectStore;
12-
use object_store::PutPayload;
1312
use object_store::path::Path;
1413
use object_store::prefix::PrefixStore;
1514
use oxbow_lambda_shared::*;
@@ -68,9 +67,9 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> DeltaResult<(), Error
6867
.expect("Failed to create an output object_store"),
6968
);
7069

71-
let table = deltalake::open_table(trigger.location().as_str()).await?;
70+
let table = deltalake::open_table(trigger.location().clone()).await?;
7271
info!(
73-
"Loaded a table for {} at version {}",
72+
"Loaded a table for {} at version {:?}",
7473
trigger.location().as_str(),
7574
table.version()
7675
);
@@ -182,15 +181,20 @@ struct Completion {
182181
mod tests {
183182
use super::*;
184183
use futures::StreamExt;
185-
use object_store::{GetResultPayload, ObjectStore};
184+
use object_store::ObjectStore;
186185

187186
use deltalake::datafusion::{
188187
common::assert_batches_sorted_eq, dataframe::DataFrameWriteOptions,
189188
};
190189
use deltalake::operations::load_cdf::CdfLoadBuilder;
191190

192191
async fn cdf_test_setup() -> DeltaResult<(SessionContext, CdfLoadBuilder)> {
193-
let table = deltalake::open_table("../../tests/data/hive/checkpoint-cdf-table/").await?;
192+
let canonical = std::fs::canonicalize("../../tests/data/hive/checkpoint-cdf-table")
193+
.expect("Failed to canonicalize");
194+
let table = deltalake::open_table(
195+
Url::from_file_path(canonical).expect("Failed to find the Url for the CDF table"),
196+
)
197+
.await?;
194198
let cdf = DeltaOps::from(table).load_cdf().with_starting_version(3);
195199
let ctx = SessionContext::new();
196200
Ok((ctx, cdf))

lambdas/file-loader/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ mod tests {
281281

282282
table.load().await?;
283283

284-
assert_eq!(table.version(), version);
284+
assert_eq!(table.version(), Some(version));
285285

286286
let ctx = SessionContext::new();
287287
ctx.register_table("test", Arc::new(table))?;

lambdas/glue-sync/src/main.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use deltalake::kernel::{DataType, PrimitiveType};
99
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
1010
use regex::Regex;
1111
use tracing::log::*;
12+
use url::Url;
1213

1314
use oxbow_lambda_shared::*;
1415
use std::collections::HashMap;
@@ -84,8 +85,8 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
8485
{
8586
if let Some(table_path) = key_is_delta_table(&key) {
8687
debug!("loading table from {table_path}");
87-
let uri = format!("s3://{bucket}/{table_path}");
88-
let delta_table = deltalake::open_table(&uri).await?;
88+
let uri = Url::parse(&format!("s3://{bucket}/{table_path}"))?;
89+
let delta_table = deltalake::open_table(uri).await?;
8990

9091
if let Some(descriptor) = storage_descriptor_from(&delta_table, &glue_table) {
9192
let parameters = match glue_table.parameters {
@@ -94,7 +95,10 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
9495
let mut updated = original.clone();
9596
updated.insert(
9697
"delta.lastUpdateVersion".into(),
97-
delta_table.version().to_string(),
98+
delta_table
99+
.version()
100+
.expect("Failed to load a table version")
101+
.to_string(),
98102
);
99103
Some(updated)
100104
}
@@ -258,8 +262,9 @@ fn to_glue_type(data_type: &DataType) -> String {
258262
/// Generate the [Column] for Glue [aws_sdk_glue::types::StorageDescriptor] based on the provided
259263
/// [DeltaTable]
260264
fn glue_columns_for(table: &DeltaTable) -> Vec<Column> {
261-
if let Ok(schema) = table.get_schema() {
262-
return schema
265+
if let Ok(snapshot) = table.snapshot() {
266+
return snapshot
267+
.schema()
263268
.fields()
264269
.map(|field| {
265270
Column::builder()
@@ -315,8 +320,9 @@ mod tests {
315320

316321
#[tokio::test]
317322
async fn test_glue_columns() {
318-
let table = "../../tests/data/hive/deltatbl-non-partitioned";
319-
let table = deltalake::open_table(&table)
323+
let table =
324+
std::fs::canonicalize("../../tests/data/hive/deltatbl-non-partitioned").unwrap();
325+
let table = deltalake::open_table(Url::from_file_path(table).unwrap())
320326
.await
321327
.expect("Failed to open table");
322328

lambdas/group-events/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ mod tests {
153153
assert_eq!(4, event.records.len());
154154

155155
let fifos = segmented_by_prefix(&event.records, Some(4)).expect("Failed to segment");
156+
println!("fifos: {fifos:#?}");
156157
assert!(
157158
fifos.keys().len() > 2,
158159
"The segmented test file should have been distributed to more than just two keys"

0 commit comments

Comments
 (0)