Commit 01f023f
Give UNION ALL more opportunities for parallel plans in MPP.
Previously in CBDB, we had to disable parallel execution for UNION ALL queries containing Motion nodes because of a subtle but critical correctness issue: a Parallel Append worker can mark a subnode as completed, causing other workers to skip it. That is normally harmless, but it becomes a correctness problem in an MPP database, where Motion nodes are ubiquitous and every slice must actually be executed. This limitation forced us to disable parallel plans for most UNION ALL queries over distributed tables, missing significant optimization opportunities, so we fell back to serial execution:

explain(costs off) select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Append
         ->  HashAggregate
               Group Key: t1.b
               ->  Redistribute Motion 3:3 (slice2; segments: 3)
                     Hash Key: t1.b
                     ->  Seq Scan on t1
         ->  HashAggregate
               Group Key: t2.b
               ->  Redistribute Motion 3:3 (slice3; segments: 3)
                     Hash Key: t2.b
                     ->  Seq Scan on t2
 Optimizer: Postgres query optimizer
(13 rows)

This commit makes such plans parallel. We still attempt a parallel-aware Append first when it is safe to do so, but there is now a robust fallback path: when a Motion hazard is detected, we switch to a parallel-oblivious Append. This works because, while Parallel Append might skip slices containing Motions, a regular Append reliably executes all of its subnodes whether or not they contain Motion nodes. Moreover, since CBDB's Motion nodes handle tuples individually, no extra coordination between workers is needed when processing them.

This unlocks new optimization opportunities, as in this example where the query now runs with a different degree of parallelism for each subplan (2 workers for t1 and 3 workers for t2):

explain(costs off) select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 9:1 (slice1; segments: 9)
   ->  Append
         ->  HashAggregate
               Group Key: t1.b
               ->  Redistribute Motion 6:9 (slice2; segments: 6)
                     Hash Key: t1.b
                     Hash Module: 3
                     ->  Parallel Seq Scan on t1
         ->  HashAggregate
               Group Key: t2.b
               ->  Redistribute Motion 9:9 (slice3; segments: 9)
                     Hash Key: t2.b
                     Hash Module: 3
                     ->  Parallel Seq Scan on t2
 Optimizer: Postgres query optimizer
(15 rows)

With this change, UNION ALL queries in CBDB can benefit from parallel execution even when they contain Motion nodes, while preserving correctness and allowing different degrees of parallelism across the parts of the query. The optimization is particularly valuable for complex workloads such as TPC-DS, where UNION ALL is common.

Authored-by: Zhang Mingli [email protected]
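For readers who want to reproduce these plan shapes, the sketch below mirrors the regression test added by this commit; the table definitions, reloptions, and GUC names (enable_parallel, enable_parallel_append, min_parallel_table_scan_size, max_parallel_workers_per_gather) are taken from that test, while the second INSERT is adapted here to populate t2, so treat it as illustrative rather than authoritative.

-- Sketch based on src/test/regress/sql/window_parallel.sql from this commit;
-- adapted (the original test populates only t1).
create table t1(a int, b int) with (parallel_workers = 2);
create table t2(a int, b int) with (parallel_workers = 3);
insert into t1 select i, i from generate_series(1, 10000) i;
insert into t2 select i, i from generate_series(1, 10000) i;
analyze t1;
analyze t2;

set enable_parallel = on;
set enable_parallel_append = on;            -- try a parallel-aware Append first
set min_parallel_table_scan_size = 0;
set max_parallel_workers_per_gather = 3;

-- Motions under the Append trigger the fallback to a parallel-oblivious
-- Append; each branch keeps its own degree of parallelism (2 vs 3 workers).
explain (costs off)
select b, count(*) from t1 group by b
union all
select b, count(*) from t2 group by b;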
1 parent 39ef2bd commit 01f023f

File tree: 3 files changed, 252 additions and 2 deletions


src/backend/optimizer/path/allpaths.c

Lines changed: 32 additions & 2 deletions
@@ -1839,8 +1839,38 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 		 * estimate.
 		 */
 		partial_rows = appendpath->path.rows;
-		/* Add the path if subpath has not Motion.*/
-		if (appendpath->path.parallel_safe && appendpath->path.motionHazard == false)
+
+		if (enable_parallel_append)
+		{
+			/* Add the path if subpath didn't encounter motion hazard.*/
+			if (appendpath->path.parallel_safe && (appendpath->path.motionHazard == false))
+				add_partial_path(rel, (Path *)appendpath);
+			else
+			{
+				/*
+				 * CBDB_PARALLEL:
+				 * When a parallel-aware Append is dropped due to motion hazard,
+				 * we attempt a second pass using parallel-oblivious Append.
+				 *
+				 * This approach is feasible in CBDB because:
+				 * 1. All Motions in a parallel plan handle tuples individually
+				 * 2. Parallel Append might miss executing slices containing Motions,
+				 *    whereas regular Append does not have this problem
+				 *
+				 * This behavior is conceptually similar to UPSTREAM's Append node
+				 * with partial paths implementation.
+				 */
+				appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+												NIL, NULL, parallel_workers,
+												false /*enable_parallel_append*/,
+												-1);
+				partial_rows = appendpath->path.rows;
+
+				if (appendpath->path.parallel_safe)
+					add_partial_path(rel, (Path *)appendpath);
+			}
+		}
+		else if (appendpath->path.parallel_safe)
 			add_partial_path(rel, (Path *)appendpath);
 	}
 }

src/test/regress/expected/window_parallel.out

Lines changed: 174 additions & 0 deletions
@@ -858,6 +858,180 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
 --
 -- End of test of Parallel process of Window Functions.
 --
+--
+-- Test Parallel UNION ALL
+--
+create table t1(a int, b int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+create table t2(a int, b int) with(parallel_workers=2);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t1 select i, i from generate_series(1, 10000) i;
+insert into t1 select i, i from generate_series(1, 10000) i;
+analyze t1;
+analyze t2;
+begin;
+set local enable_parallel = on;
+set local enable_parallel_append = on;
+set local min_parallel_table_scan_size = 0;
+-- If parallel-aware append encounters a motion hazard, fall back to parallel-oblivious append.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 6:1 (slice1; segments: 6)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 6:6 (slice2; segments: 6)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 6:6 (slice3; segments: 6)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+set local enable_parallel_append = off;
+-- Naturally, use parallel-oblivious append directly when parallel-aware mode is disabled.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 6:1 (slice1; segments: 6)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 6:6 (slice2; segments: 6)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 6:6 (slice3; segments: 6)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+-- Ensure compatibility between different paths when using parallel workers
+set local enable_parallel_append = on;
+set max_parallel_workers_per_gather = 3;
+alter table t2 set(parallel_workers=3);
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 9:1 (slice1; segments: 9)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 6:9 (slice2; segments: 6)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 9:9 (slice3; segments: 9)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+-- Could not drive a parallel plan if no partial paths are avaliable
+alter table t2 set(parallel_workers=0);
+-- parallel-aware
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 3:3 (slice2; segments: 3)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     ->  Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 3:3 (slice3; segments: 3)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     ->  Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(21 rows)
+
+set local enable_parallel_append = off;
+-- Also applies to parallel-oblivious
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                         QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 3:3 (slice2; segments: 3)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     ->  Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 3:3 (slice3; segments: 3)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     ->  Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(21 rows)
+
+abort;
+--
+-- End of test Parallel UNION ALL
+--
 -- start_ignore
 drop schema window_parallel cascade;
 NOTICE:  drop cascades to table empsalary

src/test/regress/sql/window_parallel.sql

Lines changed: 46 additions & 0 deletions
@@ -213,6 +213,52 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
 --
 -- End of test of Parallel process of Window Functions.
 --
+
+--
+-- Test Parallel UNION ALL
+--
+create table t1(a int, b int) with(parallel_workers=2);
+create table t2(a int, b int) with(parallel_workers=2);
+insert into t1 select i, i from generate_series(1, 10000) i;
+insert into t1 select i, i from generate_series(1, 10000) i;
+analyze t1;
+analyze t2;
+
+begin;
+set local enable_parallel = on;
+set local enable_parallel_append = on;
+set local min_parallel_table_scan_size = 0;
+
+-- If parallel-aware append encounters a motion hazard, fall back to parallel-oblivious append.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+set local enable_parallel_append = off;
+-- Naturally, use parallel-oblivious append directly when parallel-aware mode is disabled.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+-- Ensure compatibility between different paths when using parallel workers
+set local enable_parallel_append = on;
+set max_parallel_workers_per_gather = 3;
+alter table t2 set(parallel_workers=3);
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+-- Could not drive a parallel plan if no partial paths are avaliable
+alter table t2 set(parallel_workers=0);
+-- parallel-aware
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+set local enable_parallel_append = off;
+-- Also applies to parallel-oblivious
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+abort;
+
+--
+-- End of test Parallel UNION ALL
+--
 -- start_ignore
 drop schema window_parallel cascade;
 -- end_ignore
