Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/backend/executor/execParallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "jit/jit.h"
#include "nodes/nodeFuncs.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/datum.h"
#include "utils/dsa.h"
Expand Down Expand Up @@ -78,6 +79,7 @@ typedef struct FixedParallelExecutorState
dsa_pointer param_exec;
int eflags;
int jit_flags;
int dirtied_localbufs; /* Just for debugging purposes */
} FixedParallelExecutorState;

/*
Expand Down Expand Up @@ -768,6 +770,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
fpes->param_exec = InvalidDsaPointer;
fpes->eflags = estate->es_top_eflags;
fpes->jit_flags = estate->es_jit_flags;
fpes->dirtied_localbufs = dirtied_localbufs;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);

/* Store query string */
Expand Down Expand Up @@ -1464,6 +1467,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)

/* Get fixed-size state. */
fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
dirtied_localbufs = fpes->dirtied_localbufs;

/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc);
Expand Down
1 change: 1 addition & 0 deletions src/backend/executor/execUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ CreateExecutorState(void)
estate->es_use_parallel_mode = false;
estate->es_parallel_workers_to_launch = 0;
estate->es_parallel_workers_launched = 0;
estate->es_tempbufs_flushed = false; /* Have the backend's temp buffers been flushed? */

estate->es_jit_flags = 0;
estate->es_jit = NULL;
Expand Down
12 changes: 12 additions & 0 deletions src/backend/executor/nodeGather.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/bufmgr.h"
#include "utils/wait_event.h"


Expand Down Expand Up @@ -161,6 +162,17 @@ ExecGather(PlanState *pstate)
{
ParallelContext *pcxt;

/*
* Flush temporary buffers if this parallel section contains any
* objects with temporary storage. There is no need to do this more
* than once per query execution.
*/
if (gather->process_temp_tables && !estate->es_tempbufs_flushed)
{
FlushAllBuffers();
estate->es_tempbufs_flushed = true;
}

/* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(outerPlanState(node),
Expand Down
8 changes: 8 additions & 0 deletions src/backend/executor/nodeGatherMerge.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/bufmgr.h"

/*
* When we read tuples from workers, it's a good idea to read several at once
Expand Down Expand Up @@ -205,6 +206,13 @@ ExecGatherMerge(PlanState *pstate)
{
ParallelContext *pcxt;

/* Same as in ExecGather: flush temporary buffers at most once per query */
if (gm->process_temp_tables && !estate->es_tempbufs_flushed)
{
FlushAllBuffers();
estate->es_tempbufs_flushed = true;
}

/* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(outerPlanState(node),
Expand Down
30 changes: 16 additions & 14 deletions src/backend/optimizer/path/allpaths.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,23 +655,25 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
/* This should only be called for baserels and appendrel children. */
Assert(IS_SIMPLE_REL(rel));

/* Set if the data source refers to temporary storage in any way */
rel->needs_temp_safety = false;

/* Assorted checks based on rtekind. */
switch (rte->rtekind)
{
case RTE_RELATION:

/*
* Currently, parallel workers can't access the leader's temporary
* tables. We could possibly relax this if we wrote all of its
* local buffers at the start of the query and made no changes
* thereafter (maybe we could allow hint bit changes), and if we
* taught the workers to read them. Writing a large number of
* temporary buffers could be expensive, though, and we don't have
* the rest of the necessary infrastructure right now anyway. So
* for now, bail out if we see a temporary table.
* Processing objects with temporary storage in parallel is not free,
* because the temporary buffers must be flushed beforehand. So, keep
* this feature behind a GUC.
*/
if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP)
return;
{
if (!extended_parallel_processing)
return;
rel->needs_temp_safety = true;
}

/*
* Table sampling can be pushed down to workers if the sample
Expand All @@ -683,7 +685,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,

if (proparallel != PROPARALLEL_SAFE)
return;
if (!is_parallel_safe(root, (Node *) rte->tablesample->args))
if (!is_parallel_safe(root, (Node *) rte->tablesample->args, &rel->needs_temp_safety))
return;
}

Expand Down Expand Up @@ -749,7 +751,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,

case RTE_FUNCTION:
/* Check for parallel-restricted functions. */
if (!is_parallel_safe(root, (Node *) rte->functions))
if (!is_parallel_safe(root, (Node *) rte->functions, &rel->needs_temp_safety))
return;
break;

Expand All @@ -759,7 +761,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,

case RTE_VALUES:
/* Check for parallel-restricted functions. */
if (!is_parallel_safe(root, (Node *) rte->values_lists))
if (!is_parallel_safe(root, (Node *) rte->values_lists, &rel->needs_temp_safety))
return;
break;

Expand Down Expand Up @@ -800,14 +802,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
* outer join clauses work correctly. It would likely break equivalence
* classes, too.
*/
if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo))
if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo, &rel->needs_temp_safety))
return;

/*
* Likewise, if the relation's outputs are not parallel-safe, give up.
* (Usually, they're just Vars, but sometimes they're not.)
*/
if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs))
if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs, &rel->needs_temp_safety))
return;

/* We have a winner. */
Expand Down
26 changes: 26 additions & 0 deletions src/backend/optimizer/path/costsize.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
#include "optimizer/plancat.h"
#include "optimizer/restrictinfo.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "utils/lsyscache.h"
#include "utils/selfuncs.h"
#include "utils/spccache.h"
Expand All @@ -129,6 +130,7 @@

double seq_page_cost = DEFAULT_SEQ_PAGE_COST;
double random_page_cost = DEFAULT_RANDOM_PAGE_COST;
double write_page_cost = DEFAULT_WRITE_PAGE_COST;
double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
Expand Down Expand Up @@ -164,6 +166,8 @@ bool enable_partition_pruning = true;
bool enable_presorted_aggregate = true;
bool enable_async_append = true;

bool extended_parallel_processing = true;

typedef struct
{
PlannerInfo *root;
Expand Down Expand Up @@ -6653,3 +6657,25 @@ compute_gather_rows(Path *path)

return clamp_row_est(path->rows * get_parallel_divisor(path));
}

/*
 * tempbuf_flush_extra_cost
 *	  Estimate the extra cost of flushing the leader's temporary buffers.
 *
 * Before launching parallel workers in a SELECT query, the leader process
 * must flush all dirty pages in its temp buffers to guarantee that every
 * parallel worker has equal access to the data.
 *
 * It seems difficult to determine the specific set of tables, indexes and
 * TOAST relations that may be touched inside the subtree.  Moreover, stored
 * procedures may also scan temporary tables.  So it makes sense to flush all
 * temporary buffers.
 *
 * The cost computed here lets small queries avoid activating an expensive
 * parallel scan over temporary resources.
 *
 * Returns 0.0 when extended_parallel_processing is disabled; otherwise
 * returns write_page_cost multiplied by dirtied_localbufs.
 */
Cost
tempbuf_flush_extra_cost(void)
{
	/* Fast exit if the feature is disabled */
	if (!extended_parallel_processing)
		return 0.0;

	/* Hopefully, we have statistics on the number of dirtied buffers */
	Assert(dirtied_localbufs >= 0);

	return write_page_cost * dirtied_localbufs;
}
9 changes: 7 additions & 2 deletions src/backend/optimizer/path/equivclass.c
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ find_computable_ec_member(PlannerInfo *root,
{
List *emvars;
ListCell *lc2;
bool needs_temp_flush = false;

/*
* We shouldn't be trying to sort by an equivalence class that
Expand Down Expand Up @@ -1049,9 +1050,11 @@ find_computable_ec_member(PlannerInfo *root,
/*
* If requested, reject expressions that are not parallel-safe. We
* check this last because it's a rather expensive test.
* TODO: Not sure if it is really necessary.
*/
if (require_parallel_safe &&
!is_parallel_safe(root, (Node *) em->em_expr))
(!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) ||
needs_temp_flush))
continue;

return em; /* found usable expression */
Expand Down Expand Up @@ -1093,6 +1096,7 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel,
foreach(lc, target->exprs)
{
Expr *targetexpr = (Expr *) lfirst(lc);
bool needs_temp_flush = false;

em = find_ec_member_matching_expr(ec, targetexpr, rel->relids);
if (!em)
Expand All @@ -1112,7 +1116,8 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel,
* check this last because it's a rather expensive test.
*/
if (require_parallel_safe &&
!is_parallel_safe(root, (Node *) em->em_expr))
(!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) ||
needs_temp_flush))
continue;

return true;
Expand Down
31 changes: 21 additions & 10 deletions src/backend/optimizer/plan/createplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static Plan *create_projection_plan(PlannerInfo *root,
ProjectionPath *best_path,
int flags);
static Plan *inject_projection_plan(Plan *subplan, List *tlist,
bool parallel_safe);
ParallelSafe parallel_safe);
static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags);
static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root,
IncrementalSortPath *best_path, int flags);
Expand Down Expand Up @@ -298,7 +298,8 @@ static Unique *make_unique_from_pathkeys(Plan *lefttree,
List *pathkeys, int numCols,
Relids relids);
static Gather *make_gather(List *qptlist, List *qpqual,
int nworkers, int rescan_param, bool single_copy, Plan *subplan);
int nworkers, int rescan_param, bool single_copy,
Plan *subplan, bool process_temp_tables);
static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy,
List *tlist, Plan *lefttree, Plan *righttree,
List *groupList, Cardinality numGroups);
Expand Down Expand Up @@ -1777,12 +1778,14 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)

tlist = build_path_tlist(root, &best_path->path);

Assert(best_path->subpath->parallel_safe > PARALLEL_UNSAFE);
gather_plan = make_gather(tlist,
NIL,
best_path->num_workers,
assign_special_exec_param(root),
best_path->single_copy,
subplan);
subplan,
best_path->subpath->parallel_safe == NEEDS_TEMP_FLUSH);

copy_generic_path_info(&gather_plan->plan, &best_path->path);

Expand Down Expand Up @@ -1956,7 +1959,7 @@ create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags)
* to apply (since the tlist might be unsafe even if the child plan is safe).
*/
static Plan *
inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe)
inject_projection_plan(Plan *subplan, List *tlist, ParallelSafe parallel_safe)
{
Plan *plan;

Expand Down Expand Up @@ -1988,7 +1991,7 @@ inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe)
* flag of the FDW's own Path node.
*/
Plan *
change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe)
change_plan_targetlist(Plan *subplan, List *tlist, ParallelSafe tlist_parallel_safe)
{
/*
* If the top plan node can't do projections and its existing target list
Expand All @@ -2004,7 +2007,7 @@ change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe)
{
/* Else we can just replace the plan node's tlist */
subplan->targetlist = tlist;
subplan->parallel_safe &= tlist_parallel_safe;
subplan->parallel_safe = tlist_parallel_safe;
}
return subplan;
}
Expand Down Expand Up @@ -4195,7 +4198,8 @@ create_nestloop_plan(PlannerInfo *root,
List *otherclauses;
List *nestParams;
List *outer_tlist;
bool outer_parallel_safe;
ParallelSafe outer_parallel_safe;
bool needs_temp_flush = false;
Relids saveOuterRels = root->curOuterRels;
ListCell *lc;

Expand Down Expand Up @@ -4311,8 +4315,13 @@ create_nestloop_plan(PlannerInfo *root,
true);
outer_tlist = lappend(outer_tlist, tle);
/* ... and track whether tlist is (still) parallel-safe */
if (outer_parallel_safe)
outer_parallel_safe = is_parallel_safe(root, (Node *) phv);
if (outer_parallel_safe > PARALLEL_UNSAFE)
{
if (!is_parallel_safe(root, (Node *) phv, &needs_temp_flush))
outer_parallel_safe = PARALLEL_UNSAFE;
else if (needs_temp_flush)
outer_parallel_safe = NEEDS_TEMP_FLUSH;
}
}
if (outer_tlist != outer_plan->targetlist)
outer_plan = change_plan_targetlist(outer_plan, outer_tlist,
Expand Down Expand Up @@ -6784,7 +6793,8 @@ make_gather(List *qptlist,
int nworkers,
int rescan_param,
bool single_copy,
Plan *subplan)
Plan *subplan,
bool process_temp_tables)
{
Gather *node = makeNode(Gather);
Plan *plan = &node->plan;
Expand All @@ -6798,6 +6808,7 @@ make_gather(List *qptlist,
node->single_copy = single_copy;
node->invisible = false;
node->initParam = NULL;
node->process_temp_tables = process_temp_tables;

return node;
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/optimizer/plan/planmain.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ query_planner(PlannerInfo *root,
(root->query_level > 1 ||
debug_parallel_query != DEBUG_PARALLEL_OFF))
final_rel->consider_parallel =
is_parallel_safe(root, parse->jointree->quals);
is_parallel_safe(root, parse->jointree->quals, &final_rel->needs_temp_safety);

/*
* The only path for it is a trivial Result path. We cheat a
Expand Down
Loading