Skip to content

Commit 90ffecd

Browse files
authored
[Exec](performance) change colocate execution parallel num (#60677)
Previously, the parallelism for colocate execution was calculated from the bucket count, which might not fully utilize the available resources. This PR changes the parallelism calculation as follows: (1) use the tablet count instead of the bucket count for more accurate resource utilization; (2) add a new session variable, colocate_max_parallel_num, to control the maximum parallelism; (3) calculate the parallelism as min(max(tablet_num, fragment.getParallelExecNum()), colocate_max_parallel_num) to ensure both resource utilization and system stability.
1 parent c422f2a commit 90ffecd

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,4 +505,40 @@ private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet, long cat
505505
}
506506
return workers;
507507
}
508+
509+
@Override
510+
protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
511+
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
512+
if (!fragment.getDataPartition().isPartitioned()) {
513+
return 1;
514+
}
515+
if (fragment.queryCacheParam != null) {
516+
return maxParallel;
517+
}
518+
if (scanNodes.size() == 1 && scanNodes.get(0) instanceof OlapScanNode) {
519+
OlapScanNode olapScanNode = (OlapScanNode) scanNodes.get(0);
520+
ConnectContext connectContext = statementContext.getConnectContext();
521+
if (connectContext != null && olapScanNode.shouldUseOneInstance(connectContext)) {
522+
return 1;
523+
}
524+
}
525+
526+
long tabletNum = 0;
527+
for (ScanNode scanNode : scanNodes) {
528+
if (scanNode instanceof OlapScanNode) {
529+
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
530+
tabletNum = olapScanNode.getTotalTabletsNum();
531+
break;
532+
}
533+
}
534+
535+
ConnectContext connectContext = statementContext.getConnectContext();
536+
int colocateMaxParallelNum = 128;
537+
if (connectContext != null) {
538+
colocateMaxParallelNum = connectContext.getSessionVariable().colocateMaxParallelNum;
539+
}
540+
541+
int maxParallelism = (int) Math.max(tabletNum, fragment.getParallelExecNum());
542+
return Math.min(maxParallelism, colocateMaxParallelNum);
543+
}
508544
}

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public class SessionVariable implements Serializable, Writable {
168168
"enable_distinct_streaming_agg_force_passthrough";
169169
public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = "enable_broadcast_join_force_passthrough";
170170
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
171+
public static final String COLOCATE_MAX_PARALLEL_NUM = "colocate_max_parallel_num";
171172
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
172173
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
173174
public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
@@ -1301,6 +1302,9 @@ public void checkQuerySlotCount(String slotCnt) {
13011302
setter = "setFragmentInstanceNum", varType = VariableAnnotation.DEPRECATED)
13021303
public int parallelExecInstanceNum = 8;
13031304

1305+
// Session variable "colocate_max_parallel_num": upper bound applied to the parallelism
// computed for colocate execution. Defaults to 128.
@VariableMgr.VarAttr(name = COLOCATE_MAX_PARALLEL_NUM, needForward = true, fuzzy = false)
1306+
public int colocateMaxParallelNum = 128;
1307+
13041308
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true,
13051309
setter = "setPipelineTaskNum")
13061310
public int parallelPipelineTaskNum = 0;

fe/fe-core/src/test/java/org/apache/doris/qe/LocalShuffleWithBucketJoinTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void test() throws Exception {
6767
PipelineDistributedPlan distributedPlan
6868
= (PipelineDistributedPlan) distributedPlans.get(0);
6969
List<AssignedJob> instances = distributedPlan.getInstanceJobs();
70-
Assertions.assertEquals(beNum * parallelPipelineTaskNum, instances.size());
70+
Assertions.assertEquals(beNum * bucketNum, instances.size());
7171
Assertions.assertTrue(instances.stream().allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance));
7272

7373
long assignedBucketInstanceNum = instances.stream().map(LocalShuffleBucketJoinAssignedJob.class::cast)
@@ -80,7 +80,7 @@ public void test() throws Exception {
8080
workerToInstances.put(instance.getAssignedWorker(), instance);
8181
}
8282
for (Collection<AssignedJob> instancePerBe : workerToInstances.asMap().values()) {
83-
Assertions.assertEquals(parallelPipelineTaskNum, instancePerBe.size());
83+
Assertions.assertEquals(bucketNum, instancePerBe.size());
8484
}
8585
}
8686
}

0 commit comments

Comments
 (0)