Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Dependabot configuration: weekly update checks for Cargo crates and
# for the GitHub Actions used by this repository's workflows.
version: 2
updates:
  # Rust dependencies declared in the Cargo manifest at the repository root.
  - package-ecosystem: "cargo"
    directory: "/"
    schedule:
      interval: "weekly"
    groups:
      # The "*" pattern matches every dependency name, so all Cargo updates
      # fall into this single group.
      rust-dependencies:
        patterns:
          - "*"

  # Actions referenced from workflow files.
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: "weekly"
88 changes: 73 additions & 15 deletions src/physical_plan/exec/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ use datafusion::physical_plan::{
use futures::stream::{Stream, StreamExt};

use crate::physical_plan::exec::index::IndexScanExec;
use crate::physical_plan::exec::sequential_union::SequentialUnionExec;
use crate::physical_plan::fetcher::RecordFetcher;
use crate::physical_plan::joins::try_create_index_lookup_join;
use crate::physical_plan::{create_index_schema, ROW_ID_COLUMN_NAME};
use crate::types::{IndexFilter, IndexFilters};
use crate::types::{IndexFilter, IndexFilters, UnionMode};
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::empty::EmptyExec;
Expand All @@ -68,15 +69,25 @@ pub struct RecordFetchExec {
input: Arc<dyn ExecutionPlan>,
metrics: ExecutionPlanMetricsSet,
schema: SchemaRef,
/// Controls how union operations are executed for OR conditions.
union_mode: UnionMode,
}

impl RecordFetchExec {
/// Create a new `RecordFetchExec` plan.
///
/// # Arguments
/// * `indexes` - Index filters to use for scanning
/// * `limit` - Optional limit on the number of rows
/// * `record_fetcher` - The fetcher to retrieve records by row ID
/// * `schema` - Output schema
/// * `union_mode` - Controls whether OR conditions use parallel or sequential union
pub fn try_new(
indexes: Vec<IndexFilter>,
limit: Option<usize>,
record_fetcher: Arc<dyn RecordFetcher>,
schema: SchemaRef,
union_mode: UnionMode,
) -> Result<Self> {
if indexes.is_empty() {
return Err(DataFusionError::Plan(
Expand All @@ -91,7 +102,7 @@ impl RecordFetchExec {
}

let input = match indexes.first() {
Some(index_filter) => Self::build_scan_exec(index_filter, limit)?,
Some(index_filter) => Self::build_scan_exec(index_filter, limit, union_mode)?,
None => {
return Err(DataFusionError::Plan(
"RecordFetchExec requires at least one index".to_string(),
Expand All @@ -114,6 +125,7 @@ impl RecordFetchExec {
input,
metrics: ExecutionPlanMetricsSet::new(),
schema,
union_mode,
})
}

Expand Down Expand Up @@ -179,6 +191,7 @@ impl RecordFetchExec {
/// # Arguments
/// * `index_filter` - The [`IndexFilter`] tree specifying which indexes to scan and how to combine them
/// * `limit` - Optional limit on the number of rows to return, passed through to individual index scans
/// * `union_mode` - Controls whether OR conditions use parallel or sequential union
///
/// # Returns
/// An [`Arc<dyn ExecutionPlan>`] that produces a stream of row IDs matching the filter criteria.
Expand All @@ -192,6 +205,7 @@ impl RecordFetchExec {
fn build_scan_exec(
index_filter: &IndexFilter,
limit: Option<usize>,
union_mode: UnionMode,
) -> Result<Arc<dyn ExecutionPlan>> {
match index_filter {
IndexFilter::Single { index, filter } => {
Expand All @@ -217,7 +231,7 @@ impl RecordFetchExec {
IndexFilter::And(filters) => {
let mut plans = filters
.iter()
.map(|f| Self::build_scan_exec(f, limit))
.map(|f| Self::build_scan_exec(f, limit, union_mode))
.collect::<Result<Vec<_>>>()?;

if plans.is_empty() {
Expand All @@ -236,7 +250,7 @@ impl RecordFetchExec {
IndexFilter::Or(filters) => {
let original_plans = filters
.iter()
.map(|f| Self::build_scan_exec(f, limit))
.map(|f| Self::build_scan_exec(f, limit, union_mode))
.collect::<Result<Vec<_>>>()?;

if original_plans.is_empty() {
Expand Down Expand Up @@ -284,8 +298,13 @@ impl RecordFetchExec {
}
}

// Now all plans have identical schemas, UnionExec will work
let union_input = UnionExec::try_new(normalized_plans)?;
// Now all plans have identical schemas - create union based on mode
let union_input: Arc<dyn ExecutionPlan> = match union_mode {
UnionMode::Parallel => UnionExec::try_new(normalized_plans)?,
UnionMode::Sequential => {
Arc::new(SequentialUnionExec::try_new(normalized_plans)?)
}
};

// Create aggregate to deduplicate row IDs
let group_expr = PhysicalGroupBy::new_single(vec![(
Expand Down Expand Up @@ -377,6 +396,7 @@ impl ExecutionPlan for RecordFetchExec {
input: children[0].clone(),
metrics: self.metrics.clone(),
schema: self.schema.clone(),
union_mode: self.union_mode,
}))
}

Expand Down Expand Up @@ -949,8 +969,14 @@ mod tests {
#[tokio::test]
async fn test_record_fetch_exec_no_indexes() {
let fetcher = Arc::new(MockRecordFetcher::new());
let err =
RecordFetchExec::try_new(vec![], None, fetcher, Arc::new(Schema::empty())).unwrap_err();
let err = RecordFetchExec::try_new(
vec![],
None,
fetcher,
Arc::new(Schema::empty()),
UnionMode::Parallel,
)
.unwrap_err();
assert!(
matches!(err, DataFusionError::Plan(ref msg) if msg == "RecordFetchExec requires at least one index"),
"Unexpected error: {err:?}"
Expand All @@ -970,7 +996,13 @@ mod tests {
}];

let fetcher = Arc::new(MockRecordFetcher::new());
let exec = RecordFetchExec::try_new(indexes, None, fetcher, Arc::new(Schema::empty()))?;
let exec = RecordFetchExec::try_new(
indexes,
None,
fetcher,
Arc::new(Schema::empty()),
UnionMode::Parallel,
)?;

// The input plan should be just the IndexScanExec
assert_eq!(exec.input.name(), "IndexScanExec");
Expand Down Expand Up @@ -1004,7 +1036,13 @@ mod tests {
])];

let fetcher = Arc::new(MockRecordFetcher::new());
let exec = RecordFetchExec::try_new(indexes, None, fetcher, Arc::new(Schema::empty()))?;
let exec = RecordFetchExec::try_new(
indexes,
None,
fetcher,
Arc::new(Schema::empty()),
UnionMode::Parallel,
)?;

// The input plan should be a HashJoinExec
assert_eq!(exec.input.name(), "HashJoinExec");
Expand All @@ -1027,7 +1065,13 @@ mod tests {

let indexes = vec![IndexFilter::And(indexes_vec)];
let fetcher = Arc::new(MockRecordFetcher::new());
let exec = RecordFetchExec::try_new(indexes, None, fetcher, Arc::new(Schema::empty()))?;
let exec = RecordFetchExec::try_new(
indexes,
None,
fetcher,
Arc::new(Schema::empty()),
UnionMode::Parallel,
)?;

// The input plan should be a tree of HashJoinExecs
assert_eq!(exec.input.name(), "HashJoinExec");
Expand Down Expand Up @@ -1063,7 +1107,8 @@ mod tests {
let schema = fetcher.schema();

// 2. Create exec plan
let exec = RecordFetchExec::try_new(indexes, None, fetcher, schema.clone())?;
let exec =
RecordFetchExec::try_new(indexes, None, fetcher, schema.clone(), UnionMode::Parallel)?;

// 3. Execute and collect results
let task_ctx = Arc::new(TaskContext::default());
Expand Down Expand Up @@ -1100,7 +1145,13 @@ mod tests {
let fetcher = Arc::new(MockRecordFetcher::new().with_data());

// 2. Create exec plan
let exec = RecordFetchExec::try_new(indexes, None, fetcher, Arc::new(Schema::empty()))?;
let exec = RecordFetchExec::try_new(
indexes,
None,
fetcher,
Arc::new(Schema::empty()),
UnionMode::Parallel,
)?;

// 3. Execute and collect results
let task_ctx = Arc::new(TaskContext::default());
Expand Down Expand Up @@ -1136,7 +1187,8 @@ mod tests {
let schema = fetcher.schema();

// 2. Create exec plan
let exec = RecordFetchExec::try_new(indexes, None, fetcher, schema.clone())?;
let exec =
RecordFetchExec::try_new(indexes, None, fetcher, schema.clone(), UnionMode::Parallel)?;

// 3. Execute and collect results
let task_ctx = Arc::new(TaskContext::default());
Expand Down Expand Up @@ -1193,7 +1245,13 @@ mod tests {
let fetcher = Arc::new(ErrorFetcher);

// 2. Create exec plan
let exec = RecordFetchExec::try_new(indexes, None, fetcher, Arc::new(Schema::empty()))?;
let exec = RecordFetchExec::try_new(
indexes,
None,
fetcher,
Arc::new(Schema::empty()),
UnionMode::Parallel,
)?;

// 3. Execute and expect an error
let task_ctx = Arc::new(TaskContext::default());
Expand Down
1 change: 1 addition & 0 deletions src/physical_plan/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
//! Physical `ExecutionPlan` operators.
pub mod fetch;
pub mod index;
pub mod sequential_union;
Loading