Skip to content

Commit 2363be8

Browse files
committed
Revert "Upgrade oxbow to deltalake 0.28"
This reverts commit d8f7b68. The 0.27 and 0.28 versions of delta-rs incorporate kernel-based log replay, and we're seeing pretty dramatic performance degradation
1 parent cc4c1f7 commit 2363be8

File tree

7 files changed

+82
-101
lines changed

7 files changed

+82
-101
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@ anyhow = "=1"
2020
chrono = "0.4"
2121
aws_lambda_events = { version = "0.15", default-features = false, features = ["sns", "sqs", "s3"] }
2222
# The datafusion feature is required to support invariants which may be in error, but is required as of currently released 0.18.2
23-
deltalake = { version = "0.28.0", features = ["s3", "json", "datafusion"] }
23+
deltalake = { version = "0.26.2", features = ["s3", "json", "datafusion"] }
2424
#deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3", "json", "datafusion"]}
2525
#deltalake = { path = "../../delta-io/delta-rs/crates/deltalake", features = ["s3", "json", "datafusion"]}
2626
futures = { version = "0.3" }
27-
mimalloc = { version = "0.1", features = ["v3"] }
2827
tokio = { version = "=1", features = ["macros"] }
2928
regex = "=1"
3029
serde = "1"
@@ -45,3 +44,4 @@ opt-level = "z"
4544
[profile.dist]
4645
inherits = "release"
4746
lto = "thin"
47+

crates/oxbow/src/lib.rs

Lines changed: 52 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1+
use deltalake::ObjectStore;
12
///
23
/// The lib module contains the business logic of oxbow, regardless of the interface implementation
34
///
45
use deltalake::arrow::datatypes::Schema as ArrowSchema;
5-
use deltalake::kernel::engine::arrow_conversion::TryFromArrow;
6+
use deltalake::kernel::models::{Schema, StructField};
67
use deltalake::kernel::*;
7-
use deltalake::logstore::{LogStoreRef, ObjectStoreRef, StorageConfig, logstore_for};
8+
use deltalake::logstore::ObjectStoreRef;
9+
use deltalake::logstore::{LogStoreRef, logstore_for};
810
use deltalake::operations::create::CreateBuilder;
911
use deltalake::parquet::arrow::async_reader::{
1012
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
1113
};
1214
use deltalake::parquet::file::metadata::ParquetMetaData;
1315
use deltalake::protocol::*;
14-
use deltalake::table::config::TablePropertiesExt;
15-
use deltalake::{DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, ObjectStore};
16-
use deltalake::{Schema, StructField};
16+
use deltalake::{DeltaResult, DeltaTable, DeltaTableError, ObjectMeta};
1717
use futures::StreamExt;
1818
use tracing::log::*;
1919
use url::Url;
@@ -79,8 +79,6 @@ pub async fn convert(
7979
None => deltalake::open_table(&location).await,
8080
};
8181

82-
let storage_options = StorageConfig::parse_options(storage_options.unwrap_or_default())?;
83-
8482
match table_result {
8583
Err(e) => {
8684
info!("No Delta table at {}: {:?}", location, e);
@@ -97,7 +95,7 @@ pub async fn convert(
9795
.expect("Failed to parse the location as a file path")
9896
}
9997
};
100-
let store = logstore_for(location, storage_options)?;
98+
let store = logstore_for(location, storage_options.unwrap_or_default(), None)?;
10199
let files = discover_parquet_files(store.object_store(None).clone()).await?;
102100
debug!(
103101
"Files identified for turning into a delta table: {:?}",
@@ -190,7 +188,7 @@ pub async fn create_table_with(
190188

191189
let arrow_schema = ArrowSchema::new_with_metadata(conversions, arrow_schema.metadata.clone());
192190

193-
let schema = Schema::try_from_arrow(&arrow_schema)
191+
let schema = Schema::try_from(&arrow_schema)
194192
.expect("Failed to convert the schema for creating the table");
195193

196194
let mut columns: Vec<StructField> = schema.fields().cloned().collect();
@@ -222,7 +220,7 @@ pub async fn create_table_with(
222220
pub async fn commit_to_table(actions: &[Action], table: &DeltaTable) -> DeltaResult<i64> {
223221
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties};
224222
if actions.is_empty() {
225-
return Ok(table.version().unwrap());
223+
return Ok(table.version());
226224
}
227225
let commit = CommitProperties::default();
228226
let pre_commit = CommitBuilder::from(commit)
@@ -242,7 +240,7 @@ pub async fn actions_for(
242240
table: &DeltaTable,
243241
evolve_schema: bool,
244242
) -> DeltaResult<Vec<Action>> {
245-
let existing_files: Vec<deltalake::Path> = table.snapshot()?.file_paths_iter().collect();
243+
let existing_files: Vec<deltalake::Path> = table.get_files_iter()?.collect();
246244
let new_files: Vec<ObjectMeta> = mods
247245
.adds()
248246
.into_iter()
@@ -281,7 +279,7 @@ async fn metadata_actions_for(
281279
) -> DeltaResult<Vec<Action>> {
282280
if let Some(last_file) = files.last() {
283281
debug!("Attempting to evolve the schema for {table:?}");
284-
let table_schema = table.snapshot()?.schema();
282+
let table_schema = table.get_schema()?;
285283
// Cloning here to take an owned version of [DeltaTableMetaData] for later modification
286284
let table_metadata = table.metadata()?;
287285

@@ -298,24 +296,24 @@ async fn metadata_actions_for(
298296
new_schema.push(StructField::new(
299297
name.to_string(),
300298
// These types can have timestmaps in them, so coerce them properly
301-
deltalake::kernel::schema::DataType::try_from_arrow(coerced.data_type())?,
299+
deltalake::kernel::DataType::try_from(coerced.data_type())?,
302300
true,
303301
));
304302
}
305303
}
306304

307305
if new_schema.len() > table_schema.fields.len() {
308306
let new_schema = Schema::new(new_schema);
309-
let mut action = deltalake::kernel::new_metadata(
310-
&new_schema,
311-
table_metadata.partition_columns().clone(),
312-
table_metadata.configuration().clone(),
307+
let mut action = deltalake::kernel::Metadata::try_new(
308+
new_schema,
309+
table_metadata.partition_columns.clone(),
310+
table_metadata.configuration.clone(),
313311
)?;
314-
if let Some(name) = &table_metadata.name() {
315-
action = action.with_name(name.to_string())?;
312+
if let Some(name) = &table_metadata.name {
313+
action = action.with_name(name);
316314
}
317-
if let Some(description) = &table_metadata.description() {
318-
action = action.with_description(description.to_string())?;
315+
if let Some(description) = &table_metadata.description {
316+
action = action.with_description(description);
319317
}
320318
return Ok(vec![Action::Metadata(action)]);
321319
}
@@ -500,7 +498,6 @@ mod tests {
500498

501499
use chrono::prelude::Utc;
502500
use deltalake::Path;
503-
use std::num::NonZero;
504501

505502
/*
506503
* test utilities to share between test cases
@@ -594,7 +591,8 @@ mod tests {
594591
let url = Url::from_file_path(dir.path()).expect("Failed to parse local path");
595592
(
596593
dir,
597-
logstore_for(url, StorageConfig::default()).expect("Failed to get store"),
594+
logstore_for(url, HashMap::<String, String>::default(), None)
595+
.expect("Failed to get store"),
598596
)
599597
}
600598
}
@@ -603,7 +601,8 @@ mod tests {
603601
async fn discover_parquet_files_empty_dir() {
604602
let dir = tempfile::tempdir().expect("Failed to create a temporary directory");
605603
let url = Url::from_file_path(dir.path()).expect("Failed to parse local path");
606-
let store = logstore_for(url, StorageConfig::default()).expect("Failed to get store");
604+
let store = logstore_for(url, HashMap::<String, String>::default(), None)
605+
.expect("Failed to get store");
607606

608607
let files = discover_parquet_files(store.object_store(None).clone())
609608
.await
@@ -616,7 +615,8 @@ mod tests {
616615
let path = std::fs::canonicalize("../../tests/data/hive/deltatbl-non-partitioned")
617616
.expect("Failed to canonicalize");
618617
let url = Url::from_file_path(path).expect("Failed to parse local path");
619-
let store = logstore_for(url, StorageConfig::default()).expect("Failed to get store");
618+
let store = logstore_for(url, HashMap::<String, String>::default(), None)
619+
.expect("Failed to get store");
620620

621621
let files = discover_parquet_files(store.object_store(None).clone())
622622
.await
@@ -773,7 +773,7 @@ mod tests {
773773
* See <https://github.com/buoyant-data/oxbow/issues/2>
774774
*/
775775
#[tokio::test]
776-
async fn create_schema_for_partitioned_path() -> DeltaResult<()> {
776+
async fn create_schema_for_partitioned_path() {
777777
let (_tempdir, store) =
778778
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
779779
let files = discover_parquet_files(store.object_store(None).clone())
@@ -791,12 +791,11 @@ mod tests {
791791
let table = create_table_with(&files, store.clone())
792792
.await
793793
.expect("Failed to create table");
794-
let schema = table.snapshot()?.schema();
794+
let schema = table.get_schema().expect("Failed to get schema");
795795
assert!(
796796
schema.index_of("c2").is_some(),
797797
"The schema does not include the expected partition key `c2`"
798798
);
799-
Ok(())
800799
}
801800

802801
/*
@@ -841,7 +840,8 @@ mod tests {
841840
let files: Vec<ObjectMeta> = vec![];
842841
let store = logstore_for(
843842
Url::parse("s3://example/non-existent").unwrap(),
844-
StorageConfig::default(),
843+
HashMap::<String, String>::default(),
844+
None,
845845
)
846846
.expect("Failed to get store");
847847
let result = create_table_with(&files, store).await;
@@ -861,7 +861,8 @@ mod tests {
861861
std::fs::canonicalize("../../tests/data/hive/deltatbl-non-partitioned-with-checkpoint")
862862
.expect("Failed to canonicalize");
863863
let url = Url::from_file_path(test_dir).expect("Failed to parse local path");
864-
let store = logstore_for(url, StorageConfig::default()).expect("Failed to get store");
864+
let store = logstore_for(url, HashMap::<String, String>::default(), None)
865+
.expect("Failed to get store");
865866

866867
let files = discover_parquet_files(store.object_store(None).clone())
867868
.await
@@ -875,7 +876,7 @@ mod tests {
875876
* will automatically insert the hive-style partition information into the parquet schema
876877
*/
877878
#[tokio::test]
878-
async fn test_avoid_duplicate_partition_columns() -> DeltaResult<()> {
879+
async fn test_avoid_duplicate_partition_columns() {
879880
let (_tempdir, store) = util::create_temp_path_with("../../tests/data/hive/gcs-export");
880881

881882
let files = discover_parquet_files(store.object_store(None).clone())
@@ -886,7 +887,7 @@ mod tests {
886887
let table = create_table_with(&files, store.clone())
887888
.await
888889
.expect("Failed to create table");
889-
let schema = table.snapshot()?.schema();
890+
let schema = table.get_schema().expect("Failed to get schema");
890891
let fields: Vec<&str> = schema.fields().map(|f| f.name.as_ref()).collect();
891892

892893
let mut uniq = HashSet::new();
@@ -898,7 +899,6 @@ mod tests {
898899
fields.len(),
899900
"There were not unique fields, that probably means a `ds` column is doubled up"
900901
);
901-
Ok(())
902902
}
903903

904904
/// This test is mostly to validate an approach for reading and loading schema
@@ -912,7 +912,8 @@ mod tests {
912912
"../../tests/data/hive/deltatbl-partitioned",
913913
)?)
914914
.expect("Failed to parse");
915-
let storage = logstore_for(url, StorageConfig::default()).expect("Failed to get store");
915+
let storage = logstore_for(url, HashMap::<String, String>::default(), None)
916+
.expect("Failed to get store");
916917
let meta = storage.object_store(None).head(&location).await.unwrap();
917918

918919
let schema = fetch_parquet_schema(storage.object_store(None).clone(), meta)
@@ -957,9 +958,9 @@ mod tests {
957958
}
958959

959960
#[tokio::test]
960-
async fn test_actions_for_with_redundant_files() -> DeltaResult<()> {
961+
async fn test_actions_for_with_redundant_files() {
961962
let (_tempdir, table) = util::test_table().await;
962-
let files = util::paths_to_objectmetas(table.snapshot()?.file_paths_iter());
963+
let files = util::paths_to_objectmetas(table.get_files_iter().unwrap());
963964
let mods = TableMods::new(&files, &vec![]);
964965

965966
let actions = actions_for(&mods, &table, false)
@@ -970,13 +971,12 @@ mod tests {
970971
0,
971972
"Expected no add actions for redundant files"
972973
);
973-
Ok(())
974974
}
975975

976976
#[tokio::test]
977977
async fn test_actions_for_with_removes() -> DeltaResult<()> {
978978
let (_tempdir, table) = util::test_table().await;
979-
let files = util::paths_to_objectmetas(table.snapshot()?.file_paths_iter());
979+
let files = util::paths_to_objectmetas(table.get_files_iter()?);
980980
let mods = TableMods::new(&vec![], &files);
981981

982982
let actions = actions_for(&mods, &table, false)
@@ -1032,7 +1032,7 @@ mod tests {
10321032
#[tokio::test]
10331033
async fn test_actions_for_with_redundant_removes() -> DeltaResult<()> {
10341034
let (_tempdir, table) = util::test_table().await;
1035-
let files = util::paths_to_objectmetas(table.snapshot()?.file_paths_iter());
1035+
let files = util::paths_to_objectmetas(table.get_files_iter()?);
10361036
let mut redundant_removes = files.clone();
10371037
redundant_removes.append(&mut files.clone());
10381038
let mods = TableMods::new(&vec![], &redundant_removes);
@@ -1052,7 +1052,7 @@ mod tests {
10521052
#[tokio::test]
10531053
async fn test_tablemods_uniqueness() -> DeltaResult<()> {
10541054
let (_tempdir, table) = util::test_table().await;
1055-
let files = util::paths_to_objectmetas(table.snapshot()?.file_paths_iter());
1055+
let files = util::paths_to_objectmetas(table.get_files_iter()?);
10561056
let mut redundant_removes = files.clone();
10571057
redundant_removes.append(&mut files.clone());
10581058
let mods = TableMods::new(&vec![], &redundant_removes);
@@ -1071,7 +1071,7 @@ mod tests {
10711071
#[tokio::test]
10721072
async fn test_commit_with_no_actions() {
10731073
let (_tempdir, table) = util::test_table().await;
1074-
let initial_version = table.version().unwrap();
1074+
let initial_version = table.version();
10751075
let result = commit_to_table(&[], &table).await;
10761076
assert!(result.is_ok());
10771077
assert_eq!(result.unwrap(), initial_version);
@@ -1090,7 +1090,7 @@ mod tests {
10901090
let mut table = create_table_with(&[files[0].clone()], store.clone())
10911091
.await
10921092
.expect("Failed to create table");
1093-
let initial_version = table.version().unwrap();
1093+
let initial_version = table.version();
10941094
assert_eq!(0, initial_version);
10951095

10961096
let mods = TableMods::new(&files, &[files[0].clone()]);
@@ -1102,14 +1102,11 @@ mod tests {
11021102
let _ = commit_to_table(&actions, &table).await?;
11031103
table.load().await?;
11041104
}
1105-
assert_eq!(table.version(), Some(101));
1105+
assert_eq!(table.version(), 101);
11061106

11071107
if let Some(state) = table.state.as_ref() {
11081108
// The default is expected to be 100
1109-
assert_eq!(
1110-
NonZero::new(100).unwrap(),
1111-
state.table_config().checkpoint_interval()
1112-
);
1109+
assert_eq!(100, state.table_config().checkpoint_interval());
11131110
}
11141111

11151112
use deltalake::Path;
@@ -1125,9 +1122,9 @@ mod tests {
11251122
#[tokio::test]
11261123
async fn test_commit_with_remove_actions() -> DeltaResult<()> {
11271124
let (_tempdir, table) = util::test_table().await;
1128-
let initial_version = table.version().unwrap();
1125+
let initial_version = table.version();
11291126

1130-
let files = util::paths_to_objectmetas(table.snapshot()?.file_paths_iter());
1127+
let files = util::paths_to_objectmetas(table.get_files_iter()?);
11311128
let mods = TableMods::new(&[], &files);
11321129
let actions = actions_for(&mods, &table, false).await?;
11331130
assert_eq!(
@@ -1146,7 +1143,7 @@ mod tests {
11461143
}
11471144

11481145
#[tokio::test]
1149-
async fn test_commit_with_all_actions() -> DeltaResult<()> {
1146+
async fn test_commit_with_all_actions() {
11501147
let (_tempdir, store) =
11511148
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
11521149
let files = discover_parquet_files(store.object_store(None).clone())
@@ -1158,7 +1155,7 @@ mod tests {
11581155
let mut table = create_table_with(&[files[0].clone()], store.clone())
11591156
.await
11601157
.expect("Failed to create table");
1161-
let initial_version = table.version().unwrap();
1158+
let initial_version = table.version();
11621159
assert_eq!(0, initial_version);
11631160

11641161
let mods = TableMods::new(&files, &vec![files[0].clone()]);
@@ -1179,15 +1176,10 @@ mod tests {
11791176
);
11801177
table.load().await.expect("Failed to reload table");
11811178
assert_eq!(
1182-
table
1183-
.snapshot()?
1184-
.file_paths_iter()
1185-
.collect::<Vec<_>>()
1186-
.len(),
1179+
table.get_files_iter().unwrap().collect::<Vec<_>>().len(),
11871180
3,
11881181
"Expected to only find three files on the table at this state"
11891182
);
1190-
Ok(())
11911183
}
11921184

11931185
#[tokio::test]
@@ -1204,7 +1196,7 @@ mod tests {
12041196
.await
12051197
.expect("Failed to create a test table");
12061198

1207-
let initial_version = table.version().unwrap();
1199+
let initial_version = table.version();
12081200
assert_eq!(initial_version, 0);
12091201

12101202
let adds = discover_parquet_files(store.object_store(None).clone())
@@ -1233,8 +1225,8 @@ mod tests {
12331225
// The store that comes back is not properly prefixed to the delta table that this test
12341226
// needs to work with
12351227
let table_url = Url::from_file_path(&table_path).expect("Failed to parse local path");
1236-
let store =
1237-
logstore_for(table_url, StorageConfig::default()).expect("Failed to get object store");
1228+
let store = logstore_for(table_url, HashMap::<String, String>::default(), None)
1229+
.expect("Failed to get object store");
12381230

12391231
let files = discover_parquet_files(store.object_store(None).clone())
12401232
.await

0 commit comments

Comments
 (0)