@gabotechs gabotechs commented Aug 6, 2025

Splits distributed planning into two steps:

  1. A first step that places ArrowFlightReadExec nodes in "pending" mode throughout the plan. These serve as placeholders: each one just marks a logical network boundary in the appropriate place so that the second step can build stages out of it.
  2. A second step that inspects the plan looking for ArrowFlightReadExec nodes in "pending" mode, switches them to "ready" mode, and wraps the plan in stages.
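The first step can be sketched with a toy plan model (plain Rust; the `Node` type and `place_boundaries` function here are illustrative stand-ins, not the crate's actual `ExecutionPlan` API):

```rust
// Toy stand-in for a physical plan: a tree of named execution nodes.
// This is NOT the real datafusion-distributed type, just a sketch.
#[derive(Clone, Debug, PartialEq)]
struct Node {
    name: String,
    children: Vec<Node>,
}

fn node(name: &str, children: Vec<Node>) -> Node {
    Node { name: name.to_string(), children }
}

// Step 1 (sketch): walk the plan bottom-up and insert a "pending"
// ArrowFlightReadExec placeholder above every RepartitionExec, marking a
// logical network boundary without building any stages yet.
fn place_boundaries(n: Node) -> Node {
    let children = n.children.into_iter().map(place_boundaries).collect();
    let rewritten = Node { name: n.name.clone(), children };
    if rewritten.name.starts_with("RepartitionExec") {
        node("ArrowFlightReadExec [pending]", vec![rewritten])
    } else {
        rewritten
    }
}
```

In this model, applying `place_boundaries` to `AggregateExec -> RepartitionExec -> DataSourceExec` yields `AggregateExec -> ArrowFlightReadExec [pending] -> RepartitionExec -> DataSourceExec`, mirroring the `this ->` insertions in the worked example.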

For example:

Given the following query:

SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)

and the following plan:

        ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
          SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
            SortExec: expr=[count(Int64(1))@2 ASC NULLS LAST], preserve_partitioning=[true]
              ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
                AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
                  CoalesceBatchesExec: target_batch_size=8192
                    RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=4
                      RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
                        AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
                          DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
  1. Places the appropriate ArrowFlightReadExec nodes
        ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
          SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
            SortExec: expr=[count(Int64(1))@2 ASC NULLS LAST], preserve_partitioning=[true]
              ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
                AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
                  CoalesceBatchesExec: target_batch_size=8192
this ->             ArrowFlightReadExec
                      RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=2
this ->                 PartitionIsolatorExec [providing upto 2 partitions]
this ->                   ArrowFlightReadExec
                            RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
this ->                       PartitionIsolatorExec [providing upto 2 partitions]
                                AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
                                  DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
  2. Distributes the plan into stages
        ┌───── Stage 3   Task: partitions: 0,unassigned]
        │partitions [out:1  <-- in:1  ] ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
        │partitions [out:1  <-- in:4  ]   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
        │partitions [out:4  <-- in:4  ]     SortExec: expr=[count(Int64(1))@2 ASC NULLS LAST], preserve_partitioning=[true]
        │partitions [out:4  <-- in:4  ]       ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
        │partitions [out:4  <-- in:4  ]         AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
        │partitions [out:4  <-- in:4  ]           CoalesceBatchesExec: target_batch_size=8192
        │partitions [out:4            ]             ArrowFlightReadExec: Stage 2  
        │
        └──────────────────────────────────────────────────
          ┌───── Stage 2   Task: partitions: 0,1,unassigned],Task: partitions: 2,3,unassigned]
          │partitions [out:4  <-- in:2  ] RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=2
          │partitions [out:2  <-- in:4  ]   PartitionIsolatorExec [providing upto 2 partitions]
          │partitions [out:4            ]     ArrowFlightReadExec: Stage 1  
          │
          └──────────────────────────────────────────────────
            ┌───── Stage 1   Task: partitions: 0,1,unassigned],Task: partitions: 2,3,unassigned]
            │partitions [out:4  <-- in:2  ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
            │partitions [out:2  <-- in:1  ]   PartitionIsolatorExec [providing upto 2 partitions]
            │partitions [out:1  <-- in:1  ]     AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
            │partitions [out:1            ]       DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[RainToday], file_type=parquet
            │
            └──────────────────────────────────────────────────
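The `Task: partitions: …` annotations above follow from splitting a stage's partitions across its tasks, with `PartitionIsolatorExec` exposing at most a fixed number of partitions to each task. A sketch of that bookkeeping (the contiguous chunking scheme here is an assumption for illustration, not the crate's documented assignment algorithm):

```rust
// Assign `total` partitions to tasks, each task handling at most `per_task`
// of them, in contiguous chunks: 4 partitions with 2 per task gives the
// 0,1 / 2,3 split seen in Stages 1 and 2 above.
fn task_partitions(total: usize, per_task: usize) -> Vec<Vec<usize>> {
    (0..total)
        .collect::<Vec<usize>>()
        .chunks(per_task)
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

Stage 3 is the terminal stage, so it keeps a single task over partition 0, while Stages 1 and 2 each fan out across two tasks.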

The benefit is that users can now skip the first step entirely, place the ArrowFlightReadExec nodes wherever they want, and reuse just the second step to convert their plans into stages.
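A sketch of what that reuse could look like, again on a toy model (the `Plan` enum and `build_stages` function are hypothetical, not the crate's API): the user hand-places the network boundaries, and only the stage-building step runs.

```rust
// Toy plan tree: ordinary execution nodes plus hand-placed network
// boundaries (pending ArrowFlightReadExec nodes in the real crate).
#[derive(Debug)]
enum Plan {
    Exec(&'static str, Vec<Plan>),
    Boundary(Box<Plan>),
}

// Step 2 in isolation: cut the tree at every Boundary, collecting stages
// bottom-up so leaf stages get the lowest numbers, and rewrite each Boundary
// into a "ready" read that points at the stage it consumes.
fn build_stages(plan: Plan, stages: &mut Vec<String>) -> String {
    match plan {
        Plan::Exec(name, children) => {
            let rendered: Vec<String> = children
                .into_iter()
                .map(|c| build_stages(c, stages))
                .collect();
            if rendered.is_empty() {
                name.to_string()
            } else {
                format!("{} -> [{}]", name, rendered.join(", "))
            }
        }
        Plan::Boundary(child) => {
            let body = build_stages(*child, stages);
            stages.push(body);
            format!("ArrowFlightReadExec: Stage {}", stages.len())
        }
    }
}
```

Running `build_stages` on a plan with one hand-placed boundary produces a leaf stage plus a root stage that reads from it, analogous to the Stage 1/2/3 split shown earlier.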

gabotechs force-pushed the gabrielmusat/add-stage-planner-tests branch from 8cd19c2 to 8067b44 on August 6, 2025 10:29
gabotechs force-pushed the gabrielmusat/injectabe-arrow-flight-read-execs branch 2 times, most recently from b1c72c3 to c1f6d4c on August 6, 2025 10:37
gabotechs marked this pull request as ready for review on August 6, 2025 13:42
Comment on lines -444 to -457
┌───── Stage 2 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
│partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
│partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
└──────────────────────────────────────────────────
┌───── Stage 1 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
└──────────────────────────────────────────────────
gabotechs (Collaborator, Author) commented:
I think the depth here was wrong in the previous implementation; the new depth looks correct.

Base automatically changed from gabrielmusat/add-stage-planner-tests to main on August 6, 2025 16:41
gabotechs force-pushed the gabrielmusat/injectabe-arrow-flight-read-execs branch from c8d1e7f to 7b7c405 on August 6, 2025 16:46
gabotechs merged commit 4da5c5d into main on August 6, 2025 (3 checks passed)
gabotechs deleted the gabrielmusat/injectabe-arrow-flight-read-execs branch on August 6, 2025 18:11