Skip to content

Commit 0e50d3c

Browse files
committed
[Exec](performance) change colocate execution parallel num
1 parent 961f70e commit 0e50d3c

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,4 +505,40 @@ private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet, long cat
505505
}
506506
return workers;
507507
}
508+
509+
@Override
510+
protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
511+
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
512+
if (!fragment.getDataPartition().isPartitioned()) {
513+
return 1;
514+
}
515+
if (fragment.queryCacheParam != null) {
516+
return maxParallel;
517+
}
518+
if (scanNodes.size() == 1 && scanNodes.get(0) instanceof OlapScanNode) {
519+
OlapScanNode olapScanNode = (OlapScanNode) scanNodes.get(0);
520+
ConnectContext connectContext = statementContext.getConnectContext();
521+
if (connectContext != null && olapScanNode.shouldUseOneInstance(connectContext)) {
522+
return 1;
523+
}
524+
}
525+
526+
long tabletNum = 0;
527+
for (ScanNode scanNode : scanNodes) {
528+
if (scanNode instanceof OlapScanNode) {
529+
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
530+
tabletNum = olapScanNode.getTotalTabletsNum();
531+
break;
532+
}
533+
}
534+
535+
ConnectContext connectContext = statementContext.getConnectContext();
536+
int colocateMaxParallelNum = 128;
537+
if (connectContext != null) {
538+
colocateMaxParallelNum = connectContext.getSessionVariable().colocateMaxParallelNum;
539+
}
540+
541+
int maxParallelism = (int) Math.max(tabletNum, fragment.getParallelExecNum());
542+
return Math.min(maxParallelism, colocateMaxParallelNum);
543+
}
508544
}

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public class SessionVariable implements Serializable, Writable {
163163
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
164164
public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation";
165165
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
166+
public static final String COLOCATE_MAX_PARALLEL_NUM = "colocate_max_parallel_num";
166167
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
167168
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
168169
public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
@@ -1287,6 +1288,9 @@ public void checkQuerySlotCount(String slotCnt) {
12871288
setter = "setFragmentInstanceNum", varType = VariableAnnotation.DEPRECATED)
12881289
public int parallelExecInstanceNum = 8;
12891290

1291+
@VariableMgr.VarAttr(name = COLOCATE_MAX_PARALLEL_NUM, needForward = true, fuzzy = false)
1292+
public int colocateMaxParallelNum = 128;
1293+
12901294
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true,
12911295
setter = "setPipelineTaskNum")
12921296
public int parallelPipelineTaskNum = 0;

fe/fe-core/src/test/java/org/apache/doris/qe/LocalShuffleWithBucketJoinTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void test() throws Exception {
6767
PipelineDistributedPlan distributedPlan
6868
= (PipelineDistributedPlan) distributedPlans.get(0);
6969
List<AssignedJob> instances = distributedPlan.getInstanceJobs();
70-
Assertions.assertEquals(beNum * parallelPipelineTaskNum, instances.size());
70+
Assertions.assertEquals(beNum * bucketNum, instances.size());
7171
Assertions.assertTrue(instances.stream().allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance));
7272

7373
long assignedBucketInstanceNum = instances.stream().map(LocalShuffleBucketJoinAssignedJob.class::cast)
@@ -80,7 +80,7 @@ public void test() throws Exception {
8080
workerToInstances.put(instance.getAssignedWorker(), instance);
8181
}
8282
for (Collection<AssignedJob> instancePerBe : workerToInstances.asMap().values()) {
83-
Assertions.assertEquals(parallelPipelineTaskNum, instancePerBe.size());
83+
Assertions.assertEquals(bucketNum, instancePerBe.size());
8484
}
8585
}
8686
}

0 commit comments

Comments
 (0)