Skip to content

Commit 082fd6b

Browse files
lss602726449oppenheimer01
authored andcommitted
fix merge
1 parent cb28adc commit 082fd6b

33 files changed

+1496
-614
lines changed

src/backend/cdb/cdbpath.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ static bool try_redistribute(PlannerInfo *root, CdbpathMfjRel *g,
7575

7676
static SplitUpdatePath *make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti);
7777

78+
static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists);
79+
7880
static bool can_elide_explicit_motion(PlannerInfo *root, Index rti, Path *subpath, GpPolicy *policy);
7981
/*
8082
* cdbpath_cost_motion
@@ -2774,6 +2776,82 @@ create_split_update_path(PlannerInfo *root, Index rti, GpPolicy *policy, Path *s
27742776
return subpath;
27752777
}
27762778

2779+
2780+
Path *
2781+
create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy *policy, List *mergeActionLists, Path *subpath)
2782+
{
2783+
GpPolicyType policyType = policy->ptype;
2784+
CdbPathLocus targetLocus;
2785+
RelOptInfo *rel;
2786+
ListCell *lc, *l;
2787+
bool need_split_merge = false;
2788+
2789+
if (policyType == POLICYTYPE_PARTITIONED)
2790+
{
2791+
/*
2792+
* If merge contain CMD_INSERT, we need split merge to let new
2793+
* insert tuple redistributed to correct segment. otherwise, we
2794+
* create motion as the same as update/delete in create_motion_path_for_upddel
2795+
*/
2796+
foreach(l, mergeActionLists)
2797+
{
2798+
List *mergeActionList = lfirst(l);
2799+
foreach(lc, mergeActionList)
2800+
{
2801+
MergeAction *action = lfirst(lc);
2802+
if (action->commandType == CMD_INSERT)
2803+
need_split_merge = true;
2804+
}
2805+
}
2806+
2807+
if (need_split_merge)
2808+
{
2809+
if (root->simple_rel_array[linitial_int(resultRelations)])
2810+
rel = root->simple_rel_array[linitial_int(resultRelations)];
2811+
else
2812+
rel = build_simple_rel(root, linitial_int(resultRelations), NULL /*parent*/);
2813+
targetLocus = cdbpathlocus_from_baserel(root, rel, 0);
2814+
2815+
subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists);
2816+
subpath = cdbpath_create_explicit_motion_path(root,
2817+
subpath,
2818+
targetLocus);
2819+
}
2820+
else
2821+
{
2822+
2823+
if (can_elide_explicit_motion(root, linitial_int(resultRelations), subpath, policy))
2824+
return subpath;
2825+
else
2826+
{
2827+
CdbPathLocus_MakeStrewn(&targetLocus, policy->numsegments, 0);
2828+
subpath = cdbpath_create_explicit_motion_path(root,
2829+
subpath,
2830+
targetLocus);
2831+
}
2832+
}
2833+
}
2834+
else if (policyType == POLICYTYPE_ENTRY)
2835+
{
2836+
/* Master-only table */
2837+
CdbPathLocus_MakeEntry(&targetLocus);
2838+
subpath = cdbpath_create_motion_path(root, subpath, NIL, false, targetLocus);
2839+
}
2840+
else if (policyType == POLICYTYPE_REPLICATED)
2841+
{
2842+
/*
2843+
* The statement that insert/update/delete on replicated table has to
2844+
* be dispatched to each segment and executed on each segment. Thus
2845+
* the targetlist cannot contain volatile functions.
2846+
*/
2847+
if (contain_volatile_functions((Node *) (subpath->pathtarget->exprs)))
2848+
elog(ERROR, "could not devise a plan.");
2849+
}
2850+
else
2851+
elog(ERROR, "unrecognized policy type %u", policyType);
2852+
return subpath;
2853+
}
2854+
27772855
/*
27782856
* turn_volatile_seggen_to_singleqe
27792857
*
@@ -2836,6 +2914,35 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node)
28362914
return path;
28372915
}
28382916

2917+
static SplitMergePath *
2918+
make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists)
2919+
{
2920+
PathTarget *splitMergePathTarget;
2921+
SplitMergePath *splitmergepath;
2922+
2923+
splitMergePathTarget = copy_pathtarget(subpath->pathtarget);
2924+
2925+
/* populate information generated above into splitupdate node */
2926+
splitmergepath = makeNode(SplitMergePath);
2927+
splitmergepath->path.pathtype = T_SplitMerge;
2928+
splitmergepath->path.parent = subpath->parent;
2929+
splitmergepath->path.pathtarget = splitMergePathTarget;
2930+
splitmergepath->path.param_info = NULL;
2931+
splitmergepath->path.parallel_aware = false;
2932+
splitmergepath->path.parallel_safe = subpath->parallel_safe;
2933+
splitmergepath->path.parallel_workers = subpath->parallel_workers;
2934+
splitmergepath->path.rows = 2 * subpath->rows;
2935+
splitmergepath->path.startup_cost = subpath->startup_cost;
2936+
splitmergepath->path.total_cost = subpath->total_cost;
2937+
splitmergepath->path.pathkeys = subpath->pathkeys;
2938+
splitmergepath->path.locus = subpath->locus;
2939+
splitmergepath->subpath = subpath;
2940+
splitmergepath->resultRelations = resultRelations;
2941+
splitmergepath->mergeActionLists = mergeActionLists;
2942+
2943+
return splitmergepath;
2944+
}
2945+
28392946
static SplitUpdatePath *
28402947
make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti)
28412948
{

src/backend/cdb/cdbplan.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -970,6 +970,17 @@ plan_tree_mutator(Node *node,
970970
}
971971
break;
972972

973+
case T_SplitMerge:
974+
{
975+
SplitMerge *splitMerge = (SplitMerge *) node;
976+
SplitMerge *newSplitMerge;
977+
978+
FLATCOPY(newSplitMerge, splitMerge, SplitMerge);
979+
PLANMUTATE(newSplitMerge, splitMerge);
980+
return (Node *) newSplitMerge;
981+
}
982+
break;
983+
973984
case T_IncrementalSort:
974985
{
975986
IncrementalSort *incrementalSort = (IncrementalSort *) node;

src/backend/cdb/cdbtargeteddispatch.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ DirectDispatchUpdateContentIdsFromPlan(PlannerInfo *root, Plan *plan)
532532
* so disable */
533533
break;
534534
case T_SplitUpdate:
535+
case T_SplitMerge:
535536
break;
536537
case T_CustomScan:
537538
break;

src/backend/commands/explain.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1961,7 +1961,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
19611961
}
19621962
break;
19631963
case T_SplitUpdate:
1964-
pname = sname = "Split";
1964+
pname = sname = "Split Update";
1965+
break;
1966+
case T_SplitMerge:
1967+
pname = sname = "Split Merge";
19651968
break;
19661969
case T_AssertOp:
19671970
pname = sname = "Assert";

src/backend/commands/prepare.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ PrepareQuery(ParseState *pstate, PrepareStmt *stmt,
153153
case CMD_DELETE:
154154
srctag = T_DeleteStmt;
155155
break;
156+
case CMD_MERGE:
157+
srctag = T_MergeStmt;
158+
break;
156159
default:
157160
ereport(ERROR,
158161
(errcode(ERRCODE_INVALID_PSTATEMENT_DEFINITION),

src/backend/executor/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ OBJS += nodeMotion.o \
8888
nodeSequence.o \
8989
nodeAssertOp.o \
9090
nodeSplitUpdate.o \
91+
nodeSplitMerge.o \
9192
nodeTupleSplit.o \
9293
nodePartitionSelector.o
9394

src/backend/executor/execMain.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
10771077
*/
10781078
if (IS_QD_OR_SINGLENODE() &&
10791079
(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE ||
1080-
queryDesc->plannedstmt->hasModifyingCTE) &&
1080+
operation == CMD_MERGE || queryDesc->plannedstmt->hasModifyingCTE) &&
10811081
((es_processed > 0 || estate->es_processed > 0) || !queryDesc->plannedstmt->canSetTag))
10821082
{
10831083
MaintainMaterializedViewStatus(queryDesc, operation);

src/backend/executor/execProcnode.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
#include "executor/nodeSequence.h"
138138
#include "executor/nodeShareInputScan.h"
139139
#include "executor/nodeSplitUpdate.h"
140+
#include "executor/nodeSplitMerge.h"
140141
#include "executor/nodeTableFunction.h"
141142
#include "pg_trace.h"
142143
#include "tcop/tcopprot.h"
@@ -512,6 +513,10 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
512513
result = (PlanState *) ExecInitSplitUpdate((SplitUpdate *) node,
513514
estate, eflags);
514515
break;
516+
case T_SplitMerge:
517+
result = (PlanState *) ExecInitSplitMerge((SplitMerge *) node,
518+
estate, eflags);
519+
break;
515520
case T_AssertOp:
516521
result = (PlanState *) ExecInitAssertOp((AssertOp *) node,
517522
estate, eflags);
@@ -1055,6 +1060,9 @@ ExecEndNode(PlanState *node)
10551060
case T_SplitUpdateState:
10561061
ExecEndSplitUpdate((SplitUpdateState *) node);
10571062
break;
1063+
case T_SplitMergeState:
1064+
ExecEndSplitMerge((SplitMergeState *) node);
1065+
break;
10581066
case T_AssertOpState:
10591067
ExecEndAssertOp((AssertOpState *) node);
10601068
break;

src/backend/executor/nodeModifyTable.c

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,6 +1511,7 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
15111511
ModifyTableState *mtstate = context->mtstate;
15121512
EState *estate = context->estate;
15131513
TransitionCaptureState *ar_delete_trig_tcs;
1514+
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
15141515

15151516
/*
15161517
* If this delete is the result of a partition key update that moved the
@@ -1539,6 +1540,14 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
15391540
if (!RelationIsNonblockRelation(resultRelInfo->ri_RelationDesc) && !splitUpdate)
15401541
ExecARDeleteTriggers(estate, resultRelInfo, tupleid, oldtuple,
15411542
ar_delete_trig_tcs, changingPart);
1543+
1544+
if (resultRelationDesc->rd_rel->relispartition)
1545+
{
1546+
1547+
context->mtstate->mt_leaf_relids_deleted =
1548+
bms_add_member(context->mtstate->mt_leaf_relids_deleted, RelationGetRelid(resultRelationDesc));
1549+
context->mtstate->has_leaf_changed = true;
1550+
}
15421551
}
15431552

15441553
/* ----------------------------------------------------------------
@@ -1872,14 +1881,6 @@ ExecDelete(ModifyTableContext *context,
18721881
if (canSetTag)
18731882
(estate->es_processed)++;
18741883

1875-
if (resultRelationDesc->rd_rel->relispartition)
1876-
{
1877-
1878-
context->mtstate->mt_leaf_relids_deleted =
1879-
bms_add_member(context->mtstate->mt_leaf_relids_deleted, RelationGetRelid(resultRelationDesc));
1880-
context->mtstate->has_leaf_changed = true;
1881-
}
1882-
18831884
/* Tell caller that the delete actually happened. */
18841885
if (tupleDeleted)
18851886
*tupleDeleted = true;
@@ -2354,6 +2355,7 @@ ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt,
23542355
{
23552356
ModifyTableState *mtstate = context->mtstate;
23562357
List *recheckIndexes = NIL;
2358+
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
23572359

23582360
/* insert index entries for tuple if necessary */
23592361
if (resultRelInfo->ri_NumIndices > 0 && (updateCxt->updateIndexes != TU_None))
@@ -2388,6 +2390,13 @@ ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt,
23882390
if (resultRelInfo->ri_WithCheckOptions != NIL)
23892391
ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo,
23902392
slot, context->estate);
2393+
2394+
if (resultRelationDesc->rd_rel->relispartition)
2395+
{
2396+
context->mtstate->mt_leaf_relids_updated =
2397+
bms_add_member(context->mtstate->mt_leaf_relids_updated, RelationGetRelid(resultRelationDesc));
2398+
context->mtstate->has_leaf_changed = true;
2399+
}
23912400
}
23922401

23932402
/*
@@ -2726,13 +2735,6 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
27262735
if (canSetTag)
27272736
(estate->es_processed)++;
27282737

2729-
if (resultRelationDesc->rd_rel->relispartition)
2730-
{
2731-
context->mtstate->mt_leaf_relids_updated =
2732-
bms_add_member(context->mtstate->mt_leaf_relids_updated, RelationGetRelid(resultRelationDesc));
2733-
context->mtstate->has_leaf_changed = true;
2734-
}
2735-
27362738
ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, tupleid, oldtuple,
27372739
slot);
27382740

0 commit comments

Comments
 (0)