diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 26200c5a3d6e5..40d8fa44c19cf 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -45,6 +45,7 @@ #include "jit/jit.h" #include "nodes/nodeFuncs.h" #include "pgstat.h" +#include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/datum.h" #include "utils/dsa.h" @@ -78,6 +79,7 @@ typedef struct FixedParallelExecutorState dsa_pointer param_exec; int eflags; int jit_flags; + int dirtied_localbufs; /* Just for debugging purposes */ } FixedParallelExecutorState; /* @@ -768,6 +770,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, fpes->param_exec = InvalidDsaPointer; fpes->eflags = estate->es_top_eflags; fpes->jit_flags = estate->es_jit_flags; + fpes->dirtied_localbufs = dirtied_localbufs; shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); /* Store query string */ @@ -1464,6 +1467,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Get fixed-size state. */ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + dirtied_localbufs = fpes->dirtied_localbufs; /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index fdc65c2b42b33..09acdb18652d9 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -161,6 +161,7 @@ CreateExecutorState(void) estate->es_use_parallel_mode = false; estate->es_parallel_workers_to_launch = 0; estate->es_parallel_workers_launched = 0; + estate->es_tempbufs_flushed = false; /* Have the backend's temp buffers been flushed? 
*/ estate->es_jit_flags = 0; estate->es_jit = NULL; diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index dc7d1830259f5..572a54df6add2 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -36,6 +36,7 @@ #include "executor/tqueue.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "storage/bufmgr.h" #include "utils/wait_event.h" @@ -161,6 +162,17 @@ ExecGather(PlanState *pstate) { ParallelContext *pcxt; + /* + * Flush temporary buffers if this parallel section contains + * any objects with temporary storage type. Don't bother to do it + * more than once per the query execution. + */ + if (gather->process_temp_tables && !estate->es_tempbufs_flushed) + { + FlushAllBuffers(); + estate->es_tempbufs_flushed = true; + } + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(outerPlanState(node), diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index c04522fea4d9e..4232a5a3a0bb4 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -22,6 +22,7 @@ #include "lib/binaryheap.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "storage/bufmgr.h" /* * When we read tuples from workers, it's a good idea to read several at once @@ -205,6 +206,13 @@ ExecGatherMerge(PlanState *pstate) { ParallelContext *pcxt; + /* The same as in the ExecGather */ + if (gm->process_temp_tables && !estate->es_tempbufs_flushed) + { + FlushAllBuffers(); + estate->es_tempbufs_flushed = true; + } + /* Initialize, or re-initialize, shared state needed by workers. 
*/ if (!node->pei) node->pei = ExecInitParallelPlan(outerPlanState(node), diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 4c43fd0b19b23..58e64e2063137 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -655,23 +655,25 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, /* This should only be called for baserels and appendrel children. */ Assert(IS_SIMPLE_REL(rel)); + /* Set if the data source refers temp storage somehow */ + rel->needs_temp_safety = false; + /* Assorted checks based on rtekind. */ switch (rte->rtekind) { case RTE_RELATION: /* - * Currently, parallel workers can't access the leader's temporary - * tables. We could possibly relax this if we wrote all of its - * local buffers at the start of the query and made no changes - * thereafter (maybe we could allow hint bit changes), and if we - * taught the workers to read them. Writing a large number of - * temporary buffers could be expensive, though, and we don't have - * the rest of the necessary infrastructure right now anyway. So - * for now, bail out if we see a temporary table. + * It is not free to process objects with a temporary storage in + * parallel because we need to flush temporary buffers beforehand. + * So, hide this feature under a GUC. 
*/ if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP) - return; + { + if (!extended_parallel_processing) + return; + rel->needs_temp_safety = true; + } /* * Table sampling can be pushed down to workers if the sample @@ -683,7 +685,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, if (proparallel != PROPARALLEL_SAFE) return; - if (!is_parallel_safe(root, (Node *) rte->tablesample->args)) + if (!is_parallel_safe(root, (Node *) rte->tablesample->args, &rel->needs_temp_safety)) return; } @@ -749,7 +751,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_FUNCTION: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->functions)) + if (!is_parallel_safe(root, (Node *) rte->functions, &rel->needs_temp_safety)) return; break; @@ -759,7 +761,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_VALUES: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->values_lists)) + if (!is_parallel_safe(root, (Node *) rte->values_lists, &rel->needs_temp_safety)) return; break; @@ -800,14 +802,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, * outer join clauses work correctly. It would likely break equivalence * classes, too. */ - if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo)) + if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo, &rel->needs_temp_safety)) return; /* * Likewise, if the relation's outputs are not parallel-safe, give up. * (Usually, they're just Vars, but sometimes they're not.) */ - if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs)) + if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs, &rel->needs_temp_safety)) return; /* We have a winner. 
*/ diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index a39cc793b4d82..205baa0dd5cf6 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -104,6 +104,7 @@ #include "optimizer/plancat.h" #include "optimizer/restrictinfo.h" #include "parser/parsetree.h" +#include "storage/bufmgr.h" #include "utils/lsyscache.h" #include "utils/selfuncs.h" #include "utils/spccache.h" @@ -129,6 +130,7 @@ double seq_page_cost = DEFAULT_SEQ_PAGE_COST; double random_page_cost = DEFAULT_RANDOM_PAGE_COST; +double write_page_cost = DEFAULT_WRITE_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; @@ -164,6 +166,8 @@ bool enable_partition_pruning = true; bool enable_presorted_aggregate = true; bool enable_async_append = true; +bool extended_parallel_processing = true; + typedef struct { PlannerInfo *root; @@ -6653,3 +6657,25 @@ compute_gather_rows(Path *path) return clamp_row_est(path->rows * get_parallel_divisor(path)); } + +/* + * Before the launch parallel workers in a SELECT query, the leader process must + * flush all dirty pages in temp buffers to guarantee equal access to the data + * in each parallel worker. + * It seems difficult to calculate specific set of tables, indexes and toasts + * that may be touched inside the subtree. Moreover, stored procedures may also + * scan temporary tables. So, it makes sense to flush all temporary buffers. + * Here we calculate the cost of such operation to allow small queries do not + * activate expensive parallel scan over temp resources. 
+ */ +Cost +tempbuf_flush_extra_cost(void) +{ + if (!extended_parallel_processing) + /* Fast exit if feature is disabled */ + return 0.0; + + /* Hopefully, we have statistics on the number of dirtied buffers */ + Assert(dirtied_localbufs >= 0); + return write_page_cost * dirtied_localbufs; +} diff --git a/src/backend/optimizer/path/equivclass.c b/src/backend/optimizer/path/equivclass.c index 441f12f6c50cf..1573ffc5ce0b2 100644 --- a/src/backend/optimizer/path/equivclass.c +++ b/src/backend/optimizer/path/equivclass.c @@ -1015,6 +1015,7 @@ find_computable_ec_member(PlannerInfo *root, { List *emvars; ListCell *lc2; + bool needs_temp_flush = false; /* * We shouldn't be trying to sort by an equivalence class that @@ -1049,9 +1050,11 @@ find_computable_ec_member(PlannerInfo *root, /* * If requested, reject expressions that are not parallel-safe. We * check this last because it's a rather expensive test. + * TODO: Not sure if it is really necessary. */ if (require_parallel_safe && - !is_parallel_safe(root, (Node *) em->em_expr)) + (!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) || + needs_temp_flush)) continue; return em; /* found usable expression */ @@ -1093,6 +1096,7 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel, foreach(lc, target->exprs) { Expr *targetexpr = (Expr *) lfirst(lc); + bool needs_temp_flush = false; em = find_ec_member_matching_expr(ec, targetexpr, rel->relids); if (!em) @@ -1112,7 +1116,8 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel, * check this last because it's a rather expensive test. 
*/ if (require_parallel_safe && - !is_parallel_safe(root, (Node *) em->em_expr)) + (!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) || + needs_temp_flush)) continue; return true; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index bc417f9384018..4997cd2722d5a 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -100,7 +100,7 @@ static Plan *create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags); static Plan *inject_projection_plan(Plan *subplan, List *tlist, - bool parallel_safe); + ParallelSafe parallel_safe); static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags); static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root, IncrementalSortPath *best_path, int flags); @@ -298,7 +298,8 @@ static Unique *make_unique_from_pathkeys(Plan *lefttree, List *pathkeys, int numCols, Relids relids); static Gather *make_gather(List *qptlist, List *qpqual, - int nworkers, int rescan_param, bool single_copy, Plan *subplan); + int nworkers, int rescan_param, bool single_copy, + Plan *subplan, bool process_temp_tables); static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, List *tlist, Plan *lefttree, Plan *righttree, List *groupList, Cardinality numGroups); @@ -1777,12 +1778,14 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) tlist = build_path_tlist(root, &best_path->path); + Assert(best_path->subpath->parallel_safe > PARALLEL_UNSAFE); gather_plan = make_gather(tlist, NIL, best_path->num_workers, assign_special_exec_param(root), best_path->single_copy, - subplan); + subplan, + best_path->subpath->parallel_safe == NEEDS_TEMP_FLUSH); copy_generic_path_info(&gather_plan->plan, &best_path->path); @@ -1956,7 +1959,7 @@ create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags) * to apply (since the tlist might be unsafe even if the child plan is safe). 
*/ static Plan * -inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe) +inject_projection_plan(Plan *subplan, List *tlist, ParallelSafe parallel_safe) { Plan *plan; @@ -1988,7 +1991,7 @@ inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe) * flag of the FDW's own Path node. */ Plan * -change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe) +change_plan_targetlist(Plan *subplan, List *tlist, ParallelSafe tlist_parallel_safe) { /* * If the top plan node can't do projections and its existing target list @@ -2004,7 +2007,7 @@ change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe) { /* Else we can just replace the plan node's tlist */ subplan->targetlist = tlist; - subplan->parallel_safe &= tlist_parallel_safe; + subplan->parallel_safe = Min(subplan->parallel_safe, tlist_parallel_safe); } return subplan; } @@ -4195,7 +4198,8 @@ create_nestloop_plan(PlannerInfo *root, List *otherclauses; List *nestParams; List *outer_tlist; - bool outer_parallel_safe; + ParallelSafe outer_parallel_safe; + bool needs_temp_flush = false; Relids saveOuterRels = root->curOuterRels; ListCell *lc; @@ -4311,8 +4315,13 @@ create_nestloop_plan(PlannerInfo *root, true); outer_tlist = lappend(outer_tlist, tle); /* ... 
and track whether tlist is (still) parallel-safe */ - if (outer_parallel_safe) - outer_parallel_safe = is_parallel_safe(root, (Node *) phv); + if (outer_parallel_safe > PARALLEL_UNSAFE) + { + if (!is_parallel_safe(root, (Node *) phv, &needs_temp_flush)) + outer_parallel_safe = PARALLEL_UNSAFE; + else if (needs_temp_flush) + outer_parallel_safe = NEEDS_TEMP_FLUSH; + } } if (outer_tlist != outer_plan->targetlist) outer_plan = change_plan_targetlist(outer_plan, outer_tlist, @@ -6784,7 +6793,8 @@ make_gather(List *qptlist, int nworkers, int rescan_param, bool single_copy, - Plan *subplan) + Plan *subplan, + bool process_temp_tables) { Gather *node = makeNode(Gather); Plan *plan = &node->plan; @@ -6798,6 +6808,7 @@ make_gather(List *qptlist, node->single_copy = single_copy; node->invisible = false; node->initParam = NULL; + node->process_temp_tables = process_temp_tables; return node; } diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c index eefc486a5666c..af9492e20fa96 100644 --- a/src/backend/optimizer/plan/planmain.c +++ b/src/backend/optimizer/plan/planmain.c @@ -126,7 +126,7 @@ query_planner(PlannerInfo *root, (root->query_level > 1 || debug_parallel_query != DEBUG_PARALLEL_OFF)) final_rel->consider_parallel = - is_parallel_safe(root, parse->jointree->quals); + is_parallel_safe(root, parse->jointree->quals, &final_rel->needs_temp_safety); /* * The only path for it is a trivial Result path. 
We cheat a diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1268ea92b6f0d..4f2bec2f5cd2c 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -514,6 +514,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, gather->num_workers = 1; gather->single_copy = true; gather->invisible = (debug_parallel_query == DEBUG_PARALLEL_REGRESS); + gather->process_temp_tables = (best_path->parallel_safe == NEEDS_TEMP_FLUSH); /* Transfer any initPlans to the new top node */ gather->plan.initPlan = top_plan->initPlan; @@ -536,7 +537,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, gather->plan.plan_rows = top_plan->plan_rows; gather->plan.plan_width = top_plan->plan_width; gather->plan.parallel_aware = false; - gather->plan.parallel_safe = false; + gather->plan.parallel_safe = PARALLEL_UNSAFE; /* * Delete the initplans' cost from top_plan. We needn't add it to the @@ -1473,6 +1474,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, List *final_targets; List *final_targets_contain_srfs; bool final_target_parallel_safe; + bool needs_temp_flush = false; RelOptInfo *current_rel; RelOptInfo *final_rel; FinalPathExtraData extra; @@ -1524,7 +1526,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, /* And check whether it's parallel safe */ final_target_parallel_safe = - is_parallel_safe(root, (Node *) final_target->exprs); + is_parallel_safe(root, (Node *) final_target->exprs, &needs_temp_flush); /* The setop result tlist couldn't contain any SRFs */ Assert(!parse->hasTargetSRFs); @@ -1694,7 +1696,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, */ final_target = create_pathtarget(root, root->processed_tlist); final_target_parallel_safe = - is_parallel_safe(root, (Node *) final_target->exprs); + is_parallel_safe(root, (Node *) final_target->exprs, &needs_temp_flush); /* * If ORDER BY was given, consider 
whether we should use a post-sort @@ -1707,7 +1709,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, final_target, &have_postponed_srfs); sort_input_target_parallel_safe = - is_parallel_safe(root, (Node *) sort_input_target->exprs); + is_parallel_safe(root, (Node *) sort_input_target->exprs, &needs_temp_flush); } else { @@ -1726,7 +1728,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, final_target, activeWindows); grouping_target_parallel_safe = - is_parallel_safe(root, (Node *) grouping_target->exprs); + is_parallel_safe(root, (Node *) grouping_target->exprs, &needs_temp_flush); } else { @@ -1745,7 +1747,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, { scanjoin_target = make_group_input_target(root, final_target); scanjoin_target_parallel_safe = - is_parallel_safe(root, (Node *) scanjoin_target->exprs); + is_parallel_safe(root, (Node *) scanjoin_target->exprs, &needs_temp_flush); } else { @@ -1797,6 +1799,18 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, scanjoin_targets_contain_srfs = NIL; } + /* + * Each path may have individual target containing or not references to + * relations with temporary storages. There were attempts to do it + * smartly that end up with a new Target::needs_temp_flush field that + * seems too invasive for this first attempt. + * So, just set current_rel flag as needed for temp buffers flushing and + * let Gather to do the job earlier than it could be. + * XXX: we need to be sure that no one new path created with all these + * target lists till now. + */ + current_rel->needs_temp_safety |= needs_temp_flush; + /* Apply scan/join target. */ scanjoin_target_same_exprs = list_length(scanjoin_targets) == 1 && equal(scanjoin_target->exprs, current_rel->reltarget->exprs); @@ -1905,9 +1919,13 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, * query. 
*/ if (current_rel->consider_parallel && - is_parallel_safe(root, parse->limitOffset) && - is_parallel_safe(root, parse->limitCount)) + is_parallel_safe(root, parse->limitOffset, &needs_temp_flush) && + is_parallel_safe(root, parse->limitCount, &needs_temp_flush)) + { final_rel->consider_parallel = true; + final_rel->needs_temp_safety |= + current_rel->needs_temp_safety | needs_temp_flush; + } /* * If the current_rel belongs to a single FDW, so does the final_rel. @@ -3950,8 +3968,11 @@ make_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, * target list and HAVING quals are parallel-safe. */ if (input_rel->consider_parallel && target_parallel_safe && - is_parallel_safe(root, havingQual)) + is_parallel_safe(root, (Node *) havingQual, &grouped_rel->needs_temp_safety)) + { grouped_rel->consider_parallel = true; + grouped_rel->needs_temp_safety |= input_rel->needs_temp_safety; + } /* * If the input rel belongs to a single FDW, so does the grouped rel. @@ -4570,8 +4591,11 @@ create_window_paths(PlannerInfo *root, * target list and active windows for non-parallel-safe constructs. */ if (input_rel->consider_parallel && output_target_parallel_safe && - is_parallel_safe(root, (Node *) activeWindows)) + is_parallel_safe(root, (Node *) activeWindows, &window_rel->needs_temp_safety)) + { window_rel->consider_parallel = true; + window_rel->needs_temp_safety |= input_rel->needs_temp_safety; + } /* * If the input rel belongs to a single FDW, so does the window rel. @@ -7033,10 +7057,12 @@ plan_create_index_workers(Oid tableOid, Oid indexOid) * Currently, parallel workers can't access the leader's temporary tables. * Furthermore, any index predicate or index expressions must be parallel * safe. + * TODO: Is this hard to enable? 
*/ if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP || - !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) || - !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index))) + !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index), &rel->needs_temp_safety) || + !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index), &rel->needs_temp_safety) || + rel->needs_temp_safety) { parallel_workers = 0; goto done; @@ -8822,7 +8848,8 @@ create_partial_unique_paths(PlannerInfo *root, RelOptInfo *input_rel, * nothing to do if there's anything in the targetlist that's * parallel-restricted. */ - if (!is_parallel_safe(root, (Node *) unique_rel->reltarget->exprs)) + if (!is_parallel_safe(root, (Node *) unique_rel->reltarget->exprs, + &unique_rel->needs_temp_safety)) return; cheapest_partial_path = linitial(input_rel->partial_pathlist); diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index cd7ea1e6b5873..72f5efd747600 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1582,7 +1582,7 @@ clean_up_removed_plan_level(Plan *parent, Plan *child) child->startup_cost += initplan_cost; child->total_cost += initplan_cost; if (unsafe_initplans) - child->parallel_safe = false; + child->parallel_safe = PARALLEL_UNSAFE; /* * Attach plans this way so that parent's initplans are processed diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index ff63d20f8d536..cd1061e339d43 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -1009,7 +1009,7 @@ SS_process_ctes(PlannerInfo *root) * CTE scans are not considered for parallelism (cf * set_rel_consider_parallel). 
*/ - splan->parallel_safe = false; + splan->parallel_safe = PARALLEL_UNSAFE; splan->setParam = NIL; splan->parParam = NIL; splan->args = NIL; @@ -2308,7 +2308,7 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) path->startup_cost += initplan_cost; path->total_cost += initplan_cost; if (unsafe_initplans) - path->parallel_safe = false; + path->parallel_safe = PARALLEL_UNSAFE; } /* diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 67b7de16fc5c3..9f82b5189da31 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -95,6 +95,7 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + bool hasTempObject; /* saw a subplan marked NEEDS_TEMP_FLUSH */ } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -760,13 +761,17 @@ max_parallel_hazard(Query *parse) * * root->glob->maxParallelHazard must previously have been set to the * result of max_parallel_hazard() on the whole query. + * + * Expression may contain a reference to subplan that employs temporary + * relations. That's why the flag needs_temp_flush is introduced. */ bool -is_parallel_safe(PlannerInfo *root, Node *node) +is_parallel_safe(PlannerInfo *root, Node *node, bool *needs_temp_flush) { - max_parallel_hazard_context context; + max_parallel_hazard_context context = {0}; PlannerInfo *proot; ListCell *l; + bool is_safe; /* * Even if the original querytree contained nothing unsafe, we need to @@ -798,7 +803,20 @@ is_parallel_safe(PlannerInfo *root, Node *node) } } - return !max_parallel_hazard_walker(node, &context); + is_safe = !max_parallel_hazard_walker(node, &context); + + /* + * If the expression is parallel-safe, detect if it needs temp buffers + * flushing before the execution start. Don't care changing it if + * the expression is unsafe - it can't be executed by parallel workers + * anyway. 
+ * Some callers only care whether the expression is safe at all. + * So, if the incoming pointer is NULL, skip this step. + */ + if (needs_temp_flush && is_safe && context.hasTempObject) + *needs_temp_flush = true; + + return is_safe; } /* core logic for all parallel-hazard checks */ @@ -920,6 +938,8 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) return true; save_safe_param_ids = context->safe_param_ids; + context->hasTempObject = + context->hasTempObject || (subplan->parallel_safe == NEEDS_TEMP_FLUSH); context->safe_param_ids = list_concat_copy(context->safe_param_ids, subplan->paramIds); if (max_parallel_hazard_walker(subplan->testexpr, context)) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index b6be4ddbd01b2..233495c219e67 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -69,6 +69,12 @@ static bool pathlist_is_reparameterizable_by_child(List *pathlist, int compare_path_costs(Path *path1, Path *path2, CostSelector criterion) { + Cost startup_cost1 = path1->startup_cost; + Cost startup_cost2 = path2->startup_cost; + Cost total_cost1 = path1->total_cost; + Cost total_cost2 = path2->total_cost; + Cost extra_cost = tempbuf_flush_extra_cost(); + /* Number of disabled nodes, if different, trumps all else. */ if (unlikely(path1->disabled_nodes != path2->disabled_nodes)) { @@ -78,35 +84,50 @@ compare_path_costs(Path *path1, Path *path2, CostSelector criterion) return +1; } + /* + * Add an extra cost of temporary buffers flushing for the time + * of comparison only. 
+ */ + if (path1->parallel_safe == NEEDS_TEMP_FLUSH) + { + startup_cost1 += extra_cost; + total_cost1 += extra_cost; + } + if (path2->parallel_safe == NEEDS_TEMP_FLUSH) + { + startup_cost2 += extra_cost; + total_cost2 += extra_cost; + } + if (criterion == STARTUP_COST) { - if (path1->startup_cost < path2->startup_cost) + if (startup_cost1 < startup_cost2) return -1; - if (path1->startup_cost > path2->startup_cost) + if (startup_cost1 > startup_cost2) return +1; /* * If paths have the same startup cost (not at all unlikely), order * them by total cost. */ - if (path1->total_cost < path2->total_cost) + if (total_cost1 < total_cost2) return -1; - if (path1->total_cost > path2->total_cost) + if (total_cost1 > total_cost2) return +1; } else { - if (path1->total_cost < path2->total_cost) + if (total_cost1 < total_cost2) return -1; - if (path1->total_cost > path2->total_cost) + if (total_cost1 > total_cost2) return +1; /* * If paths have the same total cost, order them by startup cost. */ - if (path1->startup_cost < path2->startup_cost) + if (startup_cost1 < startup_cost2) return -1; - if (path1->startup_cost > path2->startup_cost) + if (startup_cost1 > startup_cost2) return +1; } return 0; @@ -969,6 +990,17 @@ add_partial_path_precheck(RelOptInfo *parent_rel, int disabled_nodes, return true; } +static inline ParallelSafe +parallel_safety(RelOptInfo *rel) +{ + if (!rel->consider_parallel) + return PARALLEL_UNSAFE; + + if (rel->needs_temp_safety) + return NEEDS_TEMP_FLUSH; + + return PARALLEL_SAFE; +} /***************************************************************************** * PATH NODE CREATION ROUTINES @@ -991,7 +1023,7 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = (parallel_workers > 0); - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = parallel_workers; 
pathnode->pathkeys = NIL; /* seqscan has unordered result */ @@ -1015,7 +1047,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* samplescan has unordered result */ @@ -1067,7 +1099,7 @@ create_index_path(PlannerInfo *root, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; @@ -1110,7 +1142,7 @@ create_bitmap_heap_path(PlannerInfo *root, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = (parallel_degree > 0); - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_degree; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1162,7 +1194,7 @@ create_bitmap_and_path(PlannerInfo *root, * without actually iterating over the list of children. */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1214,7 +1246,7 @@ create_bitmap_or_path(PlannerInfo *root, * without actually iterating over the list of children. 
*/ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1243,7 +1275,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1273,7 +1305,7 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = (parallel_workers > 0); - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1334,7 +1366,7 @@ create_append_path(PlannerInfo *root, required_outer); pathnode->path.parallel_aware = parallel_aware; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -1375,8 +1407,8 @@ create_append_path(PlannerInfo *root, { Path *subpath = (Path *) lfirst(l); - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, + subpath->parallel_safe); /* All child paths must have same parameterization */ Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); @@ -1492,7 +1524,7 @@ create_merge_append_path(PlannerInfo *root, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = NULL; pathnode->path.parallel_aware = 
false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; pathnode->subpaths = subpaths; @@ -1524,8 +1556,8 @@ create_merge_append_path(PlannerInfo *root, Assert(bms_is_empty(PATH_REQ_OUTER(subpath))); pathnode->path.rows += subpath->rows; - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, + subpath->parallel_safe); if (!pathkeys_count_contained_in(pathkeys, subpath->pathkeys, &presorted_keys)) @@ -1617,7 +1649,7 @@ create_group_result_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target; pathnode->path.param_info = NULL; /* there are no other rels... */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; pathnode->quals = havingqual; @@ -1666,8 +1698,7 @@ create_material_path(RelOptInfo *rel, Path *subpath) pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = subpath->pathkeys; @@ -1701,8 +1732,8 @@ create_memoize_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), + subpath->parallel_safe); pathnode->path.parallel_workers = 
subpath->parallel_workers; pathnode->path.pathkeys = subpath->pathkeys; @@ -1813,7 +1844,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* Gather has unordered result */ @@ -1856,8 +1887,8 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), + subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; pathnode->subpath = subpath; @@ -1885,7 +1916,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -1911,7 +1942,7 @@ create_tablefuncscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -1937,7 +1968,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + 
pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -1963,7 +1994,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -1989,7 +2020,7 @@ create_namedtuplestorescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2015,7 +2046,7 @@ create_resultscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2041,7 +2072,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2084,7 +2115,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); 
pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2138,7 +2169,7 @@ create_foreign_join_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target ? target : rel->reltarget; pathnode->path.param_info = NULL; /* XXX see above */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2187,7 +2218,7 @@ create_foreign_upper_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target ? target : rel->reltarget; pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2351,8 +2382,9 @@ create_nestloop_path(PlannerInfo *root, required_outer, &restrict_clauses); pathnode->jpath.path.parallel_aware = false; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2417,8 +2449,9 @@ create_mergejoin_path(PlannerInfo *root, required_outer, &restrict_clauses); pathnode->jpath.path.parallel_aware = false; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... */ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2483,8 +2516,9 @@ create_hashjoin_path(PlannerInfo *root, &restrict_clauses); pathnode->jpath.path.parallel_aware = joinrel->consider_parallel && parallel_hash; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; @@ -2513,6 +2547,33 @@ create_hashjoin_path(PlannerInfo *root, return pathnode; } +static inline ParallelSafe +compute_parallel_safety(PlannerInfo *root, RelOptInfo *rel, + PathTarget *target, Path *subpath) +{ + ParallelSafe level = PARALLEL_SAFE; + bool needs_temp_flush = false; + + if (!rel->consider_parallel) + return PARALLEL_UNSAFE; + + if (rel->needs_temp_safety) + level = NEEDS_TEMP_FLUSH; + + if (subpath) + level = Min(level, subpath->parallel_safe); + + if (target) + { + if (!is_parallel_safe(root, (Node *) target->exprs, &needs_temp_flush)) + return PARALLEL_UNSAFE; + + if (needs_temp_flush) + level = Min(level, NEEDS_TEMP_FLUSH); + } + return level; +} + /* * create_projection_path * Creates a pathnode that represents performing a projection. @@ -2551,9 +2612,9 @@ create_projection_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, target, subpath); + pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2661,9 +2722,12 @@ apply_projection_to_path(PlannerInfo *root, * arrange for the subpath to return the required target list so that * workers can help project. But if there is something that is not * parallel-safe in the target expressions, then we can't. + * + * XXX: don't need flag here because create_projection_path will check the + * target safety anyway. 
*/ if ((IsA(path, GatherPath) || IsA(path, GatherMergePath)) && - is_parallel_safe(root, (Node *) target->exprs)) + is_parallel_safe(root, (Node *) target->exprs, NULL)) { /* * We always use create_projection_path here, even if the subpath is @@ -2697,14 +2761,14 @@ apply_projection_to_path(PlannerInfo *root, } } else if (path->parallel_safe && - !is_parallel_safe(root, (Node *) target->exprs)) + !is_parallel_safe(root, (Node *) target->exprs, NULL)) { /* * We're inserting a parallel-restricted target list into a path * currently marked parallel-safe, so we have to mark it as no longer * safe. */ - path->parallel_safe = false; + path->parallel_safe = PARALLEL_UNSAFE; } return path; @@ -2735,9 +2799,7 @@ create_set_projection_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, target, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order XXX? 
*/ pathnode->path.pathkeys = subpath->pathkeys; @@ -2806,8 +2868,7 @@ create_incremental_sort_path(PlannerInfo *root, pathnode->path.pathtarget = subpath->pathtarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -2853,8 +2914,7 @@ create_sort_path(PlannerInfo *root, pathnode->path.pathtarget = subpath->pathtarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -2899,8 +2959,7 @@ create_group_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Group doesn't change sort ordering */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2954,8 +3013,7 @@ create_unique_path(PlannerInfo *root, pathnode->path.pathtarget = subpath->pathtarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Unique doesn't change the input ordering */ pathnode->path.pathkeys = 
subpath->pathkeys; @@ -3010,8 +3068,7 @@ create_agg_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; if (aggstrategy == AGG_SORTED) @@ -3094,8 +3151,7 @@ create_groupingsets_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->subpath = subpath; @@ -3255,7 +3311,7 @@ create_minmaxagg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = true; /* might change below */ + pathnode->path.parallel_safe = PARALLEL_SAFE; /* might change below */ pathnode->path.parallel_workers = 0; /* Result is one unordered row */ pathnode->path.rows = 1; @@ -3273,7 +3329,7 @@ create_minmaxagg_path(PlannerInfo *root, initplan_disabled_nodes += mminfo->path->disabled_nodes; initplan_cost += mminfo->pathcost; - if (!mminfo->path->parallel_safe) - pathnode->path.parallel_safe = false; + /* take the weaker level so an initplan's NEEDS_TEMP_FLUSH is not lost */ + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, mminfo->path->parallel_safe); } /* add tlist eval cost for each output row, plus cpu_tuple_cost */ @@ -3301,10 +3357,16 @@ create_minmaxagg_path(PlannerInfo *root, * we are in a subquery then it can be useful for the outer query to know * that this one is parallel-safe.)
*/ - if (pathnode->path.parallel_safe) - pathnode->path.parallel_safe = - is_parallel_safe(root, (Node *) target->exprs) && - is_parallel_safe(root, (Node *) quals); + if (pathnode->path.parallel_safe > PARALLEL_UNSAFE) + { + bool needs_temp_flush = false; + + if (!is_parallel_safe(root, (Node *) target->exprs, &needs_temp_flush) || + !is_parallel_safe(root, (Node *) quals, &needs_temp_flush)) + pathnode->path.parallel_safe = PARALLEL_UNSAFE; + else if (needs_temp_flush) + pathnode->path.parallel_safe = NEEDS_TEMP_FLUSH; + } return pathnode; } @@ -3349,8 +3411,7 @@ create_windowagg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* WindowAgg preserves the input sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3419,8 +3480,7 @@ create_setop_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - leftpath->parallel_safe && rightpath->parallel_safe; + pathnode->path.parallel_safe = Min(compute_parallel_safety(root, rel, NULL, leftpath), rightpath->parallel_safe); pathnode->path.parallel_workers = leftpath->parallel_workers + rightpath->parallel_workers; /* SetOp preserves the input sort order if in sort mode */ @@ -3537,8 +3597,7 @@ create_recursiveunion_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - leftpath->parallel_safe && rightpath->parallel_safe; + 
pathnode->path.parallel_safe = Min(compute_parallel_safety(root, rel, NULL, leftpath), rightpath->parallel_safe); /* Foolish, but we'll do it like joins for now: */ pathnode->path.parallel_workers = leftpath->parallel_workers; /* RecursiveUnion result is always unsorted */ @@ -3577,7 +3636,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.rows = subpath->rows; @@ -3656,7 +3715,7 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; @@ -3742,8 +3801,7 @@ create_limit_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.rows = subpath->rows; pathnode->path.disabled_nodes = subpath->disabled_nodes; diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index 405f4dae1092a..5514c1ba73542 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -225,6 +225,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptInfo *parent) rel->consider_startup = (root->tuple_fraction > 0); rel->consider_param_startup = false; /* might get changed 
later */ rel->consider_parallel = false; /* might get changed later */ + rel->needs_temp_safety = false; /* might get changed later */ rel->reltarget = create_empty_pathtarget(); rel->pathlist = NIL; rel->ppilist = NIL; @@ -822,6 +823,7 @@ build_join_rel(PlannerInfo *root, joinrel->consider_startup = (root->tuple_fraction > 0); joinrel->consider_param_startup = false; joinrel->consider_parallel = false; + joinrel->needs_temp_safety = false; joinrel->reltarget = create_empty_pathtarget(); joinrel->pathlist = NIL; joinrel->ppilist = NIL; @@ -959,9 +961,13 @@ build_join_rel(PlannerInfo *root, * here. */ if (inner_rel->consider_parallel && outer_rel->consider_parallel && - is_parallel_safe(root, (Node *) restrictlist) && - is_parallel_safe(root, (Node *) joinrel->reltarget->exprs)) + is_parallel_safe(root, (Node *) restrictlist, &joinrel->needs_temp_safety) && + is_parallel_safe(root, (Node *) joinrel->reltarget->exprs, &joinrel->needs_temp_safety)) + { joinrel->consider_parallel = true; + joinrel->needs_temp_safety |= + (inner_rel->needs_temp_safety | outer_rel->needs_temp_safety); + } /* Add the joinrel to the PlannerInfo. 
*/ add_join_rel(root, joinrel); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index eb55102b0d7fe..301b2ecd8828d 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5112,6 +5112,12 @@ FlushRelationBuffers(Relation rel) } } +void +FlushAllBuffers(void) +{ + FlushAllLocalBuffers(); +} + /* --------------------------------------------------------------------- * FlushRelationsAllBuffers * diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 15aac7d1c9fe4..1bde8738a2d1b 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -44,6 +44,10 @@ typedef struct int NLocBuffer = 0; /* until buffers are initialized */ + +int allocated_localbufs = 0; +int dirtied_localbufs = 0; + BufferDesc *LocalBufferDescriptors = NULL; Block *LocalBufferBlockPointers = NULL; int32 *LocalRefCount = NULL; @@ -185,6 +189,12 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) instr_time io_start; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); + /* + * Parallel temp table scan allows an access to temp tables. So, to be + * paranoid enough we should check it each time, flushing local buffer. 
+ */ + Assert(!IsParallelWorker()); + Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0); /* @@ -509,7 +519,10 @@ MarkLocalBufferDirty(Buffer buffer) buf_state = pg_atomic_read_u32(&bufHdr->state); if (!(buf_state & BM_DIRTY)) + { pgBufferUsage.local_blks_dirtied++; + dirtied_localbufs++; + } buf_state |= BM_DIRTY; @@ -570,6 +583,12 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit /* Clear earlier errors, if this IO failed, it'll be marked again */ buf_state &= ~BM_IO_ERROR; + if (clear_dirty && (buf_state & BM_DIRTY)) + { + Assert(dirtied_localbufs > 0); + dirtied_localbufs--; /* BM_DIRTY is cleared just below */ + } + if (clear_dirty) buf_state &= ~BM_DIRTY; @@ -609,6 +628,12 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced) uint32 buf_state; LocalBufferLookupEnt *hresult; + if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY) + { + Assert(dirtied_localbufs > 0); + dirtied_localbufs--; + } + /* * It's possible that we started IO on this buffer before e.g. aborting * the transaction that created a table. We need to wait for that IO to @@ -731,19 +756,6 @@ InitLocalBuffers(void) HASHCTL info; int i; - /* - * Parallel workers can't access data in temporary tables, because they - * have no visibility into the local buffers of their leader. This is a - * convenient, low-cost place to provide a backstop check for that. Note - * that we don't wish to prevent a parallel worker from accessing catalog - * metadata about a temp table, so checks at higher levels would be - * inappropriate.
- */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot access temporary tables during a parallel operation"))); - /* Allocate and zero buffer headers and auxiliary arrays */ LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc)); LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block)); @@ -947,6 +959,7 @@ GetLocalBufferStorage(void) this_buf = cur_block + next_buf_in_block * BLCKSZ; next_buf_in_block++; total_bufs_allocated++; + allocated_localbufs++; /* * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it @@ -1020,3 +1033,36 @@ AtProcExit_LocalBuffers(void) */ CheckForLocalBufferLeaks(); } + +/* + * Flush each temporary buffer page to the disk. + * + * It is costly operation needed solely to let temporary tables, indexes and + * 'toasts' participate in a parallel query plan. + */ +void +FlushAllLocalBuffers(void) +{ + int i; + + for (i = 0; i < NLocBuffer; i++) + { + BufferDesc *bufHdr = GetLocalBufferDescriptor(i); + uint32 buf_state; + + if (LocalBufHdrGetBlock(bufHdr) == NULL) + continue; + + buf_state = pg_atomic_read_u32(&bufHdr->state); + + /* XXX only valid dirty pages need to be flushed? */ + if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) + { + PinLocalBuffer(bufHdr, false); + FlushLocalBuffer(bufHdr, NULL); + UnpinLocalBuffer(BufferDescriptorGetBuffer(bufHdr)); + } + } + + Assert(dirtied_localbufs == 0); +} diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2d0cb7bcfd4a6..32d548677e2b8 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -2132,6 +2132,10 @@ RelationIdGetRelation(Oid relationId) Assert(rd->rd_isvalid || (rd->rd_isnailed && !criticalRelcachesBuilt)); } + + /* Consistency check to be paranoid introducing parallel temp scan. 
*/ + Assert(!(rd != NULL && RelationUsesLocalBuffers(rd) && IsParallelWorker() && dirtied_localbufs != 0)); + return rd; } @@ -2142,6 +2146,10 @@ RelationIdGetRelation(Oid relationId) rd = RelationBuildDesc(relationId, true); if (RelationIsValid(rd)) RelationIncrementReferenceCount(rd); + + /* Consistency check to be paranoid introducing parallel temp scan. */ + Assert(!(rd != NULL && RelationUsesLocalBuffers(rd) && IsParallelWorker() && dirtied_localbufs != 0)); + return rd; } diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index ac0c7c36c5617..f9e37f8b7c2c3 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1024,6 +1024,13 @@ boot_val => '"$system"', }, +{ name => 'extended_parallel_processing', type => 'bool', context => 'PGC_BACKEND', group => 'QUERY_TUNING_METHOD', + short_desc => 'Enable extra features of parallel processing.', + flags => 'GUC_EXPLAIN', + variable => 'extended_parallel_processing', + boot_val => 'true', +}, + { name => 'external_pid_file', type => 'string', context => 'PGC_POSTMASTER', group => 'FILE_LOCATIONS', short_desc => 'Writes the postmaster PID to the specified file.', flags => 'GUC_SUPERUSER_ONLY', @@ -3495,6 +3502,15 @@ max => 'MAX_KILOBYTES', }, + +{ name => 'write_page_cost', type => 'real', context => 'PGC_USERSET', group => 'QUERY_TUNING_COST', + short_desc => 'Sets the planner\'s estimate of the cost of flushing a disk page.', + flags => 'GUC_EXPLAIN', + variable => 'write_page_cost', + boot_val => 'DEFAULT_WRITE_PAGE_COST', + min => '0', + max => 'DBL_MAX', +}, { name => 'xmlbinary', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT', short_desc => 'Sets how binary values are to be encoded in XML.', variable => 'xmlbinary', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dc9e2255f8a7f..a3cd0d03eb3b0 100644 ---
a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -429,11 +429,13 @@ #enable_distinct_reordering = on #enable_self_join_elimination = on #enable_eager_aggregate = on +#extended_parallel_processing = on # - Planner Cost Constants - #seq_page_cost = 1.0 # measured on an arbitrary scale #random_page_cost = 4.0 # same scale as above +#write_page_cost = 5.0 # same scale as above #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same scale as above diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3968429f99194..8c2c9727aaaff 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -743,6 +743,8 @@ typedef struct EState bool es_use_parallel_mode; /* can we use parallel workers? */ + bool es_tempbufs_flushed; /* have dirty temp buffers already been flushed? */ + int es_parallel_workers_to_launch; /* number of workers to * launch. */ int es_parallel_workers_launched; /* number of workers actually diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index b5ff456ef7fab..a01e72cf5effe 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -941,6 +941,10 @@ typedef struct RelOptInfo bool consider_param_startup; /* consider parallel paths? */ bool consider_parallel; + /* If the rel is allowed to be processed in parallel, does it need to flush + * temporary buffers? + */ + bool needs_temp_safety; /* * default result targetlist for Paths scanning this relation; list of @@ -1899,7 +1903,7 @@ typedef struct Path /* engage parallel-aware logic? */ bool parallel_aware; /* OK to use as part of parallel plan?
*/ - bool parallel_safe; + ParallelSafe parallel_safe; /* desired # of workers; 0 = not parallel */ int parallel_workers; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index c4393a9432116..c9989a862cca7 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -212,7 +212,7 @@ typedef struct Plan /* engage parallel-aware logic? */ bool parallel_aware; /* OK to use as part of parallel plan? */ - bool parallel_safe; + ParallelSafe parallel_safe; /* * information needed for asynchronous execution @@ -1343,6 +1343,8 @@ typedef struct Gather bool single_copy; /* suppress EXPLAIN display (for testing)? */ bool invisible; + /* Signal if any object with temporary storage is scanned in this subtree */ + bool process_temp_tables; /* * param id's of initplans which are referred at gather or one of its @@ -1382,6 +1384,9 @@ typedef struct GatherMerge /* NULLS FIRST/LAST directions */ bool *nullsFirst pg_node_attr(array_size(numCols)); + /* Signal if any objects with temporary storage are scanned in this subtree */ + bool process_temp_tables; + /* * param id's of initplans which are referred at gather merge or one of * its child nodes diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 1b4436f2ff6d4..d20b83cfb0647 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -1050,6 +1050,18 @@ typedef struct SubLink ParseLoc location; /* token location, or -1 if unknown */ } SubLink; +/* + * Start from zero and put NEEDS_TEMP_FLUSH as a first positive value. + * In this case if someone still uses true/false values for this type it just + * causes more temp buffers flushes without an error. 
+ */ +typedef enum ParallelSafe +{ + PARALLEL_UNSAFE = 0, + NEEDS_TEMP_FLUSH, + PARALLEL_SAFE, +} ParallelSafe; + /* * SubPlan - executable expression node for a subplan (sub-SELECT) * @@ -1114,7 +1126,7 @@ typedef struct SubPlan bool unknownEqFalse; /* true if it's okay to return FALSE when the * spec result is UNKNOWN; this allows much * simpler handling of null values */ - bool parallel_safe; /* is the subplan parallel-safe? */ + ParallelSafe parallel_safe; /* is the subplan parallel-safe? */ /* Note: parallel_safe does not consider contents of testexpr or args */ /* Information for passing params into and out of the subselect: */ /* setParam and parParam are lists of integers (param IDs) */ diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index fc38eae5c5a0f..e9f72dcd9cc95 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -33,7 +33,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause); extern bool contain_subplans(Node *clause); extern char max_parallel_hazard(Query *parse); -extern bool is_parallel_safe(PlannerInfo *root, Node *node); +extern bool is_parallel_safe(PlannerInfo *root, Node *node, bool *needs_temp_flush); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_exec_param(Node *clause, List *param_ids); extern bool contain_leaked_vars(Node *clause); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index b523bcda8f3d0..d68d8b8b77470 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -23,6 +23,7 @@ /* If you change these, update backend/utils/misc/postgresql.conf.sample */ #define DEFAULT_SEQ_PAGE_COST 1.0 #define DEFAULT_RANDOM_PAGE_COST 4.0 +#define DEFAULT_WRITE_PAGE_COST 5.0 /* Make it a little more than random read. 
*/ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 @@ -222,5 +223,6 @@ extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel, Path *bitmapqual, double loop_count, Cost *cost_p, double *tuples_p); extern double compute_gather_rows(Path *path); +extern Cost tempbuf_flush_extra_cost(void); #endif /* COST_H */ diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 44ec5296a183f..bd264d3ebe13e 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -72,6 +72,7 @@ extern Selectivity clauselist_selectivity_ext(PlannerInfo *root, /* widely used cost parameters */ extern PGDLLIMPORT double seq_page_cost; extern PGDLLIMPORT double random_page_cost; +extern PGDLLIMPORT double write_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; @@ -80,6 +81,12 @@ extern PGDLLIMPORT double parallel_setup_cost; extern PGDLLIMPORT double recursive_worktable_factor; extern PGDLLIMPORT int effective_cache_size; +/* + * Enable extended feature of parallel query processing such as parallel + * temporary tables scan. 
+ */ +extern PGDLLIMPORT bool extended_parallel_processing; + extern double clamp_row_est(double nrows); extern int32 clamp_width_est(int64 tuple_width); diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 00addf1599250..0eb417883acdd 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -45,7 +45,7 @@ extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual, List *fdw_scan_tlist, List *fdw_recheck_quals, Plan *outer_plan); extern Plan *change_plan_targetlist(Plan *subplan, List *tlist, - bool tlist_parallel_safe); + ParallelSafe tlist_parallel_safe); extern Plan *materialize_finished_plan(Plan *subplan); extern bool is_projection_capable_path(Path *path); extern bool is_projection_capable_plan(Plan *plan); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 5400c56a965f0..0dd00415b44c6 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -542,6 +542,7 @@ extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, bool release_aio); extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); +extern void FlushAllLocalBuffers(void); extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced); extern void DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum, int nforks, diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 97c1124c12acf..1084c1e115a8d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -188,6 +188,11 @@ extern PGDLLIMPORT char *BufferBlocks; /* in localbuf.c */ extern PGDLLIMPORT int NLocBuffer; + +/* Local buffer statistics */ +extern PGDLLIMPORT int allocated_localbufs; +extern PGDLLIMPORT int dirtied_localbufs; + extern PGDLLIMPORT Block *LocalBufferBlockPointers; extern PGDLLIMPORT 
int32 *LocalRefCount; @@ -279,6 +284,7 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum); extern void FlushOneBuffer(Buffer buffer); extern void FlushRelationBuffers(Relation rel); +extern void FlushAllBuffers(void); extern void FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels); extern void CreateAndCopyRelationData(RelFileLocator src_rlocator, RelFileLocator dst_rlocator, diff --git a/src/test/regress/expected/temp.out b/src/test/regress/expected/temp.out index a50c7ae88a9c8..c7ccc71ca7e47 100644 --- a/src/test/regress/expected/temp.out +++ b/src/test/regress/expected/temp.out @@ -566,3 +566,57 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp; -- cleanup DROP FUNCTION test_temp_pin(int, int); +-- Test visibility of temporary table tuples in parallel workers. +-- Although EXPLAIN prints the number of workers planned and launched, in this +-- case it shouldn't make the test results unstable, because the debugging +-- option forces the use of at least one worker (normally none is needed here) +-- and we don't expect more than one worker. 
+CREATE TEMP TABLE test AS (SELECT x FROM generate_series(1,100) AS x); +VACUUM ANALYZE test; +SET max_parallel_workers_per_gather = 1; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; +SET debug_parallel_query = 'on'; +-- Temp buffers will not be seen without flushing dirty buffers +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + QUERY PLAN +------------------------------------------------------------- + Gather (actual rows=100.00 loops=1) + Workers Planned: 1 + Workers Launched: 1 + -> Parallel Seq Scan on test (actual rows=50.00 loops=2) +(4 rows) + +-- Check temporary indexes too +CREATE INDEX idx1 ON test(x); +SET enable_seqscan = 'off'; +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + QUERY PLAN +------------------------------------------------------------------------------- + Gather (actual rows=100.00 loops=1) + Workers Planned: 1 + Workers Launched: 1 + -> Parallel Index Only Scan using idx1 on test (actual rows=50.00 loops=2) + Heap Fetches: 0 + Index Searches: 1 +(6 rows) + +RESET enable_seqscan; +-- A view has no storage, so it shouldn't cause issues. 
+CREATE TEMP TABLE table_a (id integer); +CREATE TEMP VIEW view_a AS SELECT * FROM table_a; +SELECT view_a FROM view_a; + view_a +-------- +(0 rows) + +RESET debug_parallel_query; +RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; +RESET max_parallel_workers_per_gather; +RESET parallel_setup_cost; +RESET parallel_tuple_cost; diff --git a/src/test/regress/sql/temp.sql b/src/test/regress/sql/temp.sql index d50472ddced89..65c519bdabdec 100644 --- a/src/test/regress/sql/temp.sql +++ b/src/test/regress/sql/temp.sql @@ -418,3 +418,42 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp; -- cleanup DROP FUNCTION test_temp_pin(int, int); + +-- Test visibility of temporary table tuples in parallel workers. +-- Although EXPLAIN prints the number of workers planned and launched, in this +-- case it shouldn't make the test results unstable, because the debugging +-- option forces the use of at least one worker (normally none is needed here) +-- and we don't expect more than one worker. +CREATE TEMP TABLE test AS (SELECT x FROM generate_series(1,100) AS x); +VACUUM ANALYZE test; + +SET max_parallel_workers_per_gather = 1; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; +SET debug_parallel_query = 'on'; + +-- Temp buffers will not be seen without flushing dirty buffers +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + +-- Check temporary indexes too +CREATE INDEX idx1 ON test(x); +SET enable_seqscan = 'off'; +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + +RESET enable_seqscan; + +-- A view has no storage, so it shouldn't cause issues. 
+CREATE TEMP TABLE table_a (id integer); +CREATE TEMP VIEW view_a AS SELECT * FROM table_a; +SELECT view_a FROM view_a; + +RESET debug_parallel_query; +RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; +RESET max_parallel_workers_per_gather; +RESET parallel_setup_cost; +RESET parallel_tuple_cost;