Executive Summary
- Create a distributed query planner which can change stages (e.g. their partitioning), add or remove stages, or cancel stages to reflect collected runtime statistics.
- An extensible distributed query planner: adding a new capability should be as simple as adding a new physical optimizer rule.
Overview
Currently, jobs in Ballista are divided into stages at job submission time according to predefined rules.
Ballista can optimize stages based on runtime information using a set of physical optimizers, for example changing the join order. This has been implemented here, with only a few physical optimizer rules used.
Also, a few more rules are scattered around here and here.
The current approach may fail to generate a valid plan when there are dependencies between physical optimizers. Furthermore, some optimizations cannot be propagated across stage boundaries, potentially impacting overall performance.
The physical optimizers used work well, but with a major downside: the rules are scattered across the codebase, which makes adding or removing a rule a non-trivial task. Scattered rules are hard to control, so a centralized rule "facility" is needed. It would make it easier to change the behavior of the physical planners, or to experiment with new solutions.
Another major problem arises when stages are split; it becomes impossible to change their layout, which can be necessary in certain situations. For instance, if an exchange produces an empty result, we’d need to run a subsequent stage even though the system knows it will also produce an empty result. This can lead to significant overhead on the scheduler, especially with many tasks. Having the ability to remove a stage would be helpful in this scenario.
Proposal
Implement an adaptive query planner which would address some of the issues with the current approach. At the core of the design are pluggable physical optimizer rules which re-optimize the plan after each stage completes.
This proposal is strongly influenced by the Adaptive and Robust Query Execution for Lakehouses at Scale paper.
The major difference is that Ballista works on the physical plan rather than the logical plan.
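A minimal sketch of that loop, using hypothetical stand-in types (PhysicalPlan, StageStats, AdaptiveRule) rather than the real DataFusion/Ballista APIs:

```rust
#[derive(Clone)]
struct PhysicalPlan; // stand-in for Arc<dyn ExecutionPlan>

struct StageStats {
    output_rows: usize, // runtime statistics reported by a finished stage
}

trait AdaptiveRule {
    /// Rewrite the remaining plan using statistics from the finished stage.
    fn optimize(&self, plan: PhysicalPlan, stats: &StageStats) -> PhysicalPlan;
}

/// Run stages one at a time, re-applying the whole rule set after each
/// completed stage so later stages see up-to-date runtime statistics.
fn run_adaptively(
    mut plan: PhysicalPlan,
    rules: &[Box<dyn AdaptiveRule>],
    mut run_next_stage: impl FnMut(&PhysicalPlan) -> Option<StageStats>,
) -> PhysicalPlan {
    while let Some(stats) = run_next_stage(&plan) {
        plan = rules.iter().fold(plan, |p, r| r.optimize(p, &stats));
    }
    plan
}
```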
Impact on DataFusion
Currently, in DataFusion, physical optimizer rules run once per plan execution. With this proposal, the whole set of rules will run after each stage, so if a physical optimizer rule is not idempotent it may keep adding physical plan exec nodes even when they are already present and unnecessary.
Physical optimizer rules also make only a single pass over the plan, which may not work well with this approach (see "Adding New Stage to Running Plan" below, where a new stage has to be added after join reordering).
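To illustrate the idempotency concern, a rule that is re-run after every stage needs a guard along these lines (a sketch using DataFusion's CoalescePartitionsExec as an arbitrary example; the actual rules involved may differ):

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;

/// Only insert the node if it is not already present; without this check,
/// re-running the rule after every stage keeps stacking duplicate nodes.
fn ensure_single_partition(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    if plan.as_any().downcast_ref::<CoalescePartitionsExec>().is_some() {
        plan
    } else {
        Arc::new(CoalescePartitionsExec::new(plan))
    }
}
```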
Impact on Ballista
Stage planning in Ballista revolves around ExecutionGraph, which prescribes a translation from a physical plan to a set of stages executing that plan. ExecutionGraph is implemented as a concrete struct, hence it is not a trivial task to replace. ExecutionGraph does accept a dyn DistributedPlanner, but unfortunately DistributedPlanner returns a static list of stages, which would not work with adaptive query planning.
ExecutionGraph is initialized in TaskManager when a new job is submitted.
There are two approaches which we can take with ExecutionGraph:
- Keep ExecutionGraph as it is, adding the code required for AQE. This approach might be possible, but it would be hard to maintain and probably messy to implement.
- Rename ExecutionGraph to StaticExecutionGraph and introduce a dyn ExecutionGraph trait which would be implemented by StaticExecutionGraph and a new AdaptiveExecutionGraph. Change the implementation used in TaskManager based on a predefined configuration parameter. This approach looks like the easier route to take. It would enable us to have two implementations running in parallel until the AQE implementation matures.
The adaptive planner will need some time to get right. It will coexist with the current distributed planner for the foreseeable future and will be disabled by default, with a configuration option to turn it on.
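A rough sketch of the second option, with hypothetical method names (the real trait would also need to cover task scheduling, failure handling, etc.):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Common interface implemented by both graph variants.
pub trait ExecutionGraph: Send + Sync {
    /// Stage ids that can be scheduled right now.
    fn runnable_stages(&self) -> Vec<usize>;
    /// Physical plan to execute for a given stage.
    fn stage_plan(&self, stage_id: usize) -> Result<Arc<dyn ExecutionPlan>>;
    /// Called when a stage finishes; the adaptive implementation re-plans the
    /// remaining stages here, while the static implementation is a no-op.
    fn on_stage_completed(&mut self, stage_id: usize) -> Result<()>;
}

/// Current behavior: stages are fixed at job submission time.
pub struct StaticExecutionGraph;

/// Proposed behavior: remaining stages are re-optimized after every stage.
pub struct AdaptiveExecutionGraph;
```

TaskManager would then construct one of the two implementations based on the configuration option mentioned above.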
Plan Arithmetics
Adding New Stage to Running Plan
Some optimizations may create a need for an exchange to be inserted into a running plan (see bug 1 and bug 2). For example, a plan before join reordering may look similar to:
┌───────────────────────────┐
│ AdaptiveDatafusionExec │
│ -------------------- │
│ is_final: false │
│ plan_id: 1 │
│ │
│ stage_id │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ CrossJoinExec ├──────────────┐
└─────────────┬─────────────┘ │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│ CoalescePartitionsExec ││ CooperativeExec │
│ ││ -------------------- │
│ ││ CooperativeExec │
└─────────────┬─────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│ ExchangeExec ││ File Scan │
│ -------------------- ││ -------------------- │
│ partitioning: ││ num_partitions: 2 │
│ Hash([big_col@0], 2) ││ │
│ ││ statistics: │
│ plan_id: 0 ││ [Rows=Inexact(1024), Bytes│
│ stage_id: None ││ =Inexact(8192), [(Col[0]: │
│ stage_resolved: false ││ )]] │
└─────────────┬─────────────┘└───────────────────────────┘
┌─────────────┴─────────────┐
│ CooperativeExec │
│ -------------------- │
│ CooperativeExec │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ File Scan │
└───────────────────────────┘
The plan after join reordering and the new stage addition (plan_id: 3) looks similar to:
┌───────────────────────────┐
│ AdaptiveDatafusionExec │
│ -------------------- │
│ is_final: true │
│ plan_id: 1 │
│ stage_id: 2 │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ ProjectionExec │
│ -------------------- │
│ big_col: big_col │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ CrossJoinExec ├──────────────┐
└─────────────┬─────────────┘ │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│ CoalescePartitionsExec ││ ExchangeExec │
│ ││ -------------------- │
│ ││ partitioning: │
│ ││ Hash([big_col@0], 2) │
│ ││ │
│ ││ plan_id: 0 │
│ ││ stage_id: 0 │
│ ││ stage_resolved: true │
└─────────────┬─────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│ ExchangeExec ││ CooperativeExec │
│ -------------------- ││ -------------------- │
│ partitioning: ││ CooperativeExec │
│ None ││ │
│ ││ │
│ plan_id: 3 ││ │
│ stage_id: 1 ││ │
│ stage_resolved: true ││ │
└─────────────┬─────────────┘└─────────────┬─────────────┘
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│ CooperativeExec ││ File Scan │
│ -------------------- ││ │
│ CooperativeExec ││ │
└─────────────┬─────────────┘└───────────────────────────┘
┌─────────────┴─────────────┐
│ File Scan │
│ -------------------- │
│ num_partitions: 2 │
│ │
│ statistics: │
│ [Rows=Inexact(1024), Bytes│
│ =Inexact(8192), [(Col[0]: │
│ )]] │
└───────────────────────────┘
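An illustrative sketch of the splice itself, using hypothetical node types rather than real exec nodes: the optimizer wraps a subtree in a new ExchangeExec carrying a fresh plan_id, and the stage stays unresolved until the scheduler assigns it a stage_id:

```rust
#[derive(Clone)]
enum Node {
    Exchange {
        plan_id: u64,
        stage_id: Option<u64>, // None => stage_resolved: false
        input: Box<Node>,
    },
    Other {
        children: Vec<Node>,
    },
}

/// Wrap a subtree in a new exchange with a fresh plan_id; the scheduler
/// later resolves the stage and fills in stage_id.
fn add_exchange(subtree: Node, next_plan_id: &mut u64) -> Node {
    let plan_id = *next_plan_id;
    *next_plan_id += 1;
    Node::Exchange {
        plan_id,
        stage_id: None,
        input: Box::new(subtree),
    }
}
```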
Removing Stages from a Running Plan
Apart from adding stages, stages should be removed if they will produce an empty result.
This reduces pressure on the scheduler, as task scheduling for those stages can be skipped.
Take for example the plan:
[stage 2] --------------------------------------------------------------------------------------------------
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
ProjectionExec: expr=[c0@0 as c0, count(Int64(1))@1 as count(*)]
AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[count(Int64(1))]
CoalesceBatchesExec: target_batch_size=8192
[stage 1] -------------------------------------------------------------------------------------------
ExchangeExec: partitioning=Hash([c0@0], 2), plan_id=1, stage_id=pending, stage_resolved=false
AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[count(Int64(1))]
ProjectionExec: expr=[min(t.a)@1 as c0]
AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
CoalesceBatchesExec: target_batch_size=8192
[stage 0] ---------------------------------------------------------------------------------
ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: a@0 = 42
DataSourceExec: partitions=1, partition_sizes=[1]
Running the first stage produces no results:
ShuffleWriterExec: partitioning:Hash([c@0], 2)
AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: a@0 = 42
DataSourceExec: partitions=1, partition_sizes=[1]
The statistics information will be used by the adaptive planner to skip all other stages and just return an empty result:
┌───────────────────────────┐
│ ShuffleWriterExec │
│ -------------------- │
│ partitioning: None │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ EmptyExec │
└───────────────────────────┘
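A sketch of that empty-result propagation, assuming the adaptive planner has an exact row count from the finished stage and the single-argument EmptyExec::new of recent DataFusion releases; it only applies when, as in the plan above, the downstream operators cannot produce rows from empty input:

```rust
use std::sync::Arc;

use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

/// If the finished upstream stage reported exactly zero output rows, the
/// dependent subtree is replaced with an EmptyExec instead of being scheduled.
fn replace_if_empty(
    plan: Arc<dyn ExecutionPlan>,
    upstream_output_rows: u64, // exact row count from the finished stage
) -> Arc<dyn ExecutionPlan> {
    if upstream_output_rows == 0 {
        Arc::new(EmptyExec::new(plan.schema()))
    } else {
        plan
    }
}
```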
Coalesce Shuffle Partitions (Elastic Shuffle Parallelism)
Should shuffle partitions produce very little data, the planner should coalesce them, reducing the number of tasks needed to produce results. The rule modifies the partitioning specification in the ShuffleRead operator.
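A sketch of how the coalesced partition count could be derived from the shuffle statistics (the target size per task is a hypothetical configuration parameter):

```rust
/// Compute how many read tasks are actually needed, given the bytes written
/// to each shuffle output partition and a desired amount of data per task.
fn coalesced_partition_count(partition_bytes: &[u64], target_bytes_per_task: u64) -> usize {
    let total: u64 = partition_bytes.iter().sum();
    // Round up to the nearest whole partition.
    let ideal = (total + target_bytes_per_task - 1) / target_bytes_per_task;
    // Never go below one partition or above the original partition count.
    (ideal as usize).clamp(1, partition_bytes.len().max(1))
}
```

The adaptive planner would then rewrite the ShuffleRead partitioning so that each task reads several of the original shuffle partitions.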
Canceling Redundant (Running) Stages
Adding or removing stages may create a need to cancel already-running stages. This can happen, for example, if some stage produces an empty result making other stages unnecessary, or if there is a semantically equal plan that is considered superior (this case needs further investigation).
Take for example the plan:
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)], projection=[a@1, b@2, c@3]
CoalesceBatchesExec: target_batch_size=8192
ExchangeExec: partitioning=Hash([a@0], 2), plan_id=0, stage_id=0, stage_resolved=false
CoalesceBatchesExec: target_batch_size=8192
FilterExec: b@1 = 42, projection=[a@0]
DataSourceExec: partitions=1, partition_sizes=[1]
CoalesceBatchesExec: target_batch_size=8192
ExchangeExec: partitioning=Hash([a@0], 2), plan_id=1, stage_id=1, stage_resolved=false
DataSourceExec: partitions=1, partition_sizes=[1]
We have two runnable stages: the build side of the join, ExchangeExec: partitioning=Hash([a@0], 2), plan_id=0, stage_id=0, stage_resolved=false, and the probe side, ExchangeExec: partitioning=Hash([a@0], 2), plan_id=1, stage_id=1, stage_resolved=false.
Let's say the probe side needs some time to finish.
If the build side, stage 0, finishes first with its shuffle producing no data, the hash join will produce no data as well, so the whole hash join could be replaced with an EmptyExec.
Something like:
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
EmptyExec
The plan cannot finish until both stages finish, increasing overall result latency. Cancelling redundant stages would make sense, as it reduces cluster utilization (and result latency).
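A sketch of the cancellation step itself, with the scheduler interaction left out: after re-planning, any stage that is still running but no longer referenced by the new plan is a candidate for cancellation:

```rust
use std::collections::HashSet;

/// Stages that are currently running but not needed by the re-optimized plan.
fn stages_to_cancel(running: &HashSet<u64>, referenced_by_new_plan: &HashSet<u64>) -> Vec<u64> {
    running.difference(referenced_by_new_plan).copied().collect()
}
```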