Skip to content

Commit db7b8cc

Browse files
authored
Implement partition_statistics API for NestedLoopJoinExec (#19468)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Part of #15873 ## Rationale for this change After the introduction of the `partition_statistics` API, some operators still do not support it; the goal is to update those operators to the new API. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Updated `NestedLoopJoinExec` to implement the new `partition_statistics` API <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? - Added relevant tests <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 8959b3d commit db7b8cc

File tree

3 files changed

+130
-10
lines changed

3 files changed

+130
-10
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ mod test {
4646
use datafusion_physical_plan::common::compute_record_batch_statistics;
4747
use datafusion_physical_plan::empty::EmptyExec;
4848
use datafusion_physical_plan::filter::FilterExec;
49-
use datafusion_physical_plan::joins::CrossJoinExec;
49+
use datafusion_physical_plan::joins::{CrossJoinExec, NestedLoopJoinExec};
5050
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
5151
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
5252
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
@@ -638,6 +638,81 @@ mod test {
638638
Ok(())
639639
}
640640

641+
#[tokio::test]
642+
async fn test_statistic_by_partition_of_nested_loop_join() -> Result<()> {
643+
use datafusion_expr::JoinType;
644+
645+
let left_scan = create_scan_exec_with_statistics(None, Some(2)).await;
646+
let left_scan_coalesced: Arc<dyn ExecutionPlan> =
647+
Arc::new(CoalescePartitionsExec::new(left_scan));
648+
649+
let right_scan = create_scan_exec_with_statistics(None, Some(2)).await;
650+
651+
let nested_loop_join: Arc<dyn ExecutionPlan> =
652+
Arc::new(NestedLoopJoinExec::try_new(
653+
left_scan_coalesced,
654+
right_scan,
655+
None,
656+
&JoinType::RightSemi,
657+
None,
658+
)?);
659+
660+
// Test partition_statistics(None) - returns overall statistics
661+
// For RightSemi join, output columns come from right side only
662+
let full_statistics = nested_loop_join.partition_statistics(None)?;
663+
// With empty join columns, estimate_join_statistics returns Inexact row count
664+
// based on the outer side (right side for RightSemi)
665+
let mut expected_full_statistics = create_partition_statistics(
666+
4,
667+
32,
668+
1,
669+
4,
670+
Some((DATE_2025_03_01, DATE_2025_03_04)),
671+
);
672+
expected_full_statistics.num_rows = Precision::Inexact(4);
673+
expected_full_statistics.total_byte_size = Precision::Absent;
674+
assert_eq!(full_statistics, expected_full_statistics);
675+
676+
// Test partition_statistics(Some(idx)) - returns partition-specific statistics
677+
// Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
678+
let mut expected_statistic_partition_1 = create_partition_statistics(
679+
2,
680+
16,
681+
3,
682+
4,
683+
Some((DATE_2025_03_01, DATE_2025_03_02)),
684+
);
685+
expected_statistic_partition_1.num_rows = Precision::Inexact(2);
686+
expected_statistic_partition_1.total_byte_size = Precision::Absent;
687+
688+
// Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
689+
let mut expected_statistic_partition_2 = create_partition_statistics(
690+
2,
691+
16,
692+
1,
693+
2,
694+
Some((DATE_2025_03_03, DATE_2025_03_04)),
695+
);
696+
expected_statistic_partition_2.num_rows = Precision::Inexact(2);
697+
expected_statistic_partition_2.total_byte_size = Precision::Absent;
698+
699+
let statistics = (0..nested_loop_join.output_partitioning().partition_count())
700+
.map(|idx| nested_loop_join.partition_statistics(Some(idx)))
701+
.collect::<Result<Vec<_>>>()?;
702+
assert_eq!(statistics.len(), 2);
703+
assert_eq!(statistics[0], expected_statistic_partition_1);
704+
assert_eq!(statistics[1], expected_statistic_partition_2);
705+
706+
// Check the statistics_by_partition with real results
707+
let expected_stats = vec![
708+
ExpectedStatistics::NonEmpty(3, 4, 2),
709+
ExpectedStatistics::NonEmpty(1, 2, 2),
710+
];
711+
validate_statistics_with_data(nested_loop_join, expected_stats, 0).await?;
712+
713+
Ok(())
714+
}
715+
641716
#[tokio::test]
642717
async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
643718
let scan = create_scan_exec_with_statistics(None, Some(2)).await;

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -550,17 +550,34 @@ impl ExecutionPlan for NestedLoopJoinExec {
550550
}
551551

552552
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
553-
if partition.is_some() {
554-
return Ok(Statistics::new_unknown(&self.schema()));
555-
}
553+
// NestedLoopJoinExec is designed for joins without equijoin keys in the
554+
// ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join
555+
// predicates are stored in `self.filter`, but `estimate_join_statistics`
556+
// currently doesn't support selectivity estimation for such arbitrary
557+
// filter expressions. We pass an empty join column list, which means
558+
// the cardinality estimation cannot use column statistics and returns
559+
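// unknown row counts, except for join types such as semi joins, which still get an inexact row count taken from the outer side.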
556560
let join_columns = Vec::new();
557-
estimate_join_statistics(
558-
self.left.partition_statistics(None)?,
559-
self.right.partition_statistics(None)?,
561+
562+
// Left side is always a single partition (Distribution::SinglePartition),
563+
// so we always request overall stats with `None`. Right side can have
564+
// multiple partitions, so we forward the partition parameter to get
565+
// partition-specific statistics when requested.
566+
let left_stats = self.left.partition_statistics(None)?;
567+
let right_stats = match partition {
568+
Some(partition) => self.right.partition_statistics(Some(partition))?,
569+
None => self.right.partition_statistics(None)?,
570+
};
571+
572+
let stats = estimate_join_statistics(
573+
left_stats,
574+
right_stats,
560575
&join_columns,
561576
&self.join_type,
562-
&self.schema(),
563-
)
577+
&self.join_schema,
578+
)?;
579+
580+
Ok(stats.project(self.projection.as_ref()))
564581
}
565582

566583
/// Tries to push `projection` down through `nested_loop_join`. If possible, performs the

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,35 @@ struct PartialJoinStatistics {
411411
pub column_statistics: Vec<ColumnStatistics>,
412412
}
413413

414-
/// Estimate the statistics for the given join's output.
414+
/// Estimates the output statistics for a join operation based on input statistics.
415+
///
416+
/// # Statistics Propagation
417+
///
418+
/// This function estimates join output statistics using the following approach:
419+
/// - **Row count estimation**: Uses the `on` parameter (equijoin keys) to estimate
420+
/// output cardinality via [`estimate_join_cardinality`]. The estimation is based on
421+
/// column-level statistics (distinct counts, min/max values) of the join keys.
422+
/// - **Column statistics**: Combines column statistics from both inputs. For join types
423+
/// that preserve all columns (Inner, Left, Right, Full), statistics from both sides
424+
/// are concatenated. For semi/anti joins, only the relevant side's statistics are kept.
425+
/// - **Byte size**: Always returns `Precision::Absent` as join output size is difficult
426+
/// to estimate without knowing the actual data.
427+
///
428+
/// # The `on` Parameter
429+
///
430+
/// The `on` parameter represents equijoin keys (e.g., `t1.id = t2.id`). When `on` is
431+
/// empty (as in NestedLoopJoinExec which handles non-equijoin predicates), the
432+
/// cardinality estimation cannot compute selectivity from join keys, and this function
433+
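/// returns unknown statistics (`num_rows: Precision::Absent`), except for join types such as semi joins, which still report an inexact row count taken from the outer input.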
434+
///
435+
/// # Limitations
436+
///
437+
/// - Does not account for selectivity of arbitrary join filter expressions
438+
/// (e.g., `(t1.v1 + t2.v1) % 2 = 0`). Such filters, common in NestedLoopJoinExec,
439+
/// are not factored into the cardinality estimation.
440+
/// - Column statistics for the output are simply combined from inputs without
441+
/// adjusting for join selectivity (acknowledged in the code as needing
442+
/// "filter selectivity analysis").
415443
pub(crate) fn estimate_join_statistics(
416444
left_stats: Statistics,
417445
right_stats: Statistics,

0 commit comments

Comments
 (0)