Skip to content

Commit 028db69

Browse files
committed
refactor: move LazyTableProvider into python crate
Signed-off-by: Robert Pack <[email protected]>
1 parent caaf56a commit 028db69

File tree

6 files changed

+381
-116
lines changed

6 files changed

+381
-116
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 2 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,10 @@ use datafusion_expr::{
6767
col, BinaryExpr, Expr, Extension, LogicalPlan, Operator, TableProviderFilterPushDown,
6868
Volatility,
6969
};
70-
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
70+
use datafusion_physical_expr::PhysicalExpr;
7171
use datafusion_physical_plan::filter::FilterExec;
72-
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
73-
use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
72+
use datafusion_physical_plan::limit::LocalLimitExec;
7473
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
75-
use datafusion_physical_plan::projection::ProjectionExec;
7674
use datafusion_physical_plan::{
7775
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
7876
Statistics,
@@ -85,7 +83,6 @@ use either::Either;
8583
use futures::TryStreamExt;
8684
use itertools::Itertools;
8785
use object_store::ObjectMeta;
88-
use parking_lot::RwLock;
8986
use serde::{Deserialize, Serialize};
9087

9188
use url::Url;
@@ -1036,107 +1033,6 @@ impl TableProvider for DeltaTableProvider {
10361033
}
10371034
}
10381035

1039-
#[derive(Debug)]
1040-
pub struct LazyTableProvider {
1041-
schema: Arc<ArrowSchema>,
1042-
batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
1043-
}
1044-
1045-
impl LazyTableProvider {
1046-
/// Build a DeltaTableProvider
1047-
pub fn try_new(
1048-
schema: Arc<ArrowSchema>,
1049-
batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
1050-
) -> DeltaResult<Self> {
1051-
Ok(LazyTableProvider { schema, batches })
1052-
}
1053-
}
1054-
1055-
#[async_trait]
1056-
impl TableProvider for LazyTableProvider {
1057-
fn as_any(&self) -> &dyn Any {
1058-
self
1059-
}
1060-
1061-
fn schema(&self) -> Arc<ArrowSchema> {
1062-
self.schema.clone()
1063-
}
1064-
1065-
fn table_type(&self) -> TableType {
1066-
TableType::Base
1067-
}
1068-
1069-
fn get_table_definition(&self) -> Option<&str> {
1070-
None
1071-
}
1072-
1073-
fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
1074-
None
1075-
}
1076-
1077-
async fn scan(
1078-
&self,
1079-
_session: &dyn Session,
1080-
projection: Option<&Vec<usize>>,
1081-
filters: &[Expr],
1082-
limit: Option<usize>,
1083-
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
1084-
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(LazyMemoryExec::try_new(
1085-
self.schema(),
1086-
self.batches.clone(),
1087-
)?);
1088-
1089-
let df_schema: DFSchema = plan.schema().try_into()?;
1090-
1091-
if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
1092-
let physical_expr =
1093-
create_physical_expr(&filter_expr, &df_schema, &ExecutionProps::new())?;
1094-
plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
1095-
}
1096-
1097-
if let Some(projection) = projection {
1098-
let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
1099-
if projection != &current_projection {
1100-
let execution_props = &ExecutionProps::new();
1101-
let fields: DeltaResult<Vec<(Arc<dyn PhysicalExpr>, String)>> = projection
1102-
.iter()
1103-
.map(|i| {
1104-
let (table_ref, field) = df_schema.qualified_field(*i);
1105-
create_physical_expr(
1106-
&Expr::Column(Column::from((table_ref, field))),
1107-
&df_schema,
1108-
execution_props,
1109-
)
1110-
.map(|expr| (expr, field.name().clone()))
1111-
.map_err(DeltaTableError::from)
1112-
})
1113-
.collect();
1114-
plan = Arc::new(ProjectionExec::try_new(fields?, plan)?);
1115-
}
1116-
}
1117-
1118-
if let Some(limit) = limit {
1119-
plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)))
1120-
};
1121-
1122-
Ok(plan)
1123-
}
1124-
1125-
fn supports_filters_pushdown(
1126-
&self,
1127-
filter: &[&Expr],
1128-
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
1129-
Ok(filter
1130-
.iter()
1131-
.map(|_| TableProviderFilterPushDown::Inexact)
1132-
.collect())
1133-
}
1134-
1135-
fn statistics(&self) -> Option<Statistics> {
1136-
None
1137-
}
1138-
}
1139-
11401036
// TODO: this will likely also need to perform column mapping later when we support reader protocol v2
11411037
/// A wrapper for parquet scans
11421038
#[derive(Debug)]

python/Cargo.toml

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
[package]
22
name = "deltalake-python"
33
version = "1.0.2"
4-
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
4+
authors = [
5+
"Qingping Hou <[email protected]>",
6+
"Will Jones <[email protected]>",
7+
]
58
homepage = "https://github.com/delta-io/delta-rs"
69
license = "Apache-2.0"
710
description = "Native Delta Lake Python binding based on delta-rs with Pandas integration"
@@ -17,19 +20,24 @@ doc = false
1720
[dependencies]
1821
delta_kernel.workspace = true
1922

20-
pyo3-arrow = { version = "0.9.0", default-features = false}
23+
pyo3-arrow = { version = "0.9.0", default-features = false }
2124

2225
# arrow
2326
arrow-schema = { workspace = true, features = ["serde"] }
2427

2528
# datafusion
29+
# datafusion-catalog = { workspace = true }
30+
datafusion-expr = { workspace = true }
2631
datafusion-ffi = { workspace = true }
32+
datafusion-physical-expr = { workspace = true }
33+
datafusion-physical-plan = { workspace = true }
2734

2835
# serde
2936
serde = { workspace = true }
3037
serde_json = { workspace = true }
3138

3239
# "stdlib"
40+
async-trait = { workspace = true }
3341
chrono = { workspace = true }
3442
env_logger = "0"
3543
regex = { workspace = true }
@@ -46,15 +54,23 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
4654
deltalake-mount = { path = "../crates/mount" }
4755

4856
# catalog-unity
49-
deltalake-catalog-unity = { path = "../crates/catalog-unity", features = ["aws", "azure", "gcp", "r2"] }
57+
deltalake-catalog-unity = { path = "../crates/catalog-unity", features = [
58+
"aws",
59+
"azure",
60+
"gcp",
61+
"r2",
62+
] }
5063

5164
# Non-unix or emscripten os
5265
[target.'cfg(any(not(target_family = "unix"), target_os = "emscripten"))'.dependencies]
5366
mimalloc = { version = "0.1", default-features = false }
5467

5568
# Unix (excluding macOS & emscripten) → jemalloc
5669
[target.'cfg(all(target_family = "unix", not(target_os = "macos"), not(target_os = "emscripten")))'.dependencies]
57-
jemallocator = { version = "0.5", features = ["disable_initial_exec_tls", "background_threads"] }
70+
jemallocator = { version = "0.5", features = [
71+
"disable_initial_exec_tls",
72+
"background_threads",
73+
] }
5874

5975
# macOS → jemalloc (without background_threads) (https://github.com/jemalloc/jemalloc/issues/843)
6076
[target.'cfg(all(target_family = "unix", target_os = "macos"))'.dependencies]
@@ -67,7 +83,15 @@ features = ["extension-module", "abi3", "abi3-py39"]
6783
[dependencies.deltalake]
6884
path = "../crates/deltalake"
6985
version = "0"
70-
features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "hdfs", "lakefs"]
86+
features = [
87+
"azure",
88+
"gcs",
89+
"python",
90+
"datafusion",
91+
"unity-experimental",
92+
"hdfs",
93+
"lakefs",
94+
]
7195

7296
[features]
7397
default = ["rustls"]

0 commit comments

Comments (0)