Skip to content

Commit f05fc48

Browse files
committed
feat: datafusion table provider next
Signed-off-by: Robert Pack <[email protected]>
1 parent cb672ac commit f05fc48

File tree

16 files changed

+2678
-30
lines changed

16 files changed

+2678
-30
lines changed

crates/core/examples/df_scan.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::sync::Arc;
2+
3+
use arrow_cast::pretty::print_batches;
4+
use datafusion::datasource::TableProvider;
5+
use datafusion::physical_plan::collect_partitioned;
6+
use datafusion::prelude::SessionContext;
7+
use deltalake_core::delta_datafusion::engine::DataFusionEngine;
8+
use deltalake_core::kernel::Snapshot;
9+
use deltalake_core::DeltaTableError;
10+
use url::Url;
11+
12+
static CASES: &[&str] = &[
13+
"./dat/v0.0.3/reader_tests/generated/all_primitive_types/delta/", // 0
14+
"./dat/v0.0.3/reader_tests/generated/basic_append/delta/", // 1
15+
"./dat/v0.0.3/reader_tests/generated/basic_partitioned/delta/", // 2
16+
"./dat/v0.0.3/reader_tests/generated/cdf/delta/", // 3
17+
"./dat/v0.0.3/reader_tests/generated/check_constraints/delta/", // 4
18+
"./dat/v0.0.3/reader_tests/generated/column_mapping/delta/", // 5
19+
"./dat/v0.0.3/reader_tests/generated/deletion_vectors/delta/", // 6
20+
"./dat/v0.0.3/reader_tests/generated/generated_columns/delta/", // 7
21+
"./dat/v0.0.3/reader_tests/generated/iceberg_compat_v1/delta/", // 8
22+
"./dat/v0.0.3/reader_tests/generated/multi_partitioned/delta/", // 9
23+
"./dat/v0.0.3/reader_tests/generated/multi_partitioned_2/delta/", // 10
24+
"./dat/v0.0.3/reader_tests/generated/nested_types/delta/", // 11
25+
"./dat/v0.0.3/reader_tests/generated/no_replay/delta/", // 12
26+
"./dat/v0.0.3/reader_tests/generated/no_stats/delta/", // 13
27+
"./dat/v0.0.3/reader_tests/generated/partitioned_with_null/delta/", // 14
28+
"./dat/v0.0.3/reader_tests/generated/stats_as_struct/delta/", // 15
29+
"./dat/v0.0.3/reader_tests/generated/timestamp_ntz/delta/", // 16
30+
"./dat/v0.0.3/reader_tests/generated/with_checkpoint/delta/", // 17
31+
"./dat/v0.0.3/reader_tests/generated/with_schema_change/delta/", // 18
32+
];
33+
34+
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
35+
async fn main() -> Result<(), DeltaTableError> {
36+
let session = Arc::new(SessionContext::new());
37+
let engine = DataFusionEngine::new_from_session(&session.state());
38+
39+
let path = std::fs::canonicalize(CASES[5]).unwrap();
40+
let table_url = Url::from_directory_path(path).unwrap();
41+
let snapshot =
42+
Snapshot::try_new_with_engine(engine.clone(), table_url, Default::default(), None).await?;
43+
44+
let state = session.state_ref().read().clone();
45+
46+
let plan = snapshot.scan(&state, None, &[], None).await?;
47+
48+
let batches: Vec<_> = collect_partitioned(plan, session.task_ctx())
49+
.await?
50+
.into_iter()
51+
.flatten()
52+
.collect();
53+
54+
print_batches(&batches).unwrap();
55+
56+
Ok(())
57+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub(crate) use self::to_df::*;
2+
pub(crate) use self::to_kernel::*;
3+
4+
mod to_df;
5+
mod to_kernel;

0 commit comments

Comments
 (0)