Skip to content

Commit 2a6ca57

Browse files
committed
change colocate execution parallel num
1 parent ff0972f commit 2a6ca57

File tree

2 files changed

+40
-0
lines changed

2 files changed

+40
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,4 +505,40 @@ private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet, long cat
505505
}
506506
return workers;
507507
}
508+
509+
@Override
510+
protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
511+
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
512+
if (!fragment.getDataPartition().isPartitioned()) {
513+
return 1;
514+
}
515+
if (fragment.queryCacheParam != null) {
516+
return maxParallel;
517+
}
518+
if (scanNodes.size() == 1 && scanNodes.get(0) instanceof OlapScanNode) {
519+
OlapScanNode olapScanNode = (OlapScanNode) scanNodes.get(0);
520+
ConnectContext connectContext = statementContext.getConnectContext();
521+
if (connectContext != null && olapScanNode.shouldUseOneInstance(connectContext)) {
522+
return 1;
523+
}
524+
}
525+
526+
long tabletNum = 0;
527+
for (ScanNode scanNode : scanNodes) {
528+
if (scanNode instanceof OlapScanNode) {
529+
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
530+
tabletNum = olapScanNode.getTotalTabletsNum();
531+
break;
532+
}
533+
}
534+
535+
ConnectContext connectContext = statementContext.getConnectContext();
536+
int colocateMaxParallelNum = 128;
537+
if (connectContext != null) {
538+
colocateMaxParallelNum = connectContext.getSessionVariable().colocateMaxParallelNum;
539+
}
540+
541+
int maxParallelism = (int) Math.max(tabletNum, fragment.getParallelExecNum());
542+
return Math.min(maxParallelism, colocateMaxParallelNum);
543+
}
508544
}

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public class SessionVariable implements Serializable, Writable {
168168
"enable_distinct_streaming_agg_force_passthrough";
169169
public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = "enable_broadcast_join_force_passthrough";
170170
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
171+
public static final String COLOCATE_MAX_PARALLEL_NUM = "colocate_max_parallel_num";
171172
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
172173
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
173174
public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
@@ -1291,6 +1292,9 @@ public void checkQuerySlotCount(String slotCnt) {
12911292
setter = "setFragmentInstanceNum", varType = VariableAnnotation.DEPRECATED)
12921293
public int parallelExecInstanceNum = 8;
12931294

1295+
@VariableMgr.VarAttr(name = COLOCATE_MAX_PARALLEL_NUM, needForward = true, fuzzy = false)
1296+
public int colocateMaxParallelNum = 128;
1297+
12941298
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true,
12951299
setter = "setPipelineTaskNum")
12961300
public int parallelPipelineTaskNum = 0;

0 commit comments

Comments
 (0)