34 changes: 32 additions & 2 deletions src/backend/optimizer/path/allpaths.c
@@ -1839,8 +1839,38 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
* estimate.
*/
partial_rows = appendpath->path.rows;
/* Add the path if subpath has not Motion.*/
if (appendpath->path.parallel_safe && appendpath->path.motionHazard == false)

if (enable_parallel_append)
{
/* Add the path only if the subpath has no motion hazard. */
if (appendpath->path.parallel_safe && !appendpath->path.motionHazard)
add_partial_path(rel, (Path *)appendpath);
else
{
/*
* CBDB_PARALLEL:
* When a parallel-aware Append is dropped due to a motion hazard,
* we attempt a second pass using a parallel-oblivious Append.
*
* This approach is feasible in CBDB because:
* 1. All Motions in a parallel plan handle tuples individually.
* 2. A Parallel Append might skip executing slices that contain Motions,
* whereas a regular Append does not have this problem.
*
* This behavior is conceptually similar to UPSTREAM's implementation of
* an Append node with partial paths.
*/
appendpath = create_append_path(root, rel, NIL, partial_subpaths,
NIL, NULL, parallel_workers,
false /*enable_parallel_append*/,
-1);
partial_rows = appendpath->path.rows;

if (appendpath->path.parallel_safe)
add_partial_path(rel, (Path *)appendpath);
}
}
else if (appendpath->path.parallel_safe)
add_partial_path(rel, (Path *)appendpath);
}
}
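The regression tests added below exercise this fallback end to end. As a minimal illustrative sketch of the triggering scenario (table names p1/p2 are hypothetical, not part of the patch): grouping each UNION ALL branch on a non-distribution column places a Redistribute Motion under the Append, which marks the parallel-aware Append path with a motion hazard; with this change the planner retries and keeps a parallel-oblivious Append instead of abandoning the parallel plan.

-- Illustrative sketch only; mirrors the regression tests below.
create table p1(a int, b int) with(parallel_workers=2);
create table p2(a int, b int) with(parallel_workers=2);
insert into p1 select i, i from generate_series(1, 10000) i;
insert into p2 select i, i from generate_series(1, 10000) i;
analyze p1;
analyze p2;
set enable_parallel = on;
set enable_parallel_append = on;
set min_parallel_table_scan_size = 0;
-- Each branch needs a Redistribute Motion for the GROUP BY, so the plan
-- shows a plain Append (the fallback) over Parallel Seq Scans.
explain (costs off)
select b, count(*) from p1 group by b
union all
select b, count(*) from p2 group by b;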
174 changes: 174 additions & 0 deletions src/test/regress/expected/window_parallel.out
@@ -861,6 +861,180 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
--
-- End of test of Parallel process of Window Functions.
--
--
-- Test Parallel UNION ALL
--
create table t1(a int, b int) with(parallel_workers=2);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
create table t2(a int, b int) with(parallel_workers=2);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
insert into t1 select i, i from generate_series(1, 10000) i;
insert into t2 select i, i from generate_series(1, 10000) i;
analyze t1;
analyze t2;
begin;
set local enable_parallel = on;
set local enable_parallel_append = on;
set local min_parallel_table_scan_size = 0;
-- If a parallel-aware Append encounters a motion hazard, fall back to a parallel-oblivious Append.
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
Gather Motion 6:1 (slice1; segments: 6)
Output: t1.b, (count(*))
-> Append
-> HashAggregate
Output: t1.b, count(*)
Group Key: t1.b
-> Redistribute Motion 6:6 (slice2; segments: 6)
Output: t1.b
Hash Key: t1.b
Hash Module: 3
-> Parallel Seq Scan on window_parallel.t1
Output: t1.b
-> HashAggregate
Output: t2.b, count(*)
Group Key: t2.b
-> Redistribute Motion 6:6 (slice3; segments: 6)
Output: t2.b
Hash Key: t2.b
Hash Module: 3
-> Parallel Seq Scan on window_parallel.t2
Output: t2.b
Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
Optimizer: Postgres query optimizer
(23 rows)

set local enable_parallel_append = off;
-- Use a parallel-oblivious Append directly when parallel-aware Append is disabled.
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Gather Motion 6:1 (slice1; segments: 6)
Output: t1.b, (count(*))
-> Append
-> HashAggregate
Output: t1.b, count(*)
Group Key: t1.b
-> Redistribute Motion 6:6 (slice2; segments: 6)
Output: t1.b
Hash Key: t1.b
Hash Module: 3
-> Parallel Seq Scan on window_parallel.t1
Output: t1.b
-> HashAggregate
Output: t2.b, count(*)
Group Key: t2.b
-> Redistribute Motion 6:6 (slice3; segments: 6)
Output: t2.b
Hash Key: t2.b
Hash Module: 3
-> Parallel Seq Scan on window_parallel.t2
Output: t2.b
Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
Optimizer: Postgres query optimizer
(23 rows)

-- Ensure subpaths with different parallel-worker counts are compatible:
-- with 3 segments, t1 (parallel_workers=2) runs 6 scan instances while
-- t2 (parallel_workers=3) runs 9, hence the 6:9 and 9:9 Motions.
set local enable_parallel_append = on;
set max_parallel_workers_per_gather = 3;
alter table t2 set(parallel_workers=3);
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
Gather Motion 9:1 (slice1; segments: 9)
Output: t1.b, (count(*))
-> Append
-> HashAggregate
Output: t1.b, count(*)
Group Key: t1.b
-> Redistribute Motion 6:9 (slice2; segments: 6)
Output: t1.b
Hash Key: t1.b
Hash Module: 3
-> Parallel Seq Scan on window_parallel.t1
Output: t1.b
-> HashAggregate
Output: t2.b, count(*)
Group Key: t2.b
-> Redistribute Motion 9:9 (slice3; segments: 9)
Output: t2.b
Hash Key: t2.b
Hash Module: 3
-> Parallel Seq Scan on window_parallel.t2
Output: t2.b
Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
Optimizer: Postgres query optimizer
(23 rows)

-- A parallel plan cannot be chosen if no partial paths are available.
alter table t2 set(parallel_workers=0);
-- parallel-aware
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
Output: t1.b, (count(*))
-> Append
-> HashAggregate
Output: t1.b, count(*)
Group Key: t1.b
-> Redistribute Motion 3:3 (slice2; segments: 3)
Output: t1.b
Hash Key: t1.b
-> Seq Scan on window_parallel.t1
Output: t1.b
-> HashAggregate
Output: t2.b, count(*)
Group Key: t2.b
-> Redistribute Motion 3:3 (slice3; segments: 3)
Output: t2.b
Hash Key: t2.b
-> Seq Scan on window_parallel.t2
Output: t2.b
Settings: enable_parallel = 'on', enable_parallel_append = 'on', min_parallel_table_scan_size = '0', optimizer = 'off'
Optimizer: Postgres query optimizer
(21 rows)

set local enable_parallel_append = off;
-- The same applies to the parallel-oblivious Append.
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
Output: t1.b, (count(*))
-> Append
-> HashAggregate
Output: t1.b, count(*)
Group Key: t1.b
-> Redistribute Motion 3:3 (slice2; segments: 3)
Output: t1.b
Hash Key: t1.b
-> Seq Scan on window_parallel.t1
Output: t1.b
-> HashAggregate
Output: t2.b, count(*)
Group Key: t2.b
-> Redistribute Motion 3:3 (slice3; segments: 3)
Output: t2.b
Hash Key: t2.b
-> Seq Scan on window_parallel.t2
Output: t2.b
Settings: enable_parallel = 'on', enable_parallel_append = 'off', min_parallel_table_scan_size = '0', optimizer = 'off'
Optimizer: Postgres query optimizer
(21 rows)

abort;
--
-- End of test of Parallel UNION ALL
--
-- start_ignore
drop schema window_parallel cascade;
NOTICE: drop cascades to table empsalary
46 changes: 46 additions & 0 deletions src/test/regress/sql/window_parallel.sql
@@ -213,6 +213,52 @@ select sum(salary) over (order by enroll_date range between '1 year'::interval p
--
-- End of test of Parallel process of Window Functions.
--

--
-- Test Parallel UNION ALL
--
create table t1(a int, b int) with(parallel_workers=2);
create table t2(a int, b int) with(parallel_workers=2);
insert into t1 select i, i from generate_series(1, 10000) i;
insert into t2 select i, i from generate_series(1, 10000) i;
analyze t1;
analyze t2;

begin;
set local enable_parallel = on;
set local enable_parallel_append = on;
set local min_parallel_table_scan_size = 0;

-- If a parallel-aware Append encounters a motion hazard, fall back to a parallel-oblivious Append.
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;

set local enable_parallel_append = off;
-- Use a parallel-oblivious Append directly when parallel-aware Append is disabled.
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;

-- Ensure subpaths with different parallel-worker counts are compatible:
-- with 3 segments, t1 (parallel_workers=2) runs 6 scan instances while
-- t2 (parallel_workers=3) runs 9, hence the 6:9 and 9:9 Motions.
set local enable_parallel_append = on;
set max_parallel_workers_per_gather = 3;
alter table t2 set(parallel_workers=3);
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;

-- A parallel plan cannot be chosen if no partial paths are available.
alter table t2 set(parallel_workers=0);
-- parallel-aware
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
set local enable_parallel_append = off;
-- The same applies to the parallel-oblivious Append.
explain(costs off, verbose)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
abort;

--
-- End of test of Parallel UNION ALL
--
-- start_ignore
drop schema window_parallel cascade;
-- end_ignore
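As a possible follow-up check, and only a sketch (not part of this patch's test suite): EXPLAIN ANALYZE can be used, while the tables still exist, to confirm that the fallback Append plan launches parallel workers at execution time.

begin;
set local enable_parallel = on;
set local enable_parallel_append = on;
set local min_parallel_table_scan_size = 0;
-- Inspect the per-slice statistics in the ANALYZE output to verify that
-- parallel workers were actually launched.
explain (analyze, costs off)
select b, count(*) from t1 group by b union all select b, count(*) from t2 group by b;
abort;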