Skip to content

Commit 5169a98

Browse files
authored
chore(datafusion): prune crate structure (#15)
* feat: setup simple configuration * chore: prune crate structure
1 parent 904cb50 commit 5169a98

File tree

13 files changed

+339
-333
lines changed

13 files changed

+339
-333
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ delta_kernel = { version = "0.14.0", features = [
2121
# unitycatalog-common = { path = "../unitycatalog-rs/crates/common", default-features = false, features = [
2222
# "rest-client",
2323
# ] }
24-
unitycatalog-common = { git = "https://github.com/unitycatalog-incubator/unitycatalog-rs", rev = "b840bd195c2c1c3ec5c692e245acef794f677190", default-features = false, features = [
24+
unitycatalog-common = { git = "https://github.com/unitycatalog-incubator/unitycatalog-rs", rev = "1db3e80940499b851c7cd070ecfd257f7e552fc7", default-features = false, features = [
2525
"rest-client",
2626
] }

crates/datafusion/src/lib.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,22 @@
44
pub mod config;
55
mod engine;
66
mod error;
7-
mod exec;
8-
mod log_table_provider;
97
mod planner;
108
mod schema_provider;
119
mod session;
1210
pub mod sql;
13-
mod table_provider;
11+
pub mod table_provider;
12+
mod unity;
1413
mod utils;
1514

1615
// re-export the version type as it is part of public api of this crate.
1716
pub use delta_kernel::Version;
1817
pub use engine::DataFusionEngine;
19-
pub use log_table_provider::{DeltaLogReplayProvider, DeltaLogTableProvider};
2018
pub use schema_provider::DeltaLakeSchemaProvider;
2119
pub use session::{
2220
KernelContextExt, KernelExtensionConfig, KernelSessionExt, KernelTaskContextExt,
2321
ObjectStoreFactory,
2422
};
25-
pub use table_provider::{DeltaTableProvider, ScanFileContext, TableScan, TableSnapshot};
2623

2724
#[cfg(test)]
2825
pub(crate) mod tests {

crates/datafusion/src/planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ use sqlparser::dialect::dialect_from_str;
1313
use unitycatalog_common::client::UnityCatalogClient;
1414

1515
use crate::{
16-
exec::{ExecutableUnityCatalogStement, UnityCatalogRequestExec},
1716
sql::{
1817
CREATE_UC_RETURN_SCHEMA, DROP_UC_RETURN_SCHEMA, ExecuteUnityCatalogPlanNode,
1918
HFParserBuilder, Statement, UnityCatalogStatement, uc_statement_to_plan,
2019
},
20+
unity::{ExecutableUnityCatalogStement, UnityCatalogRequestExec},
2121
};
2222

2323
#[derive(Debug)]

crates/datafusion/src/schema_provider.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ use delta_kernel::Snapshot;
88
use parking_lot::RwLock;
99
use url::Url;
1010

11+
use crate::KernelSessionExt as _;
1112
use crate::session::ensure_object_store;
12-
use crate::{DeltaTableProvider, KernelSessionExt as _};
13+
use crate::table_provider::DeltaTableProvider;
1314

1415
#[derive(Debug)]
1516
pub struct DeltaLakeSchemaProvider {

crates/datafusion/src/sql/statements.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use datafusion::{
1717
pub use catalogs::*;
1818
use serde::Serialize;
1919

20-
use crate::exec::ExecutableUnityCatalogStement;
20+
use crate::unity::exec::ExecutableUnityCatalogStement;
2121

2222
mod catalogs;
2323

File renamed without changes.
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
//! Datafusion `TableProvider` implementation for Delta tables.
2+
3+
use std::sync::Arc;
4+
5+
use async_trait::async_trait;
6+
use datafusion::catalog::{Session, TableProvider};
7+
use datafusion::common::error::Result;
8+
use datafusion::common::{DFSchema, DataFusionError, HashMap, ScalarValue, not_impl_err};
9+
use datafusion::datasource::listing::PartitionedFile;
10+
use datafusion::datasource::physical_plan::FileSource;
11+
use datafusion::datasource::physical_plan::parquet::{
12+
DefaultParquetFileReaderFactory, ParquetAccessPlan, RowGroupAccess,
13+
};
14+
use datafusion::datasource::physical_plan::{
15+
FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
16+
};
17+
use datafusion::datasource::source::DataSourceExec;
18+
use datafusion::execution::object_store::ObjectStoreUrl;
19+
use datafusion::logical_expr::dml::InsertOp;
20+
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
21+
use datafusion::parquet::arrow::arrow_reader::RowSelection;
22+
use datafusion::parquet::file::metadata::RowGroupMetaData;
23+
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
24+
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr, union::UnionExec};
25+
use delta_kernel::ExpressionRef;
26+
use delta_kernel::arrow::datatypes::SchemaRef as ArrowSchemaRef;
27+
use delta_kernel::schema::DataType as DeltaDataType;
28+
use delta_kernel::snapshot::Snapshot;
29+
use futures::stream::{StreamExt, TryStreamExt};
30+
use itertools::Itertools;
31+
32+
use self::exec::{DeltaScanExec, FILE_ID_FIELD};
33+
pub use self::snapshot::DeltaTableSnapshot;
34+
pub use self::table_format::{ScanFileContext, TableScan, TableSnapshot};
35+
use crate::engine::NestedSchemaAdapterFactory;
36+
use crate::engine::{to_datafusion_expr, to_delta_predicate};
37+
use crate::session::KernelSessionExt as _;
38+
use crate::utils::{AsObjectStorePath, AsObjectStoreUrl};
39+
40+
mod exec;
41+
mod snapshot;
42+
mod table_format;
43+
44+
pub struct DeltaTableProvider {
45+
snapshot: Arc<DeltaTableSnapshot>,
46+
}
47+
48+
impl std::fmt::Debug for DeltaTableProvider {
49+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50+
f.debug_struct("DeltaTableProvider")
51+
.field("snapshot", &self.snapshot)
52+
.finish()
53+
}
54+
}
55+
56+
impl DeltaTableProvider {
57+
pub fn try_new(snapshot: Arc<Snapshot>) -> Result<Self> {
58+
let snapshot = DeltaTableSnapshot::try_new(snapshot)?;
59+
Ok(Self {
60+
snapshot: Arc::new(snapshot),
61+
})
62+
}
63+
64+
pub(crate) fn current_snapshot(&self) -> &Arc<Snapshot> {
65+
self.snapshot.current_snapshot()
66+
}
67+
}
68+
69+
#[async_trait]
impl TableProvider for DeltaTableProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// The logical (table) schema of the current snapshot.
    fn schema(&self) -> ArrowSchemaRef {
        Arc::clone(self.snapshot.table_schema())
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// All filters are reported as `Inexact`: they are handed to the delta
    /// scan for pruning, but datafusion must still re-apply them to the
    /// returned batches.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    /// Build the physical plan for reading this table.
    ///
    /// Filters are converted to a delta predicate for scan-metadata pruning,
    /// per-file transform expressions are collected, files are grouped by
    /// object store into parquet read plans, and the result is wrapped in a
    /// `DeltaScanExec` which applies the transforms to produce batches in
    /// the logical table schema.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let pred = to_delta_predicate(filters)?.into();
        let table_scan = self.snapshot.scan_metadata(state, projection, pred).await?;

        let delta_schema = self.snapshot.schema().into();

        // Convert the delta expressions from the scan into a map of file id to
        // datafusion physical expression; these will be applied to convert the
        // raw data read from disk into the logical table schema.
        let physical_schema_df = table_scan.physical_schema.clone().try_into()?;
        let file_transform = |file_ctx: &ScanFileContext| {
            file_ctx.transform.as_ref().map(|t| {
                to_physical(state, &physical_schema_df, t, &delta_schema)
                    .map(|expr| (file_ctx.file_url.to_string(), expr))
            })
        };
        let transform_by_file = table_scan
            .files
            .iter()
            .filter_map(file_transform)
            .try_collect::<_, HashMap<_, _>, _>()
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        // Convert the files into datafusion's `PartitionedFile`s grouped by the
        // object store they are stored in; this is used to create a
        // DataSourceExec plan for each store.
        // To correlate the data with the original file, we add the file url as
        // a partition value. This is required to apply the correct transform
        // to the data in downstream processing.
        let to_partitioned_file = |f: ScanFileContext| {
            let file_path = f.file_url.as_object_store_path();
            let mut partitioned_file = PartitionedFile::new(file_path.to_string(), f.size);
            partitioned_file.partition_values =
                vec![ScalarValue::Utf8(Some(f.file_url.to_string()))];
            // NB: we need to reassign the location since the 'new' method does
            // incorrect or inconsistent encoding internally.
            partitioned_file.object_meta.location = file_path;
            Ok::<_, DataFusionError>((
                f.file_url.as_object_store_url(),
                (partitioned_file, f.selection_vector),
            ))
        };

        let files_by_store = table_scan
            .files
            .into_iter()
            .flat_map(to_partitioned_file)
            .into_group_map();

        let plan = get_read_plan(files_by_store, &table_scan.physical_schema, state, limit).await?;

        // Wrap the raw read plan so per-file transforms (looked up via the
        // file-url partition column) can be applied downstream.
        Ok(Arc::new(DeltaScanExec::new(
            table_scan.logical_schema,
            plan,
            Arc::new(transform_by_file),
        )))
    }

    /// Return an [`ExecutionPlan`] to insert data into this table, if
    /// supported.
    ///
    /// The returned plan should return a single row in a UInt64
    /// column called "count" such as the following
    ///
    /// ```text
    /// +-------+,
    /// | count |,
    /// +-------+,
    /// | 6 |,
    /// +-------+,
    /// ```
    ///
    /// # See Also
    ///
    /// See [`DataSinkExec`] for the common pattern of inserting a
    /// streams of `RecordBatch`es as files to an ObjectStore.
    ///
    /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Insert into not implemented for this table")
    }
}
180+
181+
/// Build a parquet read plan over files grouped by their object store.
///
/// One `DataSourceExec` is created per object store; multiple per-store plans
/// are combined with a `UnionExec`. Each file may carry a selection vector,
/// which is translated into a `ParquetAccessPlan` for row-group / row
/// skipping via [`compute_parquet_access_plans`].
async fn get_read_plan(
    files_by_store: impl IntoIterator<
        Item = (ObjectStoreUrl, Vec<(PartitionedFile, Option<Vec<bool>>)>),
    >,
    physical_schema: &ArrowSchemaRef,
    state: &dyn Session,
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // TODO: update parquet source.
    let source = ParquetSource::default();
    let metrics = ExecutionPlanMetricsSet::new();

    let mut plans = Vec::new();

    for (store_url, files) in files_by_store.into_iter() {
        // Make sure a store is registered for this url before resolving it
        // from the runtime environment below.
        state.ensure_object_store(store_url.as_ref()).await?;

        let store = state.runtime_env().object_store(&store_url)?;
        let reader_factory = source
            .parquet_file_reader_factory()
            .cloned()
            .unwrap_or_else(|| Arc::new(DefaultParquetFileReaderFactory::new(store)));

        let file_group = compute_parquet_access_plans(&reader_factory, files, &metrics).await?;

        // TODO: convert passed predicate to an expression in terms of physical columns
        // and add it to the FileScanConfig
        let file_source =
            source.with_schema_adapter_factory(Arc::new(NestedSchemaAdapterFactory))?;
        let config = FileScanConfigBuilder::new(store_url, physical_schema.clone(), file_source)
            // The file-url partition column is used downstream to look up the
            // per-file transform expression.
            .with_file_group(file_group.into_iter().collect())
            .with_table_partition_cols(vec![FILE_ID_FIELD.clone()])
            .with_limit(limit)
            .build();
        let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
        plans.push(plan);
    }

    // NOTE(review): an empty `plans` vec falls into the union branch —
    // confirm callers guarantee at least one store, or handle 0 explicitly.
    let plan = match plans.len() {
        1 => plans.remove(0),
        _ => Arc::new(UnionExec::new(plans)),
    };
    // Propagate the limit as a fetch when the plan supports it.
    Ok(match plan.with_fetch(limit) {
        Some(limit) => limit,
        None => plan,
    })
}
228+
229+
// Convert a delta transform expression into a datafusion physical expression,
// planned against the physical (on-disk) schema.
// NOTE(review): an earlier comment mentioned returning a vector of
// expressions (structs have no top-level expression type in datafusion);
// the current signature returns a single `PhysicalExpr` — comment updated.
fn to_physical(
    state: &dyn Session,
    physical_schema_df: &DFSchema,
    transform: &ExpressionRef,
    output_type: &DeltaDataType,
) -> Result<Arc<dyn PhysicalExpr>> {
    // Lower the delta expression to a logical datafusion expression first,
    // then let the session plan it into a physical expression.
    state.create_physical_expr(
        to_datafusion_expr(transform, output_type)?,
        physical_schema_df,
    )
}
243+
244+
async fn compute_parquet_access_plans(
245+
reader_factory: &Arc<dyn ParquetFileReaderFactory>,
246+
files: Vec<(PartitionedFile, Option<Vec<bool>>)>,
247+
metrics: &ExecutionPlanMetricsSet,
248+
) -> Result<Vec<PartitionedFile>> {
249+
futures::stream::iter(files)
250+
// HACK: using filter_map here since 'map' somehow does not accept futures.
251+
.filter_map(|(partitioned_file, selection_vector)| async {
252+
if let Some(sv) = selection_vector {
253+
Some(pq_access_plan(reader_factory, partitioned_file, sv, metrics).await)
254+
} else {
255+
Some(Ok(partitioned_file))
256+
}
257+
})
258+
.try_collect::<Vec<_>>()
259+
.await
260+
}
261+
262+
/// Build a [`ParquetAccessPlan`] for a single file from its selection vector
/// and attach it to the [`PartitionedFile`] as an extension.
///
/// The selection vector marks which rows survive deletion; row groups that
/// are fully deselected are skipped, fully selected ones are scanned whole,
/// and mixed ones get a row-level selection.
async fn pq_access_plan(
    reader_factory: &Arc<dyn ParquetFileReaderFactory>,
    partitioned_file: PartitionedFile,
    selection_vector: Vec<bool>,
    metrics: &ExecutionPlanMetricsSet,
) -> Result<PartitionedFile> {
    // Open a reader for this file to fetch the parquet footer metadata
    // (row group boundaries and row counts).
    let mut parquet_file_reader = reader_factory.create_reader(
        0,
        partitioned_file.object_meta.clone().into(),
        None,
        metrics,
    )?;

    let parquet_metadata = parquet_file_reader.get_metadata(None).await?;
    let total_rows = parquet_metadata
        .row_groups()
        .iter()
        .map(RowGroupMetaData::num_rows)
        .sum::<i64>();

    // The incoming selection vector may be shorter than the file's row count;
    // pad it with `true` (row selected) so every row is covered.
    let selection_vector = get_full_selection_vector(&selection_vector, total_rows as usize);

    // Create a ParquetAccessPlan that will be used to skip rows based on the selection vector
    let mut row_groups: Vec<RowGroupAccess> = vec![];
    let mut row_group_row_start = 0;
    for row_group in parquet_metadata.row_groups().iter() {
        // If all rows in the row group are deleted, skip the row group
        let row_group_access = get_row_group_access(
            &selection_vector,
            row_group_row_start,
            row_group.num_rows() as usize,
        );
        row_groups.push(row_group_access);
        // Advance to the first row of the next row group.
        row_group_row_start += row_group.num_rows() as usize;
    }

    let plan = ParquetAccessPlan::new(row_groups);

    Ok(partitioned_file.with_extensions(Arc::new(plan)))
}
302+
303+
fn get_row_group_access(selection_vector: &[bool], start: usize, offset: usize) -> RowGroupAccess {
304+
// If all rows in the row group are deleted (i.e. not selected), skip the row group
305+
if !selection_vector[start..start + offset].iter().any(|&x| x) {
306+
return RowGroupAccess::Skip;
307+
}
308+
// If all rows in the row group are present (i.e. selected), scan the full row group
309+
if selection_vector[start..start + offset].iter().all(|&x| x) {
310+
return RowGroupAccess::Scan;
311+
}
312+
313+
let mask = selection_vector[start..start + offset].to_vec();
314+
315+
// If some rows are deleted, get a row selection that skips the deleted rows
316+
let row_selection = RowSelection::from_filters(&[mask.into()]);
317+
RowGroupAccess::Selection(row_selection)
318+
}
319+
320+
/// Expand (or truncate) a selection vector to exactly `total_rows` entries.
///
/// Rows beyond the end of the input vector are treated as selected (`true`);
/// if the input is longer than `total_rows`, the excess is dropped.
fn get_full_selection_vector(selection_vector: &[bool], total_rows: usize) -> Vec<bool> {
    selection_vector
        .iter()
        .copied()
        .chain(std::iter::repeat(true))
        .take(total_rows)
        .collect()
}
File renamed without changes.

crates/datafusion/src/table_provider/table_format.rs renamed to crates/datafusion/src/table_provider/delta/table_format.rs

File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)