diff --git a/vortex-datafusion/src/persistent/access_plan.rs b/vortex-datafusion/src/persistent/access_plan.rs
new file mode 100644
index 00000000000..2a523ce8e16
--- /dev/null
+++ b/vortex-datafusion/src/persistent/access_plan.rs
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use vortex::scan::ScanBuilder;
+use vortex::scan::Selection;
+
+/// Custom Vortex-specific information that can be provided by external indexes or other sources.
+///
+/// This is intended as a low-level interface for users building their own data systems, see the [advanced index] example from the DataFusion repo for a similar usage with Parquet.
+///
+/// [advanced index]: https://github.com/apache/datafusion/blob/47df535d2cd5aac5ad5a92bdc837f38e05ea0f0f/datafusion-examples/examples/data_io/parquet_advanced_index.rs
+#[derive(Default)]
+pub struct VortexAccessPlan {
+    selection: Option<Selection>,
+}
+
+impl VortexAccessPlan {
+    /// Sets a [`Selection`] for this plan.
+    pub fn with_selection(mut self, selection: Selection) -> Self {
+        self.selection = Some(selection);
+        self
+    }
+}
+
+impl VortexAccessPlan {
+    /// Apply the plan to the scan's builder.
+    pub fn apply_to_builder<A>(&self, mut scan_builder: ScanBuilder<A>) -> ScanBuilder<A>
+    where
+        A: 'static + Send,
+    {
+        let Self { selection } = self;
+
+        if let Some(selection) = selection {
+            scan_builder = scan_builder.with_selection(selection.clone());
+        }
+
+        scan_builder
+    }
+}
diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
index b1b1e5f727f..2e7f419cd99 100644
--- a/vortex-datafusion/src/persistent/mod.rs
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -2,6 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 //! Persistent implementation of a Vortex table provider.
 
+mod access_plan;
 mod cache;
 mod format;
 pub mod metrics;
@@ -9,6 +10,7 @@ mod opener;
 mod sink;
 mod source;
 
+pub use access_plan::VortexAccessPlan;
 pub use format::VortexFormat;
 pub use format::VortexFormatFactory;
 pub use format::VortexOptions;
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 11b1ff18a3e..b6f43338788 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -41,12 +41,12 @@ use vortex::layout::LayoutReader;
 use vortex::layout::layouts::USE_VORTEX_OPERATORS;
 use vortex::metrics::VortexMetrics;
 use vortex::scan::ScanBuilder;
-use vortex::scan::Selection;
 use vortex::session::VortexSession;
 use vortex_utils::aliases::dash_map::DashMap;
 use vortex_utils::aliases::dash_map::Entry;
 
 use super::cache::VortexFileCache;
+use crate::VortexAccessPlan;
 use crate::convert::exprs::can_be_pushed_down;
 use crate::convert::exprs::make_vortex_predicate;
 
@@ -102,7 +102,6 @@ impl FileOpener for VortexOpener {
         let metrics = self.metrics.clone();
         let layout_reader = self.layout_readers.clone();
         let has_output_ordering = self.has_output_ordering;
-        let extensions = file.extensions.clone();
 
         let projected_schema = match projection.as_ref() {
             None => table_schema.file_schema().clone(),
@@ -229,9 +228,13 @@
         };
 
         let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader);
-        if let Some(selection) = get_selection_from_extensions(extensions) {
-            scan_builder = scan_builder.with_selection(selection);
+
+        if let Some(extensions) = file.extensions
+            && let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
+        {
+            scan_builder = vortex_plan.apply_to_builder(scan_builder);
         }
+
         if let Some(file_range) = file.range {
             scan_builder = apply_byte_range(
                 file_range,
@@ -361,30 +364,13 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
     start_row..u64::min(row_count, end_row)
 }
 
-/// Attempts to extract a `Selection` from the extensions object, if present.
-///
-/// This function is used to retrieve the row selection plan that may have been
-/// attached to a `PartitionedFile` via its `extensions` field.
-///
-/// # Arguments
-///
-/// * `extensions` - Optional type-erased extensions object that may contain a `Selection`
-///
-/// # Returns
-///
-/// Returns `Some(Selection)` if the extensions contain a valid `Selection`, otherwise `None`.
-fn get_selection_from_extensions(
-    extensions: Option<Arc<dyn Any + Send + Sync>>,
-) -> Option<Selection> {
-    extensions?.downcast_ref::<Selection>().cloned()
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::LazyLock;
 
     use arrow_schema::Field;
     use arrow_schema::Fields;
+    use arrow_schema::SchemaRef;
     use datafusion::arrow::array::RecordBatch;
     use datafusion::arrow::array::StringArray;
     use datafusion::arrow::array::StructArray;
@@ -411,6 +397,7 @@
     use vortex::session::VortexSession;
 
     use super::*;
+    use crate::VortexAccessPlan;
     use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory;
 
     static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
@@ -901,152 +888,191 @@
         Ok(())
     }
 
-    // #[tokio::test]
-    // // Test that Selection::IncludeByIndex filters to specific row indices.
-    // async fn test_selection_include_by_index() -> anyhow::Result<()> {
-    //     use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
-
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let mut file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
-    //         vec![1, 3, 5, 7],
-    //     ))));
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0, 1].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-    //     let format_opts = FormatOptions::new().with_types_info(true);
-
-    //     assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
-    //     +-------+------+
-    //     | a     | b    |
-    //     | Int32 | Utf8 |
-    //     +-------+------+
-    //     | 1     | r1   |
-    //     | 3     | r3   |
-    //     | 5     | r5   |
-    //     | 7     | r7   |
-    //     +-------+------+
-    //     ");
-
-    //     Ok(())
-    // }
-
-    // #[tokio::test]
-    // // Test that Selection::ExcludeByIndex excludes specific row indices.
-    // async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
-    //     use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
-
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let mut file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
-    //         vec![0, 2, 4, 6, 8],
-    //     ))));
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0, 1].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-    //     let format_opts = FormatOptions::new().with_types_info(true);
-
-    //     assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
-    //     +-------+------+
-    //     | a     | b    |
-    //     | Int32 | Utf8 |
-    //     +-------+------+
-    //     | 1     | r1   |
-    //     | 3     | r3   |
-    //     | 5     | r5   |
-    //     | 7     | r7   |
-    //     | 9     | r9   |
-    //     +-------+------+
-    //     ");
-
-    //     Ok(())
-    // }
-
-    // #[tokio::test]
-    // // Test that Selection::All returns all rows.
-    // async fn test_selection_all() -> anyhow::Result<()> {
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let mut file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     file.extensions = Some(Arc::new(Selection::All));
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-
-    //     let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
-    //     assert_eq!(total_rows, 10);
-
-    //     Ok(())
-    // }
-
-    // #[tokio::test]
-    // // Test that when no extensions are provided, all rows are returned (backward compatibility).
-    // async fn test_selection_no_extensions() -> anyhow::Result<()> {
-    //     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    //     let file_path = "/path/file.vortex";
-
-    //     let batch = make_test_batch_with_10_rows();
-    //     let data_size =
-    //         write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
-
-    //     let table_schema = batch.schema();
-    //     let file_meta = make_meta(file_path, data_size);
-    //     let file = PartitionedFile::new(file_path.to_string(), data_size);
-    //     // file.extensions is None by default
-
-    //     let opener = make_test_opener(
-    //         object_store.clone(),
-    //         table_schema.clone(),
-    //         Some(vec![0].into()),
-    //     );
-
-    //     let stream = opener.open(file_meta, file)?.await?;
-    //     let data = stream.try_collect::<Vec<_>>().await?;
-
-    //     let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
-    //     assert_eq!(total_rows, 10);
-
-    //     Ok(())
-    // }
+    fn make_test_batch_with_10_rows() -> RecordBatch {
+        record_batch!(
+            ("a", Int32, (0..=9).map(Some).collect::<Vec<_>>()),
+            (
+                "b",
+                Utf8,
+                (0..=9).map(|i| Some(format!("r{}", i))).collect::<Vec<_>>()
+            )
+        )
+        .unwrap()
+    }
+
+    fn make_test_opener(
+        object_store: Arc<dyn ObjectStore>,
+        schema: SchemaRef,
+        projection: Option<Arc<[usize]>>,
+    ) -> VortexOpener {
+        VortexOpener {
+            session: SESSION.clone(),
+            object_store,
+            projection,
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            table_schema: TableSchema::from_file_schema(schema),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        }
+    }
+
+    #[tokio::test]
+    // Test that Selection::IncludeByIndex filters to specific row indices.
+    async fn test_selection_include_by_index() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+        use vortex::buffer::Buffer;
+        use vortex::scan::Selection;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection(
+            Selection::IncludeByIndex(Buffer::from_iter(vec![1, 3, 5, 7])),
+        )));
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0, 1].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+        let format_opts = FormatOptions::new().with_types_info(true);
+
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+------+
+        | a     | b    |
+        | Int32 | Utf8 |
+        +-------+------+
+        | 1     | r1   |
+        | 3     | r3   |
+        | 5     | r5   |
+        | 7     | r7   |
+        +-------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that Selection::ExcludeByIndex excludes specific row indices.
+    async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+        use vortex::buffer::Buffer;
+        use vortex::scan::Selection;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection(
+            Selection::ExcludeByIndex(Buffer::from_iter(vec![0, 2, 4, 6, 8])),
+        )));
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0, 1].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+        let format_opts = FormatOptions::new().with_types_info(true);
+
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+------+
+        | a     | b    |
+        | Int32 | Utf8 |
+        +-------+------+
+        | 1     | r1   |
+        | 3     | r3   |
+        | 5     | r5   |
+        | 7     | r7   |
+        | 9     | r9   |
+        +-------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that Selection::All returns all rows.
+    async fn test_selection_all() -> anyhow::Result<()> {
+        use vortex::scan::Selection;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(
+            VortexAccessPlan::default().with_selection(Selection::All),
+        ));
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that when no extensions are provided, all rows are returned (backward compatibility).
+    async fn test_selection_no_extensions() -> anyhow::Result<()> {
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let file = PartitionedFile::new(file_path.to_string(), data_size);
+        // file.extensions is None by default
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            table_schema.clone(),
+            Some(vec![0].into()),
+        );
+
+        let stream = opener.open(file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
 }