|
| 1 | +use std::sync::Arc; |
| 2 | + |
| 3 | +use arrow_cast::pretty::print_batches; |
| 4 | +use datafusion::datasource::TableProvider; |
| 5 | +use datafusion::physical_plan::collect_partitioned; |
| 6 | +use datafusion::prelude::SessionContext; |
| 7 | +use deltalake_core::delta_datafusion::engine::DataFusionEngine; |
| 8 | +use deltalake_core::kernel::Snapshot; |
| 9 | +use deltalake_core::DeltaTableError; |
| 10 | +use url::Url; |
| 11 | + |
| 12 | +static CASES: &[&str] = &[ |
| 13 | + "./dat/v0.0.3/reader_tests/generated/all_primitive_types/delta/", // 0 |
| 14 | + "./dat/v0.0.3/reader_tests/generated/basic_append/delta/", // 1 |
| 15 | + "./dat/v0.0.3/reader_tests/generated/basic_partitioned/delta/", // 2 |
| 16 | + "./dat/v0.0.3/reader_tests/generated/cdf/delta/", // 3 |
| 17 | + "./dat/v0.0.3/reader_tests/generated/check_constraints/delta/", // 4 |
| 18 | + "./dat/v0.0.3/reader_tests/generated/column_mapping/delta/", // 5 |
| 19 | + "./dat/v0.0.3/reader_tests/generated/deletion_vectors/delta/", // 6 |
| 20 | + "./dat/v0.0.3/reader_tests/generated/generated_columns/delta/", // 7 |
| 21 | + "./dat/v0.0.3/reader_tests/generated/iceberg_compat_v1/delta/", // 8 |
| 22 | + "./dat/v0.0.3/reader_tests/generated/multi_partitioned/delta/", // 9 |
| 23 | + "./dat/v0.0.3/reader_tests/generated/multi_partitioned_2/delta/", // 10 |
| 24 | + "./dat/v0.0.3/reader_tests/generated/nested_types/delta/", // 11 |
| 25 | + "./dat/v0.0.3/reader_tests/generated/no_replay/delta/", // 12 |
| 26 | + "./dat/v0.0.3/reader_tests/generated/no_stats/delta/", // 13 |
| 27 | + "./dat/v0.0.3/reader_tests/generated/partitioned_with_null/delta/", // 14 |
| 28 | + "./dat/v0.0.3/reader_tests/generated/stats_as_struct/delta/", // 15 |
| 29 | + "./dat/v0.0.3/reader_tests/generated/timestamp_ntz/delta/", // 16 |
| 30 | + "./dat/v0.0.3/reader_tests/generated/with_checkpoint/delta/", // 17 |
| 31 | + "./dat/v0.0.3/reader_tests/generated/with_schema_change/delta/", // 18 |
| 32 | +]; |
| 33 | + |
| 34 | +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] |
| 35 | +async fn main() -> Result<(), DeltaTableError> { |
| 36 | + let session = Arc::new(SessionContext::new()); |
| 37 | + let engine = DataFusionEngine::new_from_session(&session.state()); |
| 38 | + |
| 39 | + let path = std::fs::canonicalize(CASES[5]).unwrap(); |
| 40 | + let table_url = Url::from_directory_path(path).unwrap(); |
| 41 | + let snapshot = |
| 42 | + Snapshot::try_new_with_engine(engine.clone(), table_url, Default::default(), None).await?; |
| 43 | + |
| 44 | + let state = session.state_ref().read().clone(); |
| 45 | + |
| 46 | + let plan = snapshot.scan(&state, None, &[], None).await?; |
| 47 | + |
| 48 | + let batches: Vec<_> = collect_partitioned(plan, session.task_ctx()) |
| 49 | + .await? |
| 50 | + .into_iter() |
| 51 | + .flatten() |
| 52 | + .collect(); |
| 53 | + |
| 54 | + print_batches(&batches).unwrap(); |
| 55 | + |
| 56 | + Ok(()) |
| 57 | +} |
0 commit comments