Skip to content

Commit d8f7b68

Browse files
committed
Upgrade oxbow to deltalake 0.28
There are some upstream changes to incorporate delta_kernel which need to be accounted for.

Signed-off-by: R. Tyler Croy <rtyler@buoyantdata.com>
1 parent 7cccf0b commit d8f7b68

File tree

8 files changed

+97
-75
lines changed

8 files changed

+97
-75
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "3"
88

99
[workspace.package]
10-
version = "1.6.0"
10+
version = "1.6.1"
1111
edition = "2024"
1212
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
1313
homepage = "https://github.com/buoyant-data/oxbow"
@@ -20,10 +20,11 @@ anyhow = "=1"
2020
chrono = "0.4"
2121
aws_lambda_events = { version = "0.15", default-features = false, features = ["sns", "sqs", "s3"] }
2222
# The datafusion feature is required to support invariants which may be in error, but is required as of currently released 0.18.2
23-
deltalake = { version = "0.26.2", features = ["s3", "json", "datafusion"] }
23+
deltalake = { version = "0.28.0", features = ["s3", "json", "datafusion"] }
2424
#deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3", "json", "datafusion"]}
2525
#deltalake = { path = "../../delta-io/delta-rs/crates/deltalake", features = ["s3", "json", "datafusion"]}
2626
futures = { version = "0.3" }
27+
mimalloc = { version = "0.1", features = ["v3"] }
2728
tokio = { version = "=1", features = ["macros"] }
2829
regex = "=1"
2930
serde = "1"
@@ -44,4 +45,3 @@ opt-level = "z"
4445
[profile.dist]
4546
inherits = "release"
4647
lto = "thin"
47-

crates/oxbow/src/lib.rs

Lines changed: 60 additions & 52 deletions
Large diffs are not rendered by default.

crates/oxbow/src/write.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use deltalake::arrow::array::RecordBatch;
33
use deltalake::arrow::datatypes::Schema as ArrowSchema;
44
use deltalake::arrow::error::ArrowError;
55
use deltalake::arrow::json::reader::ReaderBuilder;
6+
use deltalake::kernel::engine::arrow_conversion::TryIntoArrow;
7+
use deltalake::table::config::TablePropertiesExt;
68
use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
79
use deltalake::{DeltaResult, DeltaTable};
810

@@ -47,7 +49,7 @@ pub async fn append_values(
4749
) -> DeltaResult<DeltaTable> {
4850
let schema = table.get_schema()?;
4951
debug!("Attempting to append values with schema: {schema:?}");
50-
let schema = ArrowSchema::try_from(schema)?;
52+
let schema: ArrowSchema = schema.try_into_arrow()?;
5153

5254
let mut writer = RecordBatchWriter::for_table(&table)?;
5355
let mut written = false;
@@ -129,6 +131,7 @@ mod tests {
129131
use super::*;
130132
use deltalake::kernel::DataType;
131133
use deltalake::*;
134+
use std::num::NonZero;
132135

133136
async fn setup_test_table() -> DeltaResult<DeltaTable> {
134137
DeltaOps::try_from_uri("memory://")
@@ -157,12 +160,12 @@ mod tests {
157160

158161
let buf = File::open("../../tests/data/senators.jsonl")?;
159162
let reader = BufReader::new(buf);
160-
let schema = ArrowSchema::try_from(table.get_schema()?)?;
163+
let schema: ArrowSchema = table.snapshot()?.schema().try_into_arrow()?;
161164

162165
let json = deltalake::arrow::json::ReaderBuilder::new(schema.into()).build(reader)?;
163166

164167
let table = append_batches(table, json).await?;
165-
assert_eq!(table.version(), 1);
168+
assert_eq!(table.version(), Some(1));
166169
Ok(())
167170
}
168171

@@ -178,7 +181,7 @@ mod tests {
178181
.await
179182
.expect("Failed to do nothing");
180183

181-
assert_eq!(table.version(), 1);
184+
assert_eq!(table.version(), Some(1));
182185
Ok(())
183186
}
184187

@@ -191,8 +194,8 @@ mod tests {
191194
"#;
192195

193196
let cursor = Cursor::new(jsonl);
194-
let schema = table.get_schema()?;
195-
let schema = ArrowSchema::try_from(schema)?;
197+
let schema = table.snapshot()?.schema();
198+
let schema: ArrowSchema = schema.try_into_arrow()?;
196199
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
197200

198201
while let Some(Ok(batch)) = reader.next() {
@@ -215,7 +218,7 @@ mod tests {
215218
.await
216219
.expect("Failed to do nothing");
217220

218-
assert_eq!(table.version(), 1);
221+
assert_eq!(table.version(), Some(1));
219222
Ok(())
220223
}
221224

@@ -235,7 +238,10 @@ mod tests {
235238

236239
if let Some(state) = table.state.as_ref() {
237240
// The default is expected to be 100
238-
assert_eq!(100, state.table_config().checkpoint_interval());
241+
assert_eq!(
242+
NonZero::new(100).unwrap(),
243+
state.table_config().checkpoint_interval()
244+
);
239245
}
240246

241247
use deltalake::Path;
@@ -245,7 +251,7 @@ mod tests {
245251
.await?;
246252

247253
assert_ne!(0, checkpoint.size);
248-
assert_eq!(table.version(), 101);
254+
assert_eq!(table.version(), Some(101));
249255
Ok(())
250256
}
251257
}

lambdas/cdf-to-csv/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ homepage.workspace = true
99
aws_lambda_events = { workspace = true, default-features = false, features = ["sqs"] }
1010
chrono = { workspace = true }
1111
lambda_runtime = "0.14.2"
12-
deltalake = { workspace = true, features = ["s3", "json", "datafusion"] }
1312
object_store = { version = "0.12.1", features = ["cloud"]}
13+
deltalake = { workspace = true }
1414
serde = { workspace = true }
1515
serde_json = { workspace = true }
1616
tokio = { workspace = true }

lambdas/cdf-to-csv/src/main.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use aws_lambda_events::event::sqs::SqsEvent;
66
use deltalake::datafusion::dataframe::DataFrameWriteOptions;
77
use deltalake::datafusion::prelude::*;
88
use deltalake::delta_datafusion::DeltaCdfTableProvider;
9+
use deltalake::logstore::object_store::ObjectStore;
10+
use deltalake::logstore::object_store::aws::AmazonS3Builder;
11+
use deltalake::logstore::object_store::path::Path;
12+
use deltalake::logstore::object_store::prefix::PrefixStore;
913
use deltalake::{DeltaOps, DeltaResult};
1014
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
11-
use object_store::ObjectStore;
12-
use object_store::PutPayload;
13-
use object_store::path::Path;
14-
use object_store::prefix::PrefixStore;
1515
use oxbow_lambda_shared::*;
1616
use serde::{Deserialize, Serialize};
1717
use std::sync::Arc;
@@ -62,15 +62,15 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> DeltaResult<(), Error
6262
// the context is unique to a triggered table
6363
let ctx = SessionContext::new();
6464
let store: Arc<dyn ObjectStore> = Arc::new(
65-
object_store::aws::AmazonS3Builder::from_env()
65+
AmazonS3Builder::from_env()
6666
.with_url(&destination)
6767
.build()
6868
.expect("Failed to create an output object_store"),
6969
);
7070

7171
let table = deltalake::open_table(trigger.location().as_str()).await?;
7272
info!(
73-
"Loaded a table for {} at version {}",
73+
"Loaded a table for {} at version {:?}",
7474
trigger.location().as_str(),
7575
table.version()
7676
);
@@ -181,8 +181,9 @@ struct Completion {
181181
#[cfg(test)]
182182
mod tests {
183183
use super::*;
184+
use deltalake::logstore::object_store::memory::InMemory;
185+
use deltalake::logstore::object_store::path::Path;
184186
use futures::StreamExt;
185-
use object_store::{GetResultPayload, ObjectStore};
186187

187188
use deltalake::datafusion::{
188189
common::assert_batches_sorted_eq, dataframe::DataFrameWriteOptions,
@@ -268,7 +269,7 @@ mod tests {
268269
#[tokio::test]
269270
async fn test_write_csv() -> DeltaResult<()> {
270271
let (ctx, cdf) = cdf_test_setup().await?;
271-
let store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
272+
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
272273
let insert_store = Arc::new(PrefixStore::new(store.clone(), "inserts"));
273274
let delete_store = Arc::new(PrefixStore::new(store.clone(), "deletes"));
274275
ctx.register_object_store(&Url::parse("cdfo://inserts").unwrap(), insert_store.clone());

lambdas/file-loader/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ oxbow-sqs = { path = "../../crates/oxbow-sqs" }
1414
anyhow = { workspace = true }
1515
aws_lambda_events = { workspace = true, default-features = false, features = ["sqs"] }
1616
deltalake = { workspace = true }
17+
mimalloc = { workspace = true }
1718
tokio = { workspace = true }
1819
aws-config = "1.5.10"
1920
aws-sdk-s3 = "1.60.0"

lambdas/file-loader/src/main.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@ use aws_lambda_events::sqs::SqsMessage;
88
use deltalake::DeltaResult;
99
use deltalake::arrow::datatypes::Schema as ArrowSchema;
1010
use deltalake::arrow::json::reader::ReaderBuilder;
11+
use deltalake::kernel::engine::arrow_conversion::TryIntoArrow;
1112
use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
1213
use lambda_runtime::tracing::{debug, error, info, trace};
1314
use lambda_runtime::{Error, LambdaEvent, run, service_fn, tracing};
1415

1516
use oxbow_lambda_shared::*;
1617
use oxbow_sqs::{ConsumerConfig, TimedConsumer};
1718

19+
use mimalloc::MiMalloc;
20+
21+
#[global_allocator]
22+
static GLOBAL: MiMalloc = MiMalloc;
23+
1824
use std::env;
1925
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2026

@@ -124,7 +130,7 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
124130
.send()
125131
.await?;
126132
info!("Attempting to read bytes from {file_record:?}");
127-
let schema = ArrowSchema::try_from(table.get_schema()?)?;
133+
let schema: ArrowSchema = table.snapshot()?.schema().try_into_arrow()?;
128134

129135
let mut json = ReaderBuilder::new(schema.into()).build_decoder()?;
130136
while let Some(bytes) = response.body.try_next().await? {

lambdas/glue-sync/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
9494
let mut updated = original.clone();
9595
updated.insert(
9696
"delta.lastUpdateVersion".into(),
97-
delta_table.version().to_string(),
97+
delta_table.version().unwrap().to_string(),
9898
);
9999
Some(updated)
100100
}

0 commit comments

Comments (0)