Low number of stages in TPCH tests #112

@gabotechs

The following PR:

extended the TPCH tests to assert the generated plans. One thing that stands out is that the queries are only slightly distributed: each has at most 1 or 2 network boundaries.

For example, TPCH query 21 has only 2 stages and 1 network boundary, even though its plan contains five hash joins:

┌───── Stage 2   Task: partitions: 0,unassigned]
│partitions [out:1  <-- in:3  ] SortPreservingMergeExec: [numwait@1 DESC, s_name@0 ASC NULLS LAST]
│partitions [out:3  <-- in:3  ]   SortExec: expr=[numwait@1 DESC, s_name@0 ASC NULLS LAST], preserve_partitioning=[true]
│partitions [out:3  <-- in:3  ]     ProjectionExec: expr=[s_name@0 as s_name, count(Int64(1))@1 as numwait]
│partitions [out:3  <-- in:3  ]       AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name], aggr=[count(Int64(1))]
│partitions [out:3  <-- in:3  ]         CoalesceBatchesExec: target_batch_size=8192
│partitions [out:3            ]           ArrowFlightReadExec: Stage 1  
└──────────────────────────────────────────────────
  ┌───── Stage 1   Task: partitions: 0..2,unassigned]
  │partitions [out:3  <-- in:3  ] RepartitionExec: partitioning=Hash([s_name@0], 3), input_partitions=3
  │partitions [out:3  <-- in:3  ]   AggregateExec: mode=Partial, gby=[s_name@0 as s_name], aggr=[count(Int64(1))]
  │partitions [out:3  <-- in:3  ]     CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:1  ]       HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(l_orderkey@1, l_orderkey@0)], filter=l_suppkey@1 != l_suppkey@0, projection=[s_name@0]
  │partitions [out:1  <-- in:3  ]         CoalescePartitionsExec
  │partitions [out:3  <-- in:3  ]           CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:1  ]             HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(l_orderkey@1, l_orderkey@0)], filter=l_suppkey@1 != l_suppkey@0
  │partitions [out:1  <-- in:3  ]               CoalescePartitionsExec
  │partitions [out:3  <-- in:3  ]                 CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:1  ]                   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@1)], projection=[s_name@1, l_orderkey@3, l_suppkey@4]
  │partitions [out:1  <-- in:3  ]                     CoalescePartitionsExec
  │partitions [out:3  <-- in:3  ]                       CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:3  ]                         FilterExec: n_name@1 = SAUDI ARABIA, projection=[n_nationkey@0]
  │partitions [out:3            ]                           DataSourceExec: file_groups={3 groups: [[/testdata/tpch/data/nation/1.parquet], [/testdata/tpch/data/nation/2.parquet], [/testdata/tpch/data/nation/3.parquet]]}, projection=[n_nationkey, n_name], file_type=parquet, predicate=n_name@1 = SAUDI ARABIA, pruning_predicate=n_name_null_count@2 != row_count@3 AND n_name_min@0 <= SAUDI ARABIA AND SAUDI ARABIA <= n_name_max@1, required_guarantees=[n_name in (SAUDI ARABIA)]
  │partitions [out:3  <-- in:3  ]                     CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:1  ]                       HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(o_orderkey@0, l_orderkey@2)], projection=[s_name@1, s_nationkey@2, l_orderkey@3, l_suppkey@4]
  │partitions [out:1  <-- in:3  ]                         CoalescePartitionsExec
  │partitions [out:3  <-- in:3  ]                           CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:3  ]                             FilterExec: o_orderstatus@1 = F, projection=[o_orderkey@0]
  │partitions [out:3            ]                               DataSourceExec: file_groups={3 groups: [[/testdata/tpch/data/orders/1.parquet], [/testdata/tpch/data/orders/2.parquet], [/testdata/tpch/data/orders/3.parquet]]}, projection=[o_orderkey, o_orderstatus], file_type=parquet, predicate=o_orderstatus@1 = F, pruning_predicate=o_orderstatus_null_count@2 != row_count@3 AND o_orderstatus_min@0 <= F AND F <= o_orderstatus_max@1, required_guarantees=[o_orderstatus in (F)]
  │partitions [out:3  <-- in:3  ]                         CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:1  ]                           HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_suppkey@0, l_suppkey@1)], projection=[s_name@1, s_nationkey@2, l_orderkey@3, l_suppkey@4]
  │partitions [out:1  <-- in:3  ]                             CoalescePartitionsExec
  │partitions [out:3            ]                               DataSourceExec: file_groups={3 groups: [[/testdata/tpch/data/supplier/1.parquet], [/testdata/tpch/data/supplier/2.parquet], [/testdata/tpch/data/supplier/3.parquet]]}, projection=[s_suppkey, s_name, s_nationkey], file_type=parquet
  │partitions [out:3  <-- in:3  ]                             CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:3  ]                               FilterExec: l_receiptdate@3 > l_commitdate@2, projection=[l_orderkey@0, l_suppkey@1]
  │partitions [out:3            ]                                 DataSourceExec: file_groups={3 groups: [[/testdata/tpch/data/lineitem/1.parquet], [/testdata/tpch/data/lineitem/2.parquet], [/testdata/tpch/data/lineitem/3.parquet]]}, projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate], file_type=parquet, predicate=l_receiptdate@3 > l_commitdate@2
  │partitions [out:3            ]               DataSourceExec: file_groups={3 groups: [[/testdata/tpch/data/lineitem/1.parquet], [/testdata/tpch/data/lineitem/2.parquet], [/testdata/tpch/data/lineitem/3.parquet]]}, projection=[l_orderkey, l_suppkey], file_type=parquet
  │partitions [out:3  <-- in:3  ]         CoalesceBatchesExec: target_batch_size=8192
  │partitions [out:3  <-- in:3  ]           FilterExec: l_receiptdate@3 > l_commitdate@2, projection=[l_orderkey@0, l_suppkey@1]
  │partitions [out:3            ]             DataSourceExec: file_groups={3 groups: [[/testdata/tpch/data/lineitem/1.parquet], [/testdata/tpch/data/lineitem/2.parquet], [/testdata/tpch/data/lineitem/3.parquet]]}, projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate], file_type=parquet, predicate=l_receiptdate@3 > l_commitdate@2
  └──────────────────────────────────────────────────
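
The stage and boundary counts quoted above can be checked mechanically from the rendered plan text. Below is a minimal sketch, assuming the plan format shown in this issue: stage headers start with a box corner followed by `Stage N`, and each `ArrowFlightReadExec` node marks one network boundary. The helper name `summarize_plan` and the `SAMPLE_PLAN` excerpt are hypothetical and not part of the project.

```python
import re

# Hypothetical helper for illustration; not part of the project's API.
# Assumes the plan rendering shown in this issue:
#   - each stage starts with a box-corner header such as "┌───── Stage 2 ..."
#   - each ArrowFlightReadExec node reads from another stage over the network
STAGE_HEADER = re.compile(r"┌─+\s*Stage\s+(\d+)")


def summarize_plan(plan_text):
    """Count stages, network boundaries, and CollectLeft joins in a rendered plan."""
    stages = set()
    network_boundaries = 0
    collect_left_joins = 0
    for line in plan_text.splitlines():
        header = STAGE_HEADER.search(line)
        if header:
            # Stage header line: record the stage number and move on.
            stages.add(int(header.group(1)))
            continue
        if "ArrowFlightReadExec" in line:
            network_boundaries += 1
        if "HashJoinExec: mode=CollectLeft" in line:
            collect_left_joins += 1
    return {
        "stages": len(stages),
        "network_boundaries": network_boundaries,
        "collect_left_joins": collect_left_joins,
    }


# Abbreviated excerpt of the query 21 plan above (most operators omitted,
# join lines truncated).
SAMPLE_PLAN = """\
┌───── Stage 2   Task: partitions: 0,unassigned]
│partitions [out:3            ]           ArrowFlightReadExec: Stage 1
└──────────────────────────────────────────────────
  ┌───── Stage 1   Task: partitions: 0..2,unassigned]
  │partitions [out:3  <-- in:3  ] RepartitionExec: partitioning=Hash([s_name@0], 3), input_partitions=3
  │partitions [out:3  <-- in:1  ]       HashJoinExec: mode=CollectLeft, join_type=LeftAnti
  │partitions [out:3  <-- in:1  ]             HashJoinExec: mode=CollectLeft, join_type=LeftSemi
  └──────────────────────────────────────────────────
"""

summary = summarize_plan(SAMPLE_PLAN)
print(summary)
```

Run against the full plan above, the same helper should report 2 stages and 1 network boundary, with all five hash joins in `CollectLeft` mode inside Stage 1. A count like this could be tracked per TPCH query to see whether planner changes actually add boundaries.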

Maybe there's some opportunity in the planner to improve the distribution logic?

Taking inspiration from other distributed engines such as Ballista or Trino could help inform distributed planning decisions.
