
Conversation

@gabotechs
Collaborator

There are a couple of reasons why we want to manually enforce that the head node of the head stage of a distributed plan is a CoalescePartitionsExec:

  • The StageExec node calls try_assing_urls lazily upon calling .execute(). This means that .execute() must only be called once: we cannot afford to perform several random URL assignments while executing multiple partitions, as they would differ and produce an invalid plan.
  • Having a CoalescePartitionsExec at the top of the plan allows us to place a new NetworkCoalesceExec above it, executing the full plan in parallel (see the sketch below).
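A minimal sketch of what that enforcement could look like (the helper name is made up for illustration and is not the crate's actual API):

use std::sync::Arc;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Hypothetical helper: make sure the head of the head stage exposes a single
// partition, so `.execute()` (and the lazy URL assignment it triggers) runs once.
fn ensure_single_partition_head(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    if plan.output_partitioning().partition_count() > 1 {
        Arc::new(CoalescePartitionsExec::new(plan))
    } else {
        plan
    }
}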

    format!("Stage {:<3}{}", self.num, child_str)
}

pub fn try_assign(self, channel_resolver: &impl ChannelResolver) -> Result<Self> {
Collaborator Author

Remove as it's unused

Collaborator

@NGA-TRAN left a comment

This is actually a very nice fix and improvement. I just wonder what happened before this PR regarding how the stage assigned task URLs.

│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
└──────────────────────────────────────────────────
Collaborator

This turns out to be a very good optimization for large data sets where the final aggregation is done across many tasks/workers 👍 🎉

This isn’t directly related to the current PR, but worth noting as a future optimization:
If the input files are partitioned on the group-by columns RainToday, WindGustDir (i.e., the three files don’t overlap on those values), we can eliminate RoundRobin partitioning. This would allow stage 1 and stage 2 to merge into a single stage, executing both partial and final aggregation together. I’m currently exploring use cases for this and will share query plans soon to illustrate the idea more clearly.
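Purely as an illustration of that idea (a hypothetical plan shape, not output from this PR or any current rule), a merged stage could roughly keep both aggregation phases together once no repartitioning is needed:

│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
└──────────────────────────────────────────────────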

// once on 1 partition.
if depth == 0 && distributed.data.output_partitioning().partition_count() > 1 {
    distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data));
}
Collaborator

Question: What did we do before this PR? I am asking because I do not see any removed or modified code in this PR

Collaborator Author

Previously we did pretty much nothing, so we were potentially executing multiple partitions with conflicting URL assignments.
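As a rough illustration of that hazard (a sketch only; `head_stage` and `drive_all_partitions` are made-up names, not the actual code): when the head stage exposed several output partitions, each `.execute(partition, ...)` call could re-run the lazy, randomized URL assignment:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Illustrative only: drives every partition of the head stage separately.
fn drive_all_partitions(head_stage: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) -> Result<()> {
    for partition in 0..head_stage.output_partitioning().partition_count() {
        // Before this PR, each of these calls could trigger its own random URL
        // assignment, so partitions could target mutually inconsistent workers.
        let _stream = head_stage.execute(partition, Arc::clone(&ctx))?;
    }
    Ok(())
}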

Collaborator

@jayshrivastava left a comment

Looks good overall :) It reminds me of how SQL databases stream a single result stream to the client.

Some random thoughts

  • It will be interesting to see how this evolves if we decide to build a "distributed endpoint", as @fmonjalet mentioned, where we may stream different partitions to a distributed consumer/client like Kafka, etc.
  • I'm curious if we can break the distributed optimizer rule into smaller rules (although having more than one rule makes it harder to migrate to distributed DF from vanilla DF)

@NGA-TRAN
Collaborator

I'm curious if we can break the distributed optimizer rule into smaller rules (although having more than one rule makes it harder to migrate to distributed DF from vanilla DF)

Actually this is a good idea. I bet in the future we will be able to add more rules.

@gabotechs
Collaborator Author

It will be interesting to see how this evolves if we decide to make a "distributed endpoint" as @fmonjalet mentioned where we may stream different partitions to a distributed consumer / client like kafka etc.

👍 that makes sense. I'd say that's way too soon for the stage the project is in right now. Hopefully this feature can be driven by demand.

@gabotechs
Collaborator Author

I'm curious if we can break the distributed optimizer rule into smaller rules (although having more than one rule makes it harder to migrate to distributed DF from vanilla DF)

I'd say right now it's not too bad. The rule is pretty condensed and all the modifications it makes go hand in hand with each other.

As this grows, it's something we should definitely keep an eye on.

@gabotechs merged commit 3337ce8 into main on Sep 29, 2025
4 checks passed
@gabotechs deleted the gabrielmusat/add-batch-coalescing-at-the-head branch on September 29, 2025 at 14:59