From 55f28b2ab7db46254a9bf65bb65507b447e62c8b Mon Sep 17 00:00:00 2001
From: Gabriel Musat Mestre
Date: Wed, 24 Sep 2025 14:50:28 +0200
Subject: [PATCH 1/3] Update README.md

---
 README.md | 261 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 155 insertions(+), 106 deletions(-)

diff --git a/README.md b/README.md
index f97e48e..bb26392 100644
--- a/README.md
+++ b/README.md
@@ -28,139 +28,188 @@ Before diving into the architecture, it's important to clarify some terms and wh
 - `worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface.
 - `network boundary`: a node in the plan that streams data from a network interface rather than directly from its
-  children. Implemented as an `ArrowFlightReadExec` physical DataFusion node.
-- `stage`: a portion of the plan separated by a network boundary from other parts of the plan. Implemented as any
-  other physical node in DataFusion.
-- `task`: a unit of work inside a stage that executes a subset of its partitions in a specific worker.
+  children. Implemented as DataFusion `ExecutionPlan`s: `NetworkShuffle` and `NetworkCoalesce`.
+- `stage`: a portion of the plan separated by a network boundary from other parts of the plan.
+  Implemented as a DataFusion `ExecutionPlan`: `StageExec`.
+- `task`: a unit of work in a stage that executes the inner plan in parallel to other tasks.
 - `subplan`: a slice of the overall plan

 A distributed DataFusion query is executed in a very similar fashion as a normal DataFusion query with one key
 difference:

-The physical plan is divided into stages, each stage is assigned tasks that run in parallel in different workers. All of
-this is done at the physical plan level, and is implemented as a `PhysicalOptimizerRule` that:
+The physical plan is divided into stages and each stage is assigned tasks that run the inner plan in parallel in
+different workers. All of this is done at the physical plan level, implemented as a `PhysicalOptimizerRule` that:

-1. Inspects the non-distributed plan, placing network boundaries (`ArrowFlightReadExec` nodes) in the appropriate places
+1. Inspects the non-distributed plan, placing network boundaries (`NetworkShuflle` and `NetworkCoalesce` nodes) in
+   the appropriate places.
 2. Based on the placed network boundaries, divides the plan into stages and assigns tasks to them.

+### Types of network boundaries
+
+There are different flavors of network boundaries that can stream data over the wire across stages:
+
+- `NetworkShuffle`: the equivalent of a `RepartitionExec` that spreads out different partitions to different tasks
+  across stages. Refer to the [network_shuffle.rs](./src/execution_plans/network_shuffle.rs) docs for a more detailed
+  explanation.
+- `NetworkCoalesce`: the equivalent of a `CoalescePartitionsExec` but with tasks. It collapses P partitions across
+  N tasks into a single task with N*P partitions.
Refer to + the [network_coalesce.rs](./src/execution_plans/network_coalesce.rs) + docs for a more detailed explanation + +### Example plan distribution + For example, imagine we have a plan that looks like this: ``` - ┌──────────────────────┐ - │ ProjectionExec │ - └──────────────────────┘ - ▲ - ┌──────────┴───────────┐ - │ AggregateExec │ - │ (final) │ - └──────────────────────┘ - ▲ - ┌──────────┴───────────┐ - │ RepartionExec │ - │ (3 input partitions) │ - └──────────────────────┘ - ▲ - ┌──────────┴───────────┐ - │ AggregateExec │ - │ (partial) │ - └──────────────────────┘ - ▲ - ┌──────────┴───────────┐ - │ DataSourceExec │ - └──────────────────────┘ +┌───────────────────────┐ +│CoalescePartitionsExec │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ ProjectionExec │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ AggregateExec │ +│ (final) │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ RepartitionExec │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ AggregateExec │ +│ (partial) │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ DataSourceExec │ +└───────────────────────┘ ``` -We want to distribute the aggregation to something like this: +By looking at the existing nodes, we can decide to place some network boundaries like this: ``` - ┌──────────────────────┐ - │ ProjectionExec │ - └──────────────────────┘ - ▲ - ┌──────────┴───────────┐ - │ AggregateExec │ - │ (final) │ - └──────────────────────┘ - ▲ ▲ ▲ - ┌────────────────────────┘ │ └─────────────────────────┐ - ┌──────────┴───────────┐ ┌──────────┴───────────┐ ┌───────────┴──────────┐ - │ AggregateExec │ │ AggregateExec │ │ AggregateExec │ - │ (partial) │ │ (partial) │ │ (partial) │ - └──────────────────────┘ └──────────────────────┘ └──────────────────────┘ - ▲ ▲ ▲ - ┌──────────┴───────────┐ ┌──────────┴───────────┐ ┌───────────┴──────────┐ - │ DataSourceExec │ │ DataSourceExec │ │ DataSourceExec │ - └──────────────────────┘ └──────────────────────┘ └──────────────────────┘ +┌───────────────────────┐ +│CoalescePartitionsExec │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ NetworkCoalesceExec │ <- injected during distributed planning. This will collapse all tasks into one, +└───────────────────────┘ wihtout performing any changes in the overall partitioning scheme. + ▲ + │ +┌───────────────────────┐ +│ ProjectionExec │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ AggregateExec │ +│ (final) │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ NetworkShuffleExec │ <- injected during distributed planning. This will shuffle the data across the network, +└───────────────────────┘ fanning out the different partitions to different workers. + ▲ + │ +┌───────────────────────┐ +│ RepartitionExec │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ AggregateExec │ +│ (partial) │ +└───────────────────────┘ + ▲ + │ +┌───────────────────────┐ +│ ParititonIsolatorExec │ <- injected during distributed planning. As this lower part of the plan will run in multiple +└───────────────────────┘ tasks in parallel, this node makes sure no two same partitions from the underlying + ▲ DataSourceExec are read in two different tasks. 
+ │ +┌───────────────────────┐ +│ DataSourceExec │ +└───────────────────────┘ ``` -The first step is to place the `ArrowFlightReadExec` network boundary in the appropriate place (the following drawing -shows the partitioning scheme in each node): +Once the network boundaries are properly placed, the distributed planner will break down the plan into stages, +and will assign tasks to them: ``` - ┌──────────────────────┐ - │ ProjectionExec │ - └─────────[0]──────────┘ - ▲ - ┌──────────┴───────────┐ - │ AggregateExec │ - │ (final) │ - └─────────[0]──────────┘ - ▲ - ┌──────────┴───────────┐ - │ ArrowFlightRead │ <- this node was injected to tell the distributed planner - │ (3 input tasks) │ that there must be a network boundary here. - └──────[0][1][2]───────┘ - ▲ ▲ ▲ - ┌───────┴──┴──┴────────┐ - │ AggregateExec │ - │ (partial) │ - └──────[0][1][2]───────┘ - ▲ ▲ ▲ - ┌───────┴──┴──┴────────┐ - │ DataSourceExec │ - └──────[0][1][2]───────┘ -``` - -Based on that boundary, the plan is divided into stages, and tasks are assigned to each stage. Each task will be -responsible for the different partitions in the original plan. - -``` - ┌────── (stage 2) ───────┐ - │┌──────────────────────┐│ - ││ ProjectionExec ││ - │└──────────┬───────────┘│ - │┌──────────┴───────────┐│ - ││ AggregateExec ││ - ││ (final) ││ - │└──────────┬───────────┘│ - │┌──────────┴───────────┐│ - ││ ArrowFlightReadExec ││ - │└──────[0][1][2]───────┘│ - └─────────▲─▲─▲──────────┘ - ┌────────────────────────┘ │ └─────────────────────────┐ - │ │ │ - ┌─── task 0 (stage 1) ───┐ ┌── task 1 (stage 1) ────┐ ┌── task 2 (stage 1) ────┐ - │ │ │ │ │ │ │ │ │ - │┌─────────[0]──────────┐│ │┌─────────[1]──────────┐│ │┌──────────[2]─────────┐│ - ││ AggregateExec ││ ││ AggregateExec ││ ││ AggregateExec ││ - ││ (partial) ││ ││ (partial) ││ ││ (partial) ││ - │└──────────┬───────────┘│ │└──────────┬───────────┘│ │└───────────┬──────────┘│ - │┌─────────[0]──────────┐│ │┌─────────[1]──────────┐│ │┌──────────[2]─────────┐│ - ││ DataSourceExec ││ ││ DataSourceExec ││ ││ DataSourceExec ││ - │└──────────────────────┘│ │└──────────────────────┘│ │└──────────────────────┘│ - └────────────────────────┘ └────────────────────────┘ └────────────────────────┘ +┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Stage 3 + ┌───────────────────────┐ +│ │CoalescePartitionsExec │ │ + └───────────────────────┘ +│ ▲ │ + │ +│ ┌───────────────────────┐ │ + │ NetworkCoalesceExec │ +│ └───────────────────────┘ │ + ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▲ ─ ▲ ─ ▲ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + ┌────────────────┘ │ └────────────────┐ +┌ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ Stage 2 + ┌───────────────────┐┌───────────────────┐┌───────────────────┐ +│ │ ProjectionExec ││ ProjectionExec ││ ProjectionExec │ │ + └───────────────────┘└───────────────────┘└───────────────────┘ +│ ▲ ▲ ▲ │ + │ │ │ +│ ┌───────────────────┐┌───────────────────┐┌───────────────────┐ │ + │ AggregateExec ││ AggregateExec ││ AggregateExec │ +│ │ (final) ││ (final) ││ (final) │ │ + └───────────────────┘└───────────────────┘└───────────────────┘ +│ ▲ ▲ ▲ │ + │ │ │ +│ ┌───────────────────┐┌───────────────────┐┌───────────────────┐ │ + │NetworkShuffleExec ││NetworkShuffleExec ││NetworkShuffleExec │ +│ └───────────────────┘└───────────────────┘└───────────────────┘ │ + ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▲ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ + └────────┬───────────┴──────────┬─────────┘ +┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ Stage 1 + ┌─────────────────────┐┌─────────────────────┐ +│ │ 
RepartitionExec ││ RepartitionExec │ │ + └─────────────────────┘└─────────────────────┘ +│ ▲ ▲ │ + │ │ +│ ┌─────────────────────┐┌─────────────────────┐ │ + │ AggregateExec ││ AggregateExec │ +│ │ (partial) ││ (partial) │ │ + └─────────────────────┘└─────────────────────┘ +│ ▲ ▲ │ + │ │ +│ ┌─────────────────────┐┌─────────────────────┐ │ + │PartitionIsolatorExec││PartitionIsolatorExec│ +│ └─────────────────────┘└─────────────────────┘ │ + ▲ ▲ +│ │ │ │ + ┌─────────────────────┐┌─────────────────────┐ +│ │ DataSourceExec ││ DataSourceExec │ │ + └─────────────────────┘└─────────────────────┘ +└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ``` The plan is immediately executable, and the same process that planned the distributed query can start executing the head -stage (stage 2). The `ArrowFlightReadExec` in that stage will know from which tasks to gather data from stage 1, and -will issue 3 concurrent Arrow Flight requests to the appropriate physical nodes. +stage (stage 3). The `NetworkCoalesceExec` in that stage will know from which tasks to gather data from stage 2, and +will issue 3 concurrent Arrow Flight requests to the appropriate physical nodes. Same goes from stage 2 to stage 1, +but with the difference that this time data is repartitioned and shuffled, that way each task in stage 2 handles a +different set of partitions. This means that: 1. The head stage is executed normally as if the query was not distributed. -2. Upon calling `.execute()` on `ArrowFlightReadExec`, instead of propagating the `.execute()` call on its child, +2. Upon calling `.execute()` on `NetworkCoalesceExec`, instead of propagating the `.execute()` call on its child, the subplan is serialized and sent over the wire to be executed on another worker. -3. The next node, which is hosting an Arrow Flight Endpoint listening for gRPC requests over an HTTP server, will pick +3. The next worker, which is hosting an Arrow Flight Endpoint listening for gRPC requests over an HTTP server, will pick up the request containing the serialized chunk of the overall plan and execute it. 4. This is repeated for each stage, and data will start flowing from bottom to top until it reaches the head stage. From c8016f01662c203551014b9d58a384836e21ddfd Mon Sep 17 00:00:00 2001 From: Gabriel <45515538+gabotechs@users.noreply.github.com> Date: Wed, 24 Sep 2025 15:51:40 +0200 Subject: [PATCH 2/3] Update README.md Co-authored-by: Jayant Shrivastava --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index bb26392..67aa801 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ difference: The physical plan is divided into stages and each stage is assigned tasks that run the inner plan in parallel in different workers. All of this is done at the physical plan level, implemented as a `PhysicalOptimizerRule` that: -1. Inspects the non-distributed plan, placing network boundaries (`NetworkShuflle` and `NetworkCoalesce` nodes) in +1. Inspects the non-distributed plan, placing network boundaries (`NetworkShuffle` and `NetworkCoalesce` nodes) in the appropriate places. 2. Based on the placed network boundaries, divides the plan into stages and assigns tasks to them. 
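The rule described in the patches above inspects the non-distributed plan and injects network boundaries before splitting it into stages. As a rough illustration of the DataFusion hook this plugs into (not this repository's actual rule), here is a minimal sketch that merely reports where a shuffle-style boundary could go — above every `RepartitionExec` — instead of wrapping nodes in `NetworkShuffleExec`. The name `FindShuffleBoundaries` is made up, and the snippet assumes a recent DataFusion version where `TreeNode::transform_up` uses the struct-style `Transformed` API.

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};

/// Hypothetical rule: it rewrites nothing, it only prints the spots where a
/// network shuffle boundary could be injected by a real distributed planner
/// (i.e. on top of every `RepartitionExec`).
#[derive(Debug, Default)]
struct FindShuffleBoundaries;

impl PhysicalOptimizerRule for FindShuffleBoundaries {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Walk the plan bottom-up. A real rule would wrap the matching node in
        // a network boundary `ExecutionPlan` here; this sketch only logs it.
        plan.transform_up(|node| {
            if node.as_any().downcast_ref::<RepartitionExec>().is_some() {
                println!(
                    "candidate shuffle boundary above: {}",
                    displayable(node.as_ref()).one_line()
                );
            }
            Ok(Transformed::no(node))
        })
        .map(|t| t.data)
    }

    fn name(&self) -> &str {
        "find_shuffle_boundaries"
    }

    fn schema_check(&self) -> bool {
        // Nothing is rewritten, so the plan schema cannot change.
        true
    }
}
```

The real planner additionally has to split the rewritten plan into `StageExec`s and assign tasks to each one, which this sketch does not attempt.
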
From 664f520fe76f8ded4ba017c090c4032127e9371e Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Wed, 24 Sep 2025 15:52:30 +0200 Subject: [PATCH 3/3] Fix lines in diagrams --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 67aa801..bcfabe8 100644 --- a/README.md +++ b/README.md @@ -158,7 +158,7 @@ and will assign tasks to them: │ └───────────────────────┘ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▲ ─ ▲ ─ ▲ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌────────────────┘ │ └────────────────┐ -┌ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ Stage 2 +┌ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ Stage 2 ┌───────────────────┐┌───────────────────┐┌───────────────────┐ │ │ ProjectionExec ││ ProjectionExec ││ ProjectionExec │ │ └───────────────────┘└───────────────────┘└───────────────────┘ @@ -175,7 +175,7 @@ and will assign tasks to them: │ └───────────────────┘└───────────────────┘└───────────────────┘ │ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▲ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ └────────┬───────────┴──────────┬─────────┘ -┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ Stage 1 +┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ Stage 1 ┌─────────────────────┐┌─────────────────────┐ │ │ RepartitionExec ││ RepartitionExec │ │ └─────────────────────┘└─────────────────────┘
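
As a closing note on the `NetworkCoalesce` boundary introduced in the first patch: it collapses P partitions across N tasks into a single task that exposes N*P partitions. The standalone sketch below only illustrates that arithmetic, assuming a task-major partition layout — the actual ordering used by `NetworkCoalesceExec` may differ.

```rust
/// Maps a partition index of the coalesced (parent) task back to the
/// (task, partition) pair it would stream from in the child stage, assuming
/// every child task exposes `partitions_per_task` partitions and the parent
/// lays them out task-major: [t0p0, t0p1, ..., t1p0, t1p1, ...].
fn coalesced_partition_source(
    parent_partition: usize,
    partitions_per_task: usize,
) -> (usize, usize) {
    (
        parent_partition / partitions_per_task, // which child task to stream from
        parent_partition % partitions_per_task, // which of its partitions to read
    )
}

fn main() {
    // 3 child tasks with 8 partitions each -> one parent task with 24 partitions.
    let (tasks, partitions_per_task) = (3, 8);
    for p in 0..tasks * partitions_per_task {
        let (task, part) = coalesced_partition_source(p, partitions_per_task);
        println!("parent partition {p:>2} <- child task {task}, partition {part}");
    }
}
```

`NetworkShuffleExec`, by contrast, redistributes partitions across tasks rather than collapsing tasks into one, which is why it sits on top of a `RepartitionExec` in the example plan.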