Skip to content

Commit cad121f

Browse files
committed
feat: datafusion table provider next
Signed-off-by: Robert Pack <[email protected]>
1 parent e6d9a2a commit cad121f

File tree

11 files changed

+1461
-25
lines changed

11 files changed

+1461
-25
lines changed

crates/core/examples/df_scan.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use std::sync::Arc;
2+
3+
use arrow_cast::pretty::print_batches;
4+
use datafusion::datasource::TableProvider;
5+
use datafusion::physical_plan::collect_partitioned;
6+
use datafusion::prelude::SessionContext;
7+
use deltalake_core::delta_datafusion::engine::DataFusionEngine;
8+
use deltalake_core::delta_datafusion::next::DeltaTableProvider;
9+
use deltalake_core::kernel::Snapshot;
10+
use deltalake_core::DeltaTableError;
11+
use url::Url;
12+
13+
static CASES: &[&str] = &[
14+
"./dat/v0.0.3/reader_tests/generated/all_primitive_types/delta/", // 0
15+
"./dat/v0.0.3/reader_tests/generated/basic_append/delta/", // 1
16+
"./dat/v0.0.3/reader_tests/generated/basic_partitioned/delta/", // 2
17+
"./dat/v0.0.3/reader_tests/generated/cdf/delta/", // 3
18+
"./dat/v0.0.3/reader_tests/generated/check_constraints/delta/", // 4
19+
"./dat/v0.0.3/reader_tests/generated/column_mapping/delta/", // 5
20+
"./dat/v0.0.3/reader_tests/generated/deletion_vectors/delta/", // 6
21+
"./dat/v0.0.3/reader_tests/generated/generated_columns/delta/", // 7
22+
"./dat/v0.0.3/reader_tests/generated/iceberg_compat_v1/delta/", // 8
23+
"./dat/v0.0.3/reader_tests/generated/multi_partitioned/delta/", // 9
24+
"./dat/v0.0.3/reader_tests/generated/multi_partitioned_2/delta/", // 10
25+
"./dat/v0.0.3/reader_tests/generated/nested_types/delta/", // 11
26+
"./dat/v0.0.3/reader_tests/generated/no_replay/delta/", // 12
27+
"./dat/v0.0.3/reader_tests/generated/no_stats/delta/", // 13
28+
"./dat/v0.0.3/reader_tests/generated/partitioned_with_null/delta/", // 14
29+
"./dat/v0.0.3/reader_tests/generated/stats_as_struct/delta/", // 15
30+
"./dat/v0.0.3/reader_tests/generated/timestamp_ntz/delta/", // 16
31+
"./dat/v0.0.3/reader_tests/generated/with_checkpoint/delta/", // 17
32+
"./dat/v0.0.3/reader_tests/generated/with_schema_change/delta/", // 18
33+
];
34+
35+
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
36+
async fn main() -> Result<(), DeltaTableError> {
37+
let session = Arc::new(SessionContext::new());
38+
let engine = DataFusionEngine::new_from_session(&session.state());
39+
40+
let path = std::fs::canonicalize(CASES[5]).unwrap();
41+
let table_url = Url::from_directory_path(path).unwrap();
42+
let snapshot =
43+
Snapshot::try_new_with_engine(engine.clone(), table_url, Default::default(), None).await?;
44+
let provider = DeltaTableProvider::new(snapshot);
45+
46+
let state = session.state_ref().read().clone();
47+
48+
let plan = provider.scan(&state, None, &[], None).await?;
49+
50+
let batches: Vec<_> = collect_partitioned(plan, session.task_ctx())
51+
.await?
52+
.into_iter()
53+
.flatten()
54+
.collect();
55+
56+
print_batches(&batches).unwrap();
57+
58+
Ok(())
59+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mod to_kernel;

0 commit comments

Comments
 (0)