Skip to content

Commit f8dcef3

Browse files
committed
feat: add convenience extensions for kernel engine types
Signed-off-by: Robert Pack <[email protected]>
1 parent 028db69 commit f8dcef3

File tree

3 files changed

+147
-21
lines changed

3 files changed

+147
-21
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
//! Utilities for interacting with Kernel APIs using Arrow data structures.
2+
//!
3+
use delta_kernel::arrow::array::BooleanArray;
4+
use delta_kernel::arrow::compute::filter_record_batch;
5+
use delta_kernel::arrow::record_batch::RecordBatch;
6+
use delta_kernel::engine::arrow_data::ArrowEngineData;
7+
use delta_kernel::scan::{Scan, ScanMetadata};
8+
use delta_kernel::{
9+
DeltaResult, Engine, EngineData, ExpressionEvaluator, ExpressionRef, PredicateRef, Version,
10+
};
11+
use itertools::Itertools;
12+
13+
/// Arrow-typed counterpart of [`ScanMetadata`].
///
/// Contains (1) a [`RecordBatch`] specifying data files to be scanned and (2) a vector of
/// transforms (one transform per scan file) that must be applied to the data read from those
/// files.
pub(crate) struct ScanMetadataArrow {
    /// Record batch with one row per file to scan.
    ///
    /// Rows deselected by the kernel's selection vector have already been filtered out
    /// (see `kernel_to_arrow`), so every row here is a file that must actually be read.
    pub scan_files: RecordBatch,

    /// Row-level transformations to apply to data read from files.
    ///
    /// Each entry in this vector corresponds to a row in the `scan_files` data. The entry is an
    /// expression that must be applied to convert the file's data into the logical schema
    /// expected by the scan:
    ///
    /// - `Some(expr)`: Apply this expression to transform the data to match [`Scan::schema()`].
    /// - `None`: No transformation is needed; the data is already in the correct logical form.
    ///
    /// Note: This vector can be indexed by row number.
    pub scan_file_transforms: Vec<Option<ExpressionRef>>,
}
32+
33+
/// Convenience extension trait that exposes [`Scan`] metadata APIs in terms of Arrow
/// [`RecordBatch`]es instead of opaque [`EngineData`].
pub(crate) trait ScanExt {
    /// Get the metadata for a table scan.
    ///
    /// This method handles translation between `EngineData` and `RecordBatch`
    /// and will already apply any selection vectors to the data.
    /// See [`Scan::scan_metadata`] for details.
    fn scan_metadata_arrow(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;

    /// Get the metadata for a table scan, starting from previously obtained scan metadata.
    ///
    /// `existing_version` and `existing_data` describe scan metadata obtained from an earlier
    /// call (converted to Arrow batches); `existing_predicate` is the predicate used for that
    /// earlier scan, if any. Like [`ScanExt::scan_metadata_arrow`], this handles translation
    /// between `EngineData` and `RecordBatch` and applies selection vectors.
    /// See [`Scan::scan_metadata_from`] for details.
    fn scan_metadata_from_arrow(
        &self,
        engine: &dyn Engine,
        existing_version: Version,
        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
        existing_predicate: Option<PredicateRef>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
}
52+
53+
impl ScanExt for Scan {
54+
fn scan_metadata_arrow(
55+
&self,
56+
engine: &dyn Engine,
57+
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
58+
Ok(self
59+
.scan_metadata(engine)?
60+
.map_ok(kernel_to_arrow)
61+
.flatten())
62+
}
63+
64+
fn scan_metadata_from_arrow(
65+
&self,
66+
engine: &dyn Engine,
67+
existing_version: Version,
68+
existing_data: Box<dyn Iterator<Item = RecordBatch>>,
69+
existing_predicate: Option<PredicateRef>,
70+
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
71+
let engine_iter =
72+
existing_data.map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>);
73+
Ok(self
74+
.scan_metadata_from(engine, existing_version, engine_iter, existing_predicate)?
75+
.map_ok(kernel_to_arrow)
76+
.flatten())
77+
}
78+
}
79+
80+
fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
81+
let scan_file_transforms = metadata
82+
.scan_file_transforms
83+
.into_iter()
84+
.enumerate()
85+
.filter_map(|(i, v)| metadata.scan_files.selection_vector[i].then_some(v))
86+
.collect();
87+
let batch = ArrowEngineData::try_from_engine_data(metadata.scan_files.data)?.into();
88+
let scan_files = filter_record_batch(
89+
&batch,
90+
&BooleanArray::from(metadata.scan_files.selection_vector),
91+
)?;
92+
Ok(ScanMetadataArrow {
93+
scan_files,
94+
scan_file_transforms,
95+
})
96+
}
97+
98+
/// Convenience extension trait for evaluating kernel expressions directly against Arrow
/// [`RecordBatch`]es, hiding the `EngineData` round-trip.
pub(crate) trait ExpressionEvaluatorExt {
    /// Evaluate this expression against `batch`, returning the result as a [`RecordBatch`].
    ///
    /// Errors are those of the underlying [`ExpressionEvaluator::evaluate`] call, plus any
    /// failure converting the evaluator's output back into Arrow data.
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch>;
}
101+
102+
impl<T: ExpressionEvaluator + ?Sized> ExpressionEvaluatorExt for T {
103+
fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch> {
104+
let engine_data = ArrowEngineData::new(batch);
105+
Ok(ArrowEngineData::try_from_engine_data(T::evaluate(self, &engine_data)?)?.into())
106+
}
107+
}
108+
109+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::ExpressionEvaluatorExt as _;

    use delta_kernel::arrow::array::Int32Array;
    use delta_kernel::arrow::datatypes::{DataType, Field, Schema};
    use delta_kernel::arrow::record_batch::RecordBatch;
    use delta_kernel::engine::arrow_conversion::TryIntoKernel;
    use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
    use delta_kernel::expressions::*;
    use delta_kernel::EvaluationHandler;

    /// `evaluate_arrow` on a simple column projection returns a batch with the
    /// expected shape.
    #[test]
    fn test_evaluate_arrow() {
        let handler = ArrowEvaluationHandler;

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let values = Int32Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();

        let expression = column_expr!("a");
        let expr = handler.new_expression_evaluator(
            Arc::new((&schema).try_into_kernel().unwrap()),
            expression,
            delta_kernel::schema::DataType::INTEGER,
        );

        // Assert on the evaluation result itself rather than just `is_ok`, so a
        // regression producing a wrongly-shaped batch is caught.
        let result = expr.evaluate_arrow(batch).expect("evaluate_arrow failed");
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.num_columns(), 1);
    }
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
//! Conversions between Delta and Arrow data types
2+
pub(crate) mod engine_ext;
23
pub(crate) mod extract;
34
pub(crate) mod json;

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -561,12 +561,12 @@ mod datafusion {
561561
use datafusion_common::scalar::ScalarValue;
562562
use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
563563
use datafusion_common::Column;
564-
use delta_kernel::engine::arrow_data::ArrowEngineData;
565564
use delta_kernel::expressions::Expression;
566565
use delta_kernel::schema::{DataType, PrimitiveType};
567566
use delta_kernel::{EvaluationHandler, ExpressionEvaluator};
568567

569568
use super::*;
569+
use crate::kernel::arrow::engine_ext::ExpressionEvaluatorExt as _;
570570
use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
571571
use crate::kernel::ARROW_HANDLER;
572572

@@ -801,16 +801,8 @@ mod datafusion {
801801
);
802802
let mut results = Vec::with_capacity(self.data.len());
803803
for batch in self.data.iter() {
804-
let engine = ArrowEngineData::new(batch.clone());
805-
let result = evaluator.evaluate(&engine).ok()?;
806-
let result = result
807-
.any_ref()
808-
.downcast_ref::<ArrowEngineData>()
809-
.ok_or(DeltaTableError::generic(
810-
"failed to downcast evaluator result to ArrowEngineData.",
811-
))
812-
.ok()?;
813-
results.push(result.record_batch().clone());
804+
let result = evaluator.evaluate_arrow(batch.clone()).ok()?;
805+
results.push(result);
814806
}
815807
let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
816808
batch.column_by_name("output").cloned()
@@ -875,16 +867,8 @@ mod datafusion {
875867

876868
let mut results = Vec::with_capacity(self.data.len());
877869
for batch in self.data.iter() {
878-
let engine = ArrowEngineData::new(batch.clone());
879-
let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?;
880-
let result = result
881-
.any_ref()
882-
.downcast_ref::<ArrowEngineData>()
883-
.ok_or(DeltaTableError::generic(
884-
"failed to downcast evaluator result to ArrowEngineData.",
885-
))
886-
.ok()?;
887-
results.push(result.record_batch().clone());
870+
let result = ROW_COUNTS_EVAL.evaluate_arrow(batch.clone()).ok()?;
871+
results.push(result);
888872
}
889873
let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
890874
arrow_cast::cast(batch.column_by_name("output")?, &ArrowDataType::UInt64).ok()

0 commit comments

Comments
 (0)