Skip to content

Commit 75a31db

Browse files
authored
refactor: simplify creation of default engine in tests (#1437)
## What changes are proposed in this pull request? Unit tests have accumulated myriad ways of creating a new default engine instance. Harmonize and simplify them. ## Breaking changes `parse_url_opts` was renamed as `store_from_url_opts` `DefaultEngine::try_new` was removed. `DefaultEngine::new` signature changed. ## How was this change tested? Test-only change.
1 parent f4f1851 commit 75a31db

File tree

32 files changed

+202
-258
lines changed

32 files changed

+202
-258
lines changed

acceptance/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ tar = "0.4"
3636
[dev-dependencies]
3737
datatest-stable = "0.3"
3838
test-log = { version = "0.2", default-features = false, features = ["trace"] }
39+
test_utils = { path = "../test-utils" }
3940
tempfile = "3"
4041
test-case = { version = "3.3.1" }
4142
tokio = { version = "1.47" }

acceptance/tests/dat_reader.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use std::path::Path;
2-
use std::sync::Arc;
32

43
use acceptance::read_dat_case;
5-
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
6-
use delta_kernel::engine::default::DefaultEngine;
74

85
// TODO(zach): skip iceberg_compat_v1 test until DAT is fixed
96
static SKIPPED_TESTS: &[&str; 1] = &["iceberg_compat_v1"];
@@ -27,14 +24,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
2724
.block_on(async {
2825
let case = read_dat_case(root_dir).unwrap();
2926
let table_root = case.table_root().unwrap();
30-
let engine = Arc::new(
31-
DefaultEngine::try_new(
32-
&table_root,
33-
std::iter::empty::<(&str, &str)>(),
34-
Arc::new(TokioBackgroundExecutor::new()),
35-
)
36-
.unwrap(),
37-
);
27+
let engine = test_utils::create_default_engine(&table_root).unwrap();
3828

3929
case.assert_metadata(engine.clone()).await.unwrap();
4030
acceptance::data::assert_scan_metadata(engine.clone(), &case)

acceptance/tests/other.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ async fn test_read_table_with_checkpoint() {
3838
))
3939
.unwrap();
4040
let location = url::Url::from_directory_path(path).unwrap();
41-
let engine = Arc::new(
42-
DefaultEngine::try_new(&location, HashMap::<String, String>::new()).unwrap(),
43-
);
41+
let engine = test_utils::create_default_engine(&location).unwrap();
4442
let snapshot = Snapshot::try_new(location, engine, None)
4543
.await
4644
.unwrap();

ffi/src/domain_metadata.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ mod tests {
9393
recover_string,
9494
};
9595
use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot};
96-
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
9796
use delta_kernel::engine::default::DefaultEngine;
9897
use delta_kernel::DeltaResult;
9998
use object_store::memory::InMemory;
@@ -107,7 +106,7 @@ mod tests {
107106
async fn test_domain_metadata() -> DeltaResult<()> {
108107
let storage = Arc::new(InMemory::new());
109108

110-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
109+
let engine = DefaultEngine::new(storage.clone());
111110
let engine = engine_to_handle(Arc::new(engine), allocate_err);
112111
let path = "memory:///";
113112

ffi/src/lib.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -561,13 +561,11 @@ fn get_default_engine_impl(
561561
allocate_error: AllocateErrorFn,
562562
) -> DeltaResult<Handle<SharedExternEngine>> {
563563
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
564+
use delta_kernel::engine::default::storage::store_from_url_opts;
564565
use delta_kernel::engine::default::DefaultEngine;
565-
let engine = DefaultEngine::<TokioBackgroundExecutor>::try_new(
566-
&url,
567-
options,
568-
Arc::new(TokioBackgroundExecutor::new()),
569-
);
570-
Ok(engine_to_handle(Arc::new(engine?), allocate_error))
566+
let store = store_from_url_opts(&url, options)?;
567+
let engine = DefaultEngine::<TokioBackgroundExecutor>::new(store);
568+
Ok(engine_to_handle(Arc::new(engine), allocate_error))
571569
}
572570

573571
/// # Safety
@@ -881,7 +879,7 @@ mod tests {
881879
allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic,
882880
recover_string,
883881
};
884-
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
882+
use delta_kernel::engine::default::DefaultEngine;
885883
use object_store::memory::InMemory;
886884
use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction};
887885

@@ -928,7 +926,7 @@ mod tests {
928926
actions_to_string(vec![TestAction::Metadata]),
929927
)
930928
.await?;
931-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
929+
let engine = DefaultEngine::new(storage.clone());
932930
let engine = engine_to_handle(Arc::new(engine), allocate_err);
933931
let path = "memory:///";
934932

@@ -974,7 +972,7 @@ mod tests {
974972
actions_to_string_partitioned(vec![TestAction::Metadata]),
975973
)
976974
.await?;
977-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
975+
let engine = DefaultEngine::new(storage.clone());
978976
let engine = engine_to_handle(Arc::new(engine), allocate_err);
979977
let path = "memory:///";
980978

@@ -1010,7 +1008,7 @@ mod tests {
10101008
actions_to_string(vec![TestAction::Metadata]),
10111009
)
10121010
.await?;
1013-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
1011+
let engine = DefaultEngine::new(storage.clone());
10141012
let engine = engine_to_handle(Arc::new(engine), allocate_null_err);
10151013
let path = "memory:///";
10161014

@@ -1040,7 +1038,7 @@ mod tests {
10401038
actions_to_string(vec![TestAction::Add("path1".into())]),
10411039
)
10421040
.await?;
1043-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
1041+
let engine = DefaultEngine::new(storage.clone());
10441042
let engine = engine_to_handle(Arc::new(engine), allocate_err);
10451043
let path = "memory:///";
10461044

ffi/src/table_changes.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ mod tests {
346346
use delta_kernel::arrow::record_batch::RecordBatch;
347347
use delta_kernel::arrow::util::pretty::pretty_format_batches;
348348
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
349-
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
349+
use delta_kernel::engine::default::DefaultEngine;
350350
use delta_kernel::schema::{DataType, StructField, StructType};
351351
use delta_kernel::Engine;
352352
use delta_kernel_ffi::engine_data::get_engine_data;
@@ -530,7 +530,7 @@ mod tests {
530530
put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?;
531531

532532
let path = "memory:///";
533-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
533+
let engine = DefaultEngine::new(storage);
534534
let engine = engine_to_handle(Arc::new(engine), allocate_err);
535535

536536
let table_changes = ok_or_panic(unsafe {
@@ -617,7 +617,7 @@ mod tests {
617617
put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?;
618618

619619
let path = "memory:///";
620-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
620+
let engine = DefaultEngine::new(storage);
621621
let engine = engine_to_handle(Arc::new(engine), allocate_err);
622622

623623
let table_changes = ok_or_panic(unsafe {
@@ -673,7 +673,7 @@ mod tests {
673673
put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?;
674674

675675
let path = "memory:///";
676-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
676+
let engine = DefaultEngine::new(storage);
677677
let engine = engine_to_handle(Arc::new(engine), allocate_err);
678678

679679
let table_changes = ok_or_panic(unsafe {
@@ -753,7 +753,7 @@ mod tests {
753753
put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?;
754754

755755
let path = "memory:///";
756-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
756+
let engine = DefaultEngine::new(storage);
757757
let engine = engine_to_handle(Arc::new(engine), allocate_err);
758758

759759
let table_changes = ok_or_panic(unsafe {

kernel/benches/metadata_bench.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
//!
1818
//! Follow-ups: <https://github.com/delta-io/delta-kernel-rs/issues/1185>
1919
20-
use std::collections::HashMap;
2120
use std::sync::Arc;
2221

2322
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
@@ -41,9 +40,9 @@ fn setup() -> (TempDir, Url, Arc<DefaultEngine<TokioBackgroundExecutor>>) {
4140
let table_path = tempdir.path().join(table);
4241
let url = try_parse_uri(table_path.to_str().unwrap()).expect("Failed to parse table path");
4342
// TODO: use multi-threaded executor
44-
let executor = Arc::new(TokioBackgroundExecutor::new());
45-
let engine = DefaultEngine::try_new(&url, HashMap::<String, String>::new(), executor)
46-
.expect("Failed to create engine");
43+
use delta_kernel::engine::default::storage::store_from_url;
44+
let store = store_from_url(&url).expect("Failed to create store");
45+
let engine = DefaultEngine::new(store);
4746

4847
(tempdir, url, Arc::new(engine))
4948
}

kernel/examples/common/src/lib.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ use std::{collections::HashMap, sync::Arc};
44

55
use clap::{Args, CommandFactory, FromArgMatches};
66
use delta_kernel::{
7-
arrow::array::RecordBatch,
8-
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
9-
scan::Scan,
10-
schema::MetadataColumnSpec,
11-
DeltaResult, SnapshotRef,
7+
arrow::array::RecordBatch, engine::default::executor::tokio::TokioBackgroundExecutor,
8+
engine::default::storage::store_from_url_opts, engine::default::DefaultEngine, scan::Scan,
9+
schema::MetadataColumnSpec, DeltaResult, SnapshotRef,
1210
};
1311

1412
use object_store::{
@@ -158,16 +156,13 @@ pub fn get_engine(
158156
)));
159157
}
160158
};
161-
Ok(DefaultEngine::new(
162-
store,
163-
Arc::new(TokioBackgroundExecutor::new()),
164-
))
159+
Ok(DefaultEngine::new(Arc::new(store)))
165160
} else if !args.option.is_empty() {
166161
let opts = args.option.iter().map(|option| {
167162
let parts: Vec<&str> = option.split("=").collect();
168163
(parts[0].to_ascii_lowercase(), parts[1])
169164
});
170-
DefaultEngine::try_new(url, opts, Arc::new(TokioBackgroundExecutor::new()))
165+
Ok(DefaultEngine::new(store_from_url_opts(url, opts)?))
171166
} else {
172167
let mut options = if let Some(ref region) = args.region {
173168
HashMap::from([("region", region.clone())])
@@ -177,7 +172,7 @@ pub fn get_engine(
177172
if args.public {
178173
options.insert("skip_signature", "true".to_string());
179174
}
180-
DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new()))
175+
Ok(DefaultEngine::new(store_from_url_opts(url, options)?))
181176
}
182177
}
183178

kernel/examples/write-table/src/main.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,8 @@ async fn try_main() -> DeltaResult<()> {
7676
println!("Using Delta table at: {url}");
7777

7878
// Get the engine for local filesystem
79-
let engine = DefaultEngine::try_new(
80-
&url,
81-
HashMap::<String, String>::new(),
82-
Arc::new(TokioBackgroundExecutor::new()),
83-
)?;
79+
use delta_kernel::engine::default::storage::store_from_url;
80+
let engine = DefaultEngine::new(store_from_url(&url)?);
8481

8582
// Create or get the table
8683
let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?;

kernel/src/checkpoint/tests.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::arrow::{
1212
};
1313
use crate::checkpoint::create_last_checkpoint_data;
1414
use crate::engine::arrow_data::ArrowEngineData;
15-
use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
15+
use crate::engine::default::DefaultEngine;
1616
use crate::log_replay::HasSelectionVector;
1717
use crate::schema::{DataType as KernelDataType, StructField, StructType};
1818
use crate::utils::test_utils::Action;
@@ -59,7 +59,7 @@ fn test_deleted_file_retention_timestamp() -> DeltaResult<()> {
5959
#[test]
6060
fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> {
6161
let (store, _) = new_in_memory_store();
62-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
62+
let engine = DefaultEngine::new(store.clone());
6363

6464
// 1st commit (version 0) - metadata and protocol actions
6565
// Protocol action does not include the v2Checkpoint reader/writer feature.
@@ -115,7 +115,7 @@ fn test_create_last_checkpoint_data() -> DeltaResult<()> {
115115
let add_actions_counter = 75;
116116
let size_in_bytes: i64 = 1024 * 1024; // 1MB
117117
let (store, _) = new_in_memory_store();
118-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
118+
let engine = DefaultEngine::new(store.clone());
119119

120120
// Create last checkpoint metadata
121121
let last_checkpoint_batch = create_last_checkpoint_data(
@@ -276,7 +276,7 @@ fn read_last_checkpoint_file(store: &Arc<InMemory>) -> DeltaResult<Value> {
276276
#[test]
277277
fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> {
278278
let (store, _) = new_in_memory_store();
279-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
279+
let engine = DefaultEngine::new(store.clone());
280280

281281
// 1st commit: adds `fake_path_1`
282282
write_commit_to_store(&store, vec![create_add_action("fake_path_1")], 0)?;
@@ -346,7 +346,7 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> {
346346
#[test]
347347
fn test_v1_checkpoint_specific_version() -> DeltaResult<()> {
348348
let (store, _) = new_in_memory_store();
349-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
349+
let engine = DefaultEngine::new(store.clone());
350350

351351
// 1st commit (version 0) - metadata and protocol actions
352352
// Protocol action does not include the v2Checkpoint reader/writer feature.
@@ -408,7 +408,7 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> {
408408
#[test]
409409
fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaResult<()> {
410410
let (store, _) = new_in_memory_store();
411-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
411+
let engine = DefaultEngine::new(store.clone());
412412

413413
// 1st commit (version 0) - metadata and protocol actions
414414
write_commit_to_store(
@@ -450,7 +450,7 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR
450450
#[test]
451451
fn test_v2_checkpoint_supported_table() -> DeltaResult<()> {
452452
let (store, _) = new_in_memory_store();
453-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
453+
let engine = DefaultEngine::new(store.clone());
454454

455455
// 1st commit: adds `fake_path_2` & removes `fake_path_1`
456456
write_commit_to_store(
@@ -522,7 +522,7 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> {
522522
#[test]
523523
fn test_no_checkpoint_staged_commits() -> DeltaResult<()> {
524524
let (store, _) = new_in_memory_store();
525-
let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
525+
let engine = DefaultEngine::new(store.clone());
526526

527527
// normal commit
528528
write_commit_to_store(

0 commit comments

Comments
 (0)