Skip to content

Conversation

@avamingli
Copy link
Contributor

Previously in CBDB, we had to disable parallel execution for UNION ALL queries containing Motion nodes due to a subtle but critical correctness issue.

The problem occurred when Parallel Append workers marked subnodes as completed, causing other workers to skip them. While normally harmless, this became critical in MPP databases where Motion nodes are ubiquitous. This limitation forced us to disable parallel plans for most UNION ALL queries involving distributed tables, missing significant optimization opportunities.

As a result, we fell back to serial execution:

explain(costs off) select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Append
         ->  HashAggregate
               Group Key: t1.b
               ->  Redistribute Motion 3:3  (slice2; segments: 3)
                     Hash Key: t1.b
                     ->  Seq Scan on t1
         ->  HashAggregate
               Group Key: t2.b
               ->  Redistribute Motion 3:3  (slice3; segments: 3)
                     Hash Key: t2.b
                     ->  Seq Scan on t2
 Optimizer: Postgres query optimizer
(13 rows)

The commit makes plan parallel by first attempting a parallel-aware Append when it's safe to do so, but crucially, we now have a robust fallback path: when Motion hazards are detected, we switch to using a parallel-oblivious Append. This works because while Parallel Append might skip slices containing Motions, regular Append doesn't have this problem - it will reliably execute all subnodes regardless of whether they contain Motion nodes or not. Moreover, since CBDB's Motion nodes are designed to handle tuples individually, we don't need to worry about coordination between workers when processing these Motion nodes.

This approach unlocks powerful new optimization opportunities, as shown in this example where we can now execute the query with different levels of parallelism for each subplan (2 workers for t1 and 3 workers for t2):

explain(costs off) select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 9:1  (slice1; segments: 9)
   ->  Append
         ->  HashAggregate
               Group Key: t1.b
               ->  Redistribute Motion 6:9  (slice2; segments: 6)
                     Hash Key: t1.b
                     Hash Module: 3
                     ->  Parallel Seq Scan on t1
         ->  HashAggregate
               Group Key: t2.b
               ->  Redistribute Motion 9:9  (slice3; segments: 9)
                     Hash Key: t2.b
                     Hash Module: 3
                     ->  Parallel Seq Scan on t2
 Optimizer: Postgres query optimizer
(15 rows)

This change represents a significant improvement in CBDB's query optimizer, allowing UNION ALL queries to benefit from parallel execution even when they contain Motion nodes, while maintaining correctness and supporting flexible parallelism configurations across different parts of the query. The optimization is particularly valuable for complex queries like TPC-DS tests where UNION ALL operations are common.

Authored-by: Zhang Mingli [email protected]

Fixes #ISSUE_Number

What does this PR do?

Type of Change

  • Bug fix (non-breaking change)
  • New feature (non-breaking change)
  • Breaking change (fix or feature with breaking changes)
  • Documentation update

Breaking Changes

Test Plan

  • Unit tests added/updated
  • Integration tests added/updated
  • Passed make installcheck
  • Passed make -C src/test installcheck-cbdb-parallel

Impact

Performance:

User-facing changes:

Dependencies:

Checklist

Additional Context

CI Skip Instructions


@avamingli
Copy link
Contributor Author

Case failed, based on #1284 to be merged first.

Previously in CBDB, we had to disable parallel execution for UNION ALL
queries containing Motion nodes due to a subtle but critical correctness
issue.

The problem occurred when Parallel Append workers marked subnodes as
completed, causing other workers to skip them. While normally harmless,
this became critical in MPP databases where Motion nodes are ubiquitous.
This limitation forced us to disable parallel plans for most UNION ALL
queries involving distributed tables, missing significant optimization
opportunities.

As a result, we fell back to serial execution:

explain(costs off) select b, count(*) from t1 group by b union all
select b, count(*) from t2 group by b;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Append
         ->  HashAggregate
               Group Key: t1.b
               ->  Redistribute Motion 3:3  (slice2; segments: 3)
                     Hash Key: t1.b
                     ->  Seq Scan on t1
         ->  HashAggregate
               Group Key: t2.b
               ->  Redistribute Motion 3:3  (slice3; segments: 3)
                     Hash Key: t2.b
                     ->  Seq Scan on t2
 Optimizer: Postgres query optimizer
(13 rows)

The commit makes plan parallel by first attempting a parallel-aware
Append when it's safe to do so, but crucially, we now have a robust
fallback path: when Motion hazards are detected, we switch to using a
parallel-oblivious Append. This works because while Parallel Append
might skip slices containing Motions, regular Append doesn't have this
problem - it will reliably execute all subnodes regardless of whether
they contain Motion nodes or not. Moreover, since CBDB's Motion nodes
are designed to handle tuples individually, we don't need to worry
about coordination between workers when processing these Motion nodes.

This approach unlocks powerful new optimization opportunities, as shown
in this example where we can now execute the query with different levels
of parallelism for each subplan (2 workers for t1 and 3 workers for t2):

explain(costs off) select b, count(*) from t1 group by b union all
select b, count(*) from t2 group by b;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 9:1  (slice1; segments: 9)
   ->  Append
         ->  HashAggregate
               Group Key: t1.b
               ->  Redistribute Motion 6:9  (slice2; segments: 6)
                     Hash Key: t1.b
                     Hash Module: 3
                     ->  Parallel Seq Scan on t1
         ->  HashAggregate
               Group Key: t2.b
               ->  Redistribute Motion 9:9  (slice3; segments: 9)
                     Hash Key: t2.b
                     Hash Module: 3
                     ->  Parallel Seq Scan on t2
 Optimizer: Postgres query optimizer
(15 rows)

This change represents a significant improvement in CBDB's query
optimizer, allowing UNION ALL queries to benefit from parallel execution
even when they contain Motion nodes, while maintaining correctness and
supporting flexible parallelism configurations across different parts of
the query. The optimization is particularly valuable for complex queries
like TPC-DS tests where UNION ALL operations are common.

Authored-by: Zhang Mingli [email protected]
@avamingli avamingli merged commit f61fbd9 into apache:main Sep 12, 2025
27 checks passed
@avamingli avamingli deleted the fix_dispatch_typo branch September 12, 2025 03:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants