@robtandy robtandy commented Aug 21, 2025

Currently, our strategy for further dividing stages into smaller units of work is to split the partitions in a stage into tasks, each responsible for a subset of those partitions (facilitated by including a PartitionIsolator).
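As a rough illustration of that splitting, here is a minimal sketch. The function name `split_into_tasks` is hypothetical and is not taken from this PR; it only shows how partition indices might be grouped into tasks of at most `partitions_per_task` partitions each.

```rust
/// Hypothetical helper (not from this PR): group partition indices
/// 0..num_partitions into tasks holding at most `partitions_per_task`
/// partitions each. The last task may hold fewer.
fn split_into_tasks(num_partitions: usize, partitions_per_task: usize) -> Vec<Vec<usize>> {
    assert!(partitions_per_task > 0, "partitions_per_task must be positive");
    let all: Vec<usize> = (0..num_partitions).collect();
    all.chunks(partitions_per_task)
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

With 3 partitions and 2 partitions per task, this grouping gives one task for partitions 0 and 1 and a second task for partition 2.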

This works when we separate stages at a RepartitionExec boundary, but fails when a node of the plan needs to materialize all data across all partitions. NestedLoopJoinExec is one such node.

This PR adds a can_be_divided() function that returns a boolean indicating whether a plan can be divided further. The planner uses it to decide whether a stage can be separated into tasks.
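A minimal sketch of the idea, using a toy plan tree rather than DataFusion's real `ExecutionPlan` types. The `PlanNode` enum, the body of `can_be_divided`, and the `task_count` helper are all assumptions made for illustration; the actual signature and logic in this PR may differ. In particular, falling back to a single task for a non-divisible stage is an assumption of this sketch.

```rust
/// Toy stand-in for a physical plan node (hypothetical, not DataFusion's API).
#[derive(Debug)]
enum PlanNode {
    /// A node that must see all data across all partitions.
    NestedLoopJoin(Vec<PlanNode>),
    /// Any other operator, identified by name, with its children.
    Other(&'static str, Vec<PlanNode>),
}

/// Returns false if the plan contains a node that needs all partitions
/// materialized together, true otherwise.
fn can_be_divided(node: &PlanNode) -> bool {
    match node {
        PlanNode::NestedLoopJoin(_) => false,
        PlanNode::Other(_, children) => children.iter().all(can_be_divided),
    }
}

/// Assumed planner decision: split into ceil(n / per_task) tasks when the
/// plan is divisible, otherwise keep the whole stage in one task.
fn task_count(plan: &PlanNode, num_partitions: usize, partitions_per_task: usize) -> usize {
    assert!(partitions_per_task > 0, "partitions_per_task must be positive");
    if can_be_divided(plan) {
        (num_partitions + partitions_per_task - 1) / partitions_per_task
    } else {
        1
    }
}
```

Walking the whole subtree keeps the check conservative: a single non-divisible node anywhere in the stage prevents the stage from being split.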

For example, for TPCH Query 22 with partitions=3 and partitions_per_task=2, we generate the following physical plan:

┌───── Stage 6   Task: partitions: 0,unassigned]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
│partitions [out:1  <-- in:3  ] SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
│partitions [out:3  <-- in:3  ]   SortExec: expr=[cntrycode@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
│partitions [out:3  <-- in:3  ]     ProjectionExec: expr=[cntrycode@0 as cntrycode, count(Int64(1))@1 as numcust, sum(custsale.c_acctbal)@2 as totacctbal]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
│partitions [out:3  <-- in:3  ]       AggregateExec: mode=FinalPartitioned, gby=[cntrycode@0 as cntrycode], aggr=[count(Int64(1)), sum(custsale.c_acctbal)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
│partitions [out:3  <-- in:3  ]         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
│partitions [out:3            ]           ArrowFlightReadExec: Stage 5                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
└──────────────────────────────────────────────────                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
  ┌───── Stage 5   Task: partitions: 0,1,unassigned],Task: partitions: 2,unassigned]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
  │partitions [out:3  <-- in:2  ] RepartitionExec: partitioning=Hash([cntrycode@0], 3), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
  │partitions [out:2  <-- in:3  ]   PartitionIsolatorExec [providing upto 2 partitions]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
  │partitions [out:3  <-- in:3  ]     AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], aggr=[count(Int64(1)), sum(custsale.c_acctbal)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              
  │partitions [out:3  <-- in:3  ]       ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
  │partitions [out:3  <-- in:1  ]         NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > avg(customer.c_acctbal)@1, projection=[c_phone@1, c_acctbal@2]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
  │partitions [out:1  <-- in:1  ]           AggregateExec: mode=Final, gby=[], aggr=[avg(customer.c_acctbal)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
  │partitions [out:1  <-- in:3  ]             CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
  │partitions [out:3  <-- in:3  ]               AggregateExec: mode=Partial, gby=[], aggr=[avg(customer.c_acctbal)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
  │partitions [out:3  <-- in:3  ]                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
  │partitions [out:3  <-- in:3  ]                   FilterExec: c_acctbal@1 > Some(0),15,2 AND substr(c_phone@0, 1, 2) IN ([Literal { value: Utf8View("13"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("31"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("23"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("29"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("30"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("18"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("17"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]), projection=[c_acctbal@1]
  │partitions [out:3            ]                     ArrowFlightReadExec: Stage 1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              
  │partitions [out:3  <-- in:3  ]           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
  │partitions [out:3  <-- in:3  ]             HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
  │partitions [out:3  <-- in:3  ]               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
  │partitions [out:3            ]                 ArrowFlightReadExec: Stage 3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
  │partitions [out:3  <-- in:3  ]               CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
  │partitions [out:3            ]                 ArrowFlightReadExec: Stage 4                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
  └──────────────────────────────────────────────────                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
    ┌───── Stage 1   Task: partitions: 0,1,unassigned],Task: partitions: 2,unassigned]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
    │partitions [out:3  <-- in:2  ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
    │partitions [out:2  <-- in:1  ]   PartitionIsolatorExec [providing upto 2 partitions]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
    │partitions [out:1            ]     DataSourceExec: file_groups={1 group: [[Users/rob.tandy/github/datafusion-distributed/testdata/tpch/data/customer.parquet/1.parquet.parquet]]}, projection=[c_phone, c_acctbal], file_type=parquet, predicate=c_acctbal@1 > Some(0),15,2 AND substr(c_phone@0, 1, 2) IN ([Literal { value: Utf8View("13"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("31"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("23"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("29"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("30"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("18"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("17"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }]), pruning_predicate=c_acctbal_null_count@1 != row_count@2 AND c_acctbal_max@0 > Some(0),15,2, required_guarantees=[]
    └──────────────────────────────────────────────────                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
    ┌───── Stage 3   Task: partitions: 0,1,unassigned],Task: partitions: 2,unassigned]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
    │partitions [out:3  <-- in:2  ] RepartitionExec: partitioning=Hash([c_custkey@0], 3), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
    │partitions [out:2  <-- in:3  ]   PartitionIsolatorExec [providing upto 2 partitions]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
    │partitions [out:3  <-- in:3  ]     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
    │partitions [out:3  <-- in:3  ]       FilterExec: substr(c_phone@1, 1, 2) IN ([Literal { value: Utf8View("13"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("31"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("23"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("29"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("30"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("18"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("17"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }])
    │partitions [out:3            ]         ArrowFlightReadExec: Stage 2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
    └──────────────────────────────────────────────────                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
      ┌───── Stage 2   Task: partitions: 0,1,unassigned],Task: partitions: 2,unassigned]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
      │partitions [out:3  <-- in:2  ] RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
      │partitions [out:2  <-- in:1  ]   PartitionIsolatorExec [providing upto 2 partitions]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
      │partitions [out:1            ]     DataSourceExec: file_groups={1 group: [[Users/rob.tandy/github/datafusion-distributed/testdata/tpch/data/customer.parquet/1.parquet.parquet]]}, projection=[c_custkey, c_phone, c_acctbal], file_type=parquet, predicate=substr(c_phone@1, 1, 2) IN ([Literal { value: Utf8View("13"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("31"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("23"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("29"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("30"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("18"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("17"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }])
      └──────────────────────────────────────────────────                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
    ┌───── Stage 4   Task: partitions: 0,1,unassigned],Task: partitions: 2,unassigned]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
    │partitions [out:3  <-- in:2  ] RepartitionExec: partitioning=Hash([o_custkey@0], 3), input_partitions=2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
    │partitions [out:2  <-- in:1  ]   PartitionIsolatorExec [providing upto 2 partitions]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
    │partitions [out:1            ]     DataSourceExec: file_groups={1 group: [[Users/rob.tandy/github/datafusion-distributed/testdata/tpch/data/orders.parquet/1.parquet.parquet]]}, projection=[o_custkey], file_type=parquet                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
    └──────────────────────────────────────────────────                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

and a distributed plan that looks like:
image

Note that Stage 5 in TPCH 22 contains a NestedLoopJoinExec, which needs to fully materialize one side of the join. Because stage 3 is in two tasks, we attempt to do this twice, which causes an error when the same partition is read twice in child stages.
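To make the failure mode concrete, here is a minimal sketch of how a stage's partitions might be grouped into tasks, using the `partitions=3` and `partitions_per_task=2` values from the example above. The helper names `assign_partitions` and `reads_per_child_partition` are hypothetical and are not taken from this repository; the second one only encodes the observation that a node which materializes its whole input is run once per task.

```rust
/// Hypothetical helper: group partition indices `0..partitions` into tasks
/// holding at most `partitions_per_task` partitions each.
fn assign_partitions(partitions: usize, partitions_per_task: usize) -> Vec<Vec<usize>> {
    assert!(partitions_per_task > 0, "partitions_per_task must be non-zero");
    let all: Vec<usize> = (0..partitions).collect();
    // `chunks` yields consecutive slices; the last one may be shorter.
    all.chunks(partitions_per_task).map(|c| c.to_vec()).collect()
}

/// Hypothetical helper: if every task runs a node that materializes all of
/// its input, each child partition is requested once per task.
fn reads_per_child_partition(tasks: &[Vec<usize>]) -> usize {
    tasks.len()
}
```

With three partitions and two partitions per task, this yields the two tasks `[0, 1]` and `[2]` seen in the plan output, so a fully materializing node would request every child partition twice.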

Specifying that NestedLoopJoinExec cannot be split produces a plan like:
image
This keeps the nested loop join in a single task and resolves the problem.
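The idea can be sketched on a toy plan tree. This is an illustration only: `PlanNode`, `node`, and `task_count` are hypothetical stand-ins, not DataFusion's `ExecutionPlan` or this PR's code, and the sketch assumes the check walks the whole stage and returns a plain `bool` rather than the `Result<bool>` of the real `can_be_divided`.

```rust
/// Toy stand-in for a physical plan node (NOT DataFusion's ExecutionPlan).
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
}

/// Hypothetical constructor to keep the examples short.
fn node(name: &'static str, children: Vec<PlanNode>) -> PlanNode {
    PlanNode { name, children }
}

/// Returns false if this node, or any node below it, must see all
/// partitions. Assumption: only NestedLoopJoinExec is treated this way.
fn can_be_divided(plan: &PlanNode) -> bool {
    if plan.name == "NestedLoopJoinExec" {
        return false;
    }
    plan.children.iter().all(can_be_divided)
}

/// Number of tasks for a stage: one if it cannot be divided, otherwise
/// ceil(partitions / partitions_per_task).
fn task_count(plan: &PlanNode, partitions: usize, partitions_per_task: usize) -> usize {
    assert!(partitions_per_task > 0, "partitions_per_task must be non-zero");
    if can_be_divided(plan) {
        (partitions + partitions_per_task - 1) / partitions_per_task
    } else {
        1
    }
}
```

Under these assumptions, a stage made of `RepartitionExec` over `DataSourceExec` with three partitions and two partitions per task is split into two tasks, while any stage containing a `NestedLoopJoinExec` stays in one.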

Collaborator

@gabotechs gabotechs left a comment

💯

///
/// The plans we cannot split are:
/// - NestedLoopJoinExec
pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
Collaborator

Usually, if a function is only used in one place, it can be better in the long run to just place it where it's used, as it's not really a common function.

Otherwise, following the same rule, we can end up with massive "utils" or "common" modules full of unrelated stuff that is not really commonly used across the project.

This one for example could just be placed in physical_optimizer.rs

Collaborator Author

Ha! This is why I always end up with massive utils and common modules in my projects! Good feedback. I'll move it to physical_optimizer.rs. 😅

Comment on lines -122 to -123
// TODO: Add support for NestedLoopJoinExec to support query 22.
#[ignore]
Collaborator

🎉

@robtandy robtandy merged commit 113cc1b into main Aug 21, 2025
2 of 3 checks passed
@robtandy robtandy deleted the robtandy/nested_loop_joins branch August 21, 2025 13:51
@robtandy robtandy mentioned this pull request Aug 21, 2025
3 participants