Skip to content

Commit e93217a

Browse files
committed
Initial more extendable access plan
Signed-off-by: Adam Gutglick <[email protected]>
1 parent 2e137f2 commit e93217a

File tree

3 files changed

+39
-22
lines changed

3 files changed

+39
-22
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex::scan::ScanBuilder;
5+
use vortex::scan::Selection;
6+
7+
/// Custom Vortex-specific information that can be provided by external indexes or other sources.
8+
///
9+
/// This is intended as a low-level interface for users building their own data systems, see the [advance index] example from the DataFusion repo for a similar usage with Parquet.
10+
///
11+
/// [advance index]: https://github.com/apache/datafusion/blob/47df535d2cd5aac5ad5a92bdc837f38e05ea0f0f/datafusion-examples/examples/data_io/parquet_advanced_index.rs
12+
pub struct VortexAccessPlan {
13+
selection: Option<Selection>,
14+
}
15+
16+
impl VortexAccessPlan {
17+
/// Apply the plan to the scan's builder.
18+
pub fn apply_to_builder<A>(&self, mut scan_builder: ScanBuilder<A>) -> ScanBuilder<A>
19+
where
20+
A: 'static + Send,
21+
{
22+
let Self { selection } = self;
23+
24+
if let Some(selection) = selection {
25+
scan_builder = scan_builder.with_selection(selection.clone());
26+
}
27+
28+
scan_builder
29+
}
30+
}

vortex-datafusion/src/persistent/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
//! Persistent implementation of a Vortex table provider.
5+
mod access_plan;
56
mod cache;
67
mod format;
78
pub mod metrics;
89
mod opener;
910
mod sink;
1011
mod source;
1112

13+
pub use access_plan::VortexAccessPlan;
1214
pub use format::VortexFormat;
1315
pub use format::VortexFormatFactory;
1416
pub use format::VortexOptions;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ use vortex::layout::LayoutReader;
4141
use vortex::layout::layouts::USE_VORTEX_OPERATORS;
4242
use vortex::metrics::VortexMetrics;
4343
use vortex::scan::ScanBuilder;
44-
use vortex::scan::Selection;
4544
use vortex::session::VortexSession;
4645
use vortex_utils::aliases::dash_map::DashMap;
4746
use vortex_utils::aliases::dash_map::Entry;
4847

4948
use super::cache::VortexFileCache;
49+
use crate::VortexAccessPlan;
5050
use crate::convert::exprs::can_be_pushed_down;
5151
use crate::convert::exprs::make_vortex_predicate;
5252

@@ -102,7 +102,6 @@ impl FileOpener for VortexOpener {
102102
let metrics = self.metrics.clone();
103103
let layout_reader = self.layout_readers.clone();
104104
let has_output_ordering = self.has_output_ordering;
105-
let extensions = file.extensions.clone();
106105

107106
let projected_schema = match projection.as_ref() {
108107
None => table_schema.file_schema().clone(),
@@ -229,9 +228,13 @@ impl FileOpener for VortexOpener {
229228
};
230229

231230
let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
232-
if let Some(selection) = get_selection_from_extensions(extensions) {
233-
scan_builder = scan_builder.with_selection(selection);
231+
232+
if let Some(extensions) = file.extensions
233+
&& let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
234+
{
235+
scan_builder = vortex_plan.apply_to_builder(scan_builder);
234236
}
237+
235238
if let Some(file_range) = file.range {
236239
scan_builder = apply_byte_range(
237240
file_range,
@@ -361,24 +364,6 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
361364
start_row..u64::min(row_count, end_row)
362365
}
363366

364-
/// Attempts to extract a `Selection` from the extensions object, if present.
365-
///
366-
/// This function is used to retrieve the row selection plan that may have been
367-
/// attached to a `PartitionedFile` via its `extensions` field.
368-
///
369-
/// # Arguments
370-
///
371-
/// * `extensions` - Optional type-erased extensions object that may contain a `Selection`
372-
///
373-
/// # Returns
374-
///
375-
/// Returns `Some(Selection)` if the extensions contain a valid `Selection`, otherwise `None`.
376-
fn get_selection_from_extensions(
377-
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
378-
) -> Option<Selection> {
379-
extensions?.downcast_ref::<Selection>().cloned()
380-
}
381-
382367
#[cfg(test)]
383368
mod tests {
384369
use std::sync::LazyLock;

0 commit comments

Comments
 (0)