diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4832f25d1ac..c02fcd4ea73 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -1839,8 +1839,38 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 		 * estimate.
 		 */
 		partial_rows = appendpath->path.rows;
-		/* Add the path if subpath has not Motion.*/
-		if (appendpath->path.parallel_safe && appendpath->path.motionHazard == false)
+
+		if (enable_parallel_append)
+		{
+			/* Add the path if the subpaths didn't encounter a motion hazard. */
+			if (appendpath->path.parallel_safe && (appendpath->path.motionHazard == false))
+				add_partial_path(rel, (Path *)appendpath);
+			else
+			{
+				/*
+				 * CBDB_PARALLEL:
+				 * When a parallel-aware Append is dropped due to a motion hazard,
+				 * we attempt a second pass using a parallel-oblivious Append.
+				 *
+				 * This approach is feasible in CBDB because:
+				 * 1. All Motions in a parallel plan handle tuples individually.
+				 * 2. A parallel-aware Append might skip slices containing Motions,
+				 *    whereas a regular Append does not have this problem.
+				 *
+				 * This behavior is conceptually similar to UPSTREAM's Append node
+				 * with partial paths implementation.
+				 */
+				appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+												NIL, NULL, parallel_workers,
+												false /* parallel_aware */,
+												-1);
+				partial_rows = appendpath->path.rows;
+
+				if (appendpath->path.parallel_safe)
+					add_partial_path(rel, (Path *)appendpath);
+			}
+		}
+		else if (appendpath->path.parallel_safe)
 			add_partial_path(rel, (Path *)appendpath);
 	}
 }
diff --git a/src/test/regress/expected/window_parallel.out b/src/test/regress/expected/window_parallel.out
index c9e226b7355..55fd0e820e0 100644
--- a/src/test/regress/expected/window_parallel.out
+++ b/src/test/regress/expected/window_parallel.out
@@ -861,6 +861,180 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
 --
 -- End of test of Parallel process of Window Functions.
 --
+--
+-- Test Parallel UNION ALL
+--
+create table t1(a int, b int) with(parallel_workers=2);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+create table t2(a int, b int) with(parallel_workers=2);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t1 select i, i from generate_series(1, 10000) i;
+insert into t2 select i, i from generate_series(1, 10000) i;
+analyze t1;
+analyze t2;
+begin;
+set local enable_parallel = on;
+set local enable_parallel_append = on;
+set local min_parallel_table_scan_size = 0;
+-- If parallel-aware append encounters a motion hazard, fall back to parallel-oblivious append.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 6:1 (slice1; segments: 6)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 6:6 (slice2; segments: 6)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 6:6 (slice3; segments: 6)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+set local enable_parallel_append = off;
+-- Use parallel-oblivious append directly when parallel-aware append is disabled.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 6:1 (slice1; segments: 6)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 6:6 (slice2; segments: 6)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 6:6 (slice3; segments: 6)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+-- Ensure subpaths with different numbers of parallel workers can be combined.
+set local enable_parallel_append = on;
+set max_parallel_workers_per_gather = 3;
+alter table t2 set(parallel_workers=3);
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 9:1 (slice1; segments: 9)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 6:9 (slice2; segments: 6)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 9:9 (slice3; segments: 9)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     Hash Module: 3
+                     ->  Parallel Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(23 rows)
+
+-- Cannot generate a parallel plan if no partial paths are available.
+alter table t2 set(parallel_workers=0);
+-- parallel-aware
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 3:3 (slice2; segments: 3)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     ->  Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 3:3 (slice3; segments: 3)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     ->  Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(21 rows)
+
+set local enable_parallel_append = off;
+-- Also applies to parallel-oblivious
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+                                                        QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3)
+   Output: t1.b, (count(*))
+   ->  Append
+         ->  HashAggregate
+               Output: t1.b, count(*)
+               Group Key: t1.b
+               ->  Redistribute Motion 3:3 (slice2; segments: 3)
+                     Output: t1.b
+                     Hash Key: t1.b
+                     ->  Seq Scan on window_parallel.t1
+                           Output: t1.b
+         ->  HashAggregate
+               Output: t2.b, count(*)
+               Group Key: t2.b
+               ->  Redistribute Motion 3:3 (slice3; segments: 3)
+                     Output: t2.b
+                     Hash Key: t2.b
+                     ->  Seq Scan on window_parallel.t2
+                           Output: t2.b
+ Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
+ Optimizer: Postgres query optimizer
+(21 rows)
+
+abort;
+--
+-- End of test of Parallel UNION ALL
+--
 -- start_ignore
 drop schema window_parallel cascade;
 NOTICE: drop cascades to table empsalary
diff --git a/src/test/regress/sql/window_parallel.sql b/src/test/regress/sql/window_parallel.sql
index 9ba4f1b9fc4..c6cd5e197cc 100644
--- a/src/test/regress/sql/window_parallel.sql
+++ b/src/test/regress/sql/window_parallel.sql
@@ -213,6 +213,52 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
 --
 -- End of test of Parallel process of Window Functions.
 --
+
+--
+-- Test Parallel UNION ALL
+--
+create table t1(a int, b int) with(parallel_workers=2);
+create table t2(a int, b int) with(parallel_workers=2);
+insert into t1 select i, i from generate_series(1, 10000) i;
+insert into t2 select i, i from generate_series(1, 10000) i;
+analyze t1;
+analyze t2;
+
+begin;
+set local enable_parallel = on;
+set local enable_parallel_append = on;
+set local min_parallel_table_scan_size = 0;
+
+-- If parallel-aware append encounters a motion hazard, fall back to parallel-oblivious append.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+set local enable_parallel_append = off;
+-- Use parallel-oblivious append directly when parallel-aware append is disabled.
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+-- Ensure subpaths with different numbers of parallel workers can be combined.
+set local enable_parallel_append = on;
+set max_parallel_workers_per_gather = 3;
+alter table t2 set(parallel_workers=3);
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+
+-- Cannot generate a parallel plan if no partial paths are available.
+alter table t2 set(parallel_workers=0);
+-- parallel-aware
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+set local enable_parallel_append = off;
+-- Also applies to parallel-oblivious
+explain(costs off, verbose)
+select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
+abort;
+
+--
+-- End of test of Parallel UNION ALL
+--
 -- start_ignore
 drop schema window_parallel cascade;
 -- end_ignore
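
Note for reviewers: the planner change above is easiest to read as a three-way branch on enable_parallel_append, parallel safety, and the motion hazard flag. Below is a minimal standalone sketch of just that control flow. ToyPath, make_append(), keep_path() and choose_append() are invented stand-ins for illustration, not CBDB APIs; only the branch structure mirrors the patch.

/*
 * Standalone sketch of the fallback decision added to
 * add_paths_to_append_rel(). Compile with any C99 compiler.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct ToyPath
{
	bool		parallel_safe;	/* safe to run under parallel workers? */
	bool		motionHazard;	/* does some subpath contain a Motion? */
	bool		parallel_aware;	/* true = Parallel Append semantics */
} ToyPath;

/* Build an Append path; parallel_aware picks Parallel vs. plain Append. */
static ToyPath
make_append(bool parallel_aware, bool subpaths_have_motion)
{
	ToyPath		p;

	p.parallel_safe = true;
	p.motionHazard = subpaths_have_motion;
	p.parallel_aware = parallel_aware;
	return p;
}

static void
keep_path(ToyPath p)
{
	printf("kept %s Append\n",
		   p.parallel_aware ? "parallel-aware" : "parallel-oblivious");
}

static void
choose_append(bool enable_parallel_append, bool subpaths_have_motion)
{
	ToyPath		appendpath = make_append(enable_parallel_append,
										 subpaths_have_motion);

	if (enable_parallel_append)
	{
		if (appendpath.parallel_safe && !appendpath.motionHazard)
			keep_path(appendpath);	/* Parallel Append is safe: keep it */
		else
		{
			/* Second pass: retry with a parallel-oblivious Append. */
			appendpath = make_append(false, subpaths_have_motion);
			if (appendpath.parallel_safe)
				keep_path(appendpath);
		}
	}
	else if (appendpath.parallel_safe)
		keep_path(appendpath);		/* GUC off: plain Append only */
}

int
main(void)
{
	choose_append(true, true);		/* motion hazard -> falls back */
	choose_append(true, false);		/* keeps Parallel Append */
	choose_append(false, true);		/* GUC off -> plain Append */
	return 0;
}

Per the patch's own rationale, the second pass matters because a parallel-aware Append may leave some subplans unexecuted, so slices containing Motions might never run; a parallel-oblivious Append executes every subplan, so every Motion is reached.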