Commit 5b68de5
[refactor](nereids) New distribute planner (apache#36531)
## Proposed changes
The legacy coordinator act not only scheduler but also distribute planner. The code is so complex to understand, and hard to extend, and exist many limitations.
This pr extract and refine the computation of degree of parallel(dop) to a new DistributePlanner and resolve the limitations.
## How to use this function
This function only use for nereids + pipelinex, and current only support query statement, and non cloud mode.
Open this session variables to use this function:
```sql
set enable_nereids_distribute_planner=true; -- default is false
set enable_nereids_planner=true; -- default is true
```
## Core process and concepts
```
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────────┐ ┌───────────────┐ ┌───────────────────┐ ┌─────────────────────────┐ │
│ Translate │ │ Typed │ │ Assign │ │ Wrap │ │ │
│ ──────────► │ PlanFragment │ ──────► │ UnassignedJob │ ───────► │ StaticAssignedJob │ ─────► │ PipelineDistributedPlan │ │
│ │ │ │ │ │ │ │ │ │
│ └──────────────┘ └───────────────┘ └───────────────────┘ └─────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
│ │
│ │
└──────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────┘
│ │
│ │
┌──────────────┐ ┌─────────────────┐ ┌───────────────────┐
│ │ Distribute │ │ AdHoc │ │
│ PhysicalPlan │ ───────────► │ DistributedPlan │ ──────► │ PipelineScheduler │
│ │ │ │ │ │
└──────────────┘ └─────────────────┘ └───────────────────┘
```
DistributePlanner is a new planner to compute dop and generate instances, it consume PlanFragment and do this tasks
1. Use PlanFragment to generate `UnassignedJob`, it's a **Typed Fragment**, decided how to calculate dop and how to select the datasource, but this fragment not yet assigned some backends and datasources. These are some unassignedJobs: UnassignedScanSingleOlapTableJob, UnassignedScanBucketOlapTableJob, UnassignedShuffleJob, UnassignedQueryConstantJob. Keep UnassignedJob different can decoupling unrelated logic, and easy to extend: just and a new type of UnassignedJob.
2. Use UnassignedJob to select datasource, compute dop, and generate `AssignedJob`, means a instance, which already assigned datasource and backend. There are StaticAssignedJob and LocalShuffleAssignedJob, we will add DynamicAssignedJob when support StageScheduler and adaptive query execution
3. Wrap PlanFragment, UnassignedJob and AssignedJob to `PipelineDistributedPlan`, the coordinator will consume the DistributedPlan and translate to TPlan and schedule instances
## Resolve limitations
**1. left table shuffle to right table**
if right table has distribution which distribute by `storage hash`, and left table has distribution which distribute by `compute hash`, we can shuffle left to right by `storage hash` to do shuffle bucket join, and keep right side not move.
```sql
select *
from
(
select id2
from test_shuffle_left
group by id2
) a
inner join [shuffle]
test_shuffle_left b
on a.id2=b.id;
| PhysicalResultSink[288] ( outputExprs=[id2#1, id#2, id2#3] ) ...
| +--PhysicalHashJoin[285]@4 ( type=INNER_JOIN, stats=3, hashCondition=[(id2#1 = id#2)], otherCondition=[], markCondition=[], hint=[shuffle] ) ...
| |--PhysicalDistribute[281]@2 ( stats=1.5, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[1], shuffleType=STORAGE_BUCKETED, tableId=-1, selectedIndexId=-1, partitionIds=...
| | +--PhysicalHashAggregate[278]@2 ( aggPhase=GLOBAL, aggMode=BUFFER_TO_RESULT, maybeUseStreaming=false, groupByExpr=[id2#1], outputExpr=[id2#1], partitionExpr=Optional[[id2#1]], requir...
| | +--PhysicalDistribute[275]@7 ( stats=1.5, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[1], shuffleType=EXECUTION_BUCKETED, tableId=-1, selectedIndexId=-1, parti...
| | +--PhysicalHashAggregate[272]@7 ( aggPhase=LOCAL, aggMode=INPUT_TO_BUFFER, maybeUseStreaming=true, groupByExpr=[id2#1], outputExpr=[id2#1], partitionExpr=Optional[[id2#1]], req...
| | +--PhysicalProject[269]@1 ( stats=3, projects=[id2#1] ) ...
| | +--PhysicalOlapScan[test_shuffle_left]@0 ( stats=3 ) ...
| +--PhysicalOlapScan[test_shuffle_left]@3 ( stats=3 )
```
**2. support colocate union numbers function**
support use one instance to union/join numbers, note this plan no any PhysicalDistribute plan:
```sql
explain physical plan
select * from numbers('number'='3')a
union all
select * from numbers('number'='4')b
PhysicalResultSink[98] ( outputExprs=[number#2] )
+--PhysicalUnion@ ( qualifier=ALL, outputs=[number#2], regularChildrenOutputs=[[number#0], [number#1]], constantExprsList=[], stats=7 )
|--PhysicalTVFRelation ( qualified=NumbersTableValuedFunction, output=[number#0], function=numbers('number' = '3') )
+--PhysicalTVFRelation ( qualified=NumbersTableValuedFunction, output=[number#1], function=numbers('number' = '4') )
```
**3. support bucket prune with right outer bucket shuffle join**
left table prune some buckets, say [bucket 1, bucket 3]
we should process the right bucket shuffle join like this
```
[
(left bucket 1) right outer join (exchange right table which should process by bucket 1),
(empty bucket) right outer join (exchange right table which should process by bucket 2),
(left bucket 3) right outer join (exchange right table which should process by bucket 3)
]
```
the left bucket 2 is pruned, so right table can not shuffle to left, because the left instance not exists, so bucket 2 will return empty rows and wrong.
new DistributePlanner can fill up this instance.
the case:
```sql
explain physical plan
SELECT * FROM
(select * from test_outer_join1 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_outer_join2)b
ON a.c0 = b.c0
```
### New feature
add an explain statement to show distributed plans
```sql
explain distributed plan select ...
```
for example, you can use this function to check how many instances generated, how many bytes the instance will scan, which backend will process the instance:
```sql
MySQL [email protected]:test> explain distributed plan select * from test_shuffle_left2 a join [shuffle] test_shuffle_left2 b on a.id2=b.id;
Explain String(Nereids Planner)
-------------------------------------------------------------------------------------------------------
PipelineDistributedPlan(
id: 0,
parallel: 2,
fragmentJob: UnassignedScanSingleOlapTableJob,
fragment: {
OUTPUT EXPRS:
id[#8]
id2[#9]
id[#10]
id2[#11]
PARTITION: HASH_PARTITIONED: id2[#3]
HAS_COLO_PLAN_NODE: false
VRESULT SINK
MYSQL_PROTOCAL
3:VHASH JOIN(152)
| join op: INNER JOIN(PARTITIONED)[]
| equal join conjunct: (id2[#3] = id[#0])
| cardinality=3
| vec output tuple id: 3
| output tuple id: 3
| vIntermediate tuple ids: 2
| hash output slot ids: 0 1 2 3
| isMarkJoin: false
| final projections: id[#4], id2[#5], id[#6], id2[#7]
| final project output tuple id: 3
| distribute expr lists: id2[#3]
| distribute expr lists: id[#0]
| tuple ids: 1 0
|
|----0:VOlapScanNode(149)
| TABLE: test.test_shuffle_left2(test_shuffle_left2), PREAGGREGATION: ON
| partitions=1/1 (test_shuffle_left2)
| tablets=10/10, tabletList=22038,22040,22042 ...
| cardinality=3, avgRowSize=0.0, numNodes=1
| pushAggOp=NONE
| tuple ids: 0
|
2:VEXCHANGE
offset: 0
distribute expr lists: id[#2]
tuple ids: 1
},
instanceJobs: [
LocalShuffleAssignedJob(
index: 0,
worker: BackendWorker(id: 10095, address: 192.168.126.1:9050),
shareScanIndex: 0,
scanSource: [
{
scanNode: OlapScanNode{id=0, tid=0, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
scanRanges: ScanRanges(bytes: 400, ranges: [
tablet 22038, bytes: 0,
tablet 22042, bytes: 0,
tablet 22046, bytes: 0,
tablet 22050, bytes: 400,
tablet 22054, bytes: 0
])
}
]
),
LocalShuffleAssignedJob(
index: 1,
worker: BackendWorker(id: 10096, address: 192.168.126.2:9051),
shareScanIndex: 1,
scanSource: [
{
scanNode: OlapScanNode{id=0, tid=0, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
scanRanges: ScanRanges(bytes: 796, ranges: [
tablet 22040, bytes: 397,
tablet 22044, bytes: 0,
tablet 22048, bytes: 399,
tablet 22052, bytes: 0,
tablet 22056, bytes: 0
])
}
]
)
]
)
PipelineDistributedPlan(
id: 1,
parallel: 2,
fragmentJob: UnassignedScanSingleOlapTableJob,
fragment: {
PARTITION: HASH_PARTITIONED: id[#2]
HAS_COLO_PLAN_NODE: false
STREAM DATA SINK
EXCHANGE ID: 02
HASH_PARTITIONED: id2[#3]
1:VOlapScanNode(145)
TABLE: test.test_shuffle_left2(test_shuffle_left2), PREAGGREGATION: ON
partitions=1/1 (test_shuffle_left2)
tablets=10/10, tabletList=22038,22040,22042 ...
cardinality=3, avgRowSize=0.0, numNodes=1
pushAggOp=NONE
tuple ids: 1
},
instanceJobs: [
LocalShuffleAssignedJob(
index: 0,
worker: BackendWorker(id: 10095, address: 192.168.126.1:9050),
shareScanIndex: 0,
scanSource: [
{
scanNode: OlapScanNode{id=1, tid=1, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
scanRanges: ScanRanges(bytes: 400, ranges: [
tablet 22038, bytes: 0,
tablet 22042, bytes: 0,
tablet 22046, bytes: 0,
tablet 22050, bytes: 400,
tablet 22054, bytes: 0
])
}
]
),
LocalShuffleAssignedJob(
index: 1,
worker: BackendWorker(id: 10096, address: 192.168.126.2:9051),
shareScanIndex: 1,
scanSource: [
{
scanNode: OlapScanNode{id=1, tid=1, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
scanRanges: ScanRanges(bytes: 796, ranges: [
tablet 22040, bytes: 397,
tablet 22044, bytes: 0,
tablet 22048, bytes: 399,
tablet 22052, bytes: 0,
tablet 22056, bytes: 0
])
}
]
)
]
)
Hint log:
Used: [shuffle]_2
UnUsed:
SyntaxError:
```
## TODO
1. extract PipelineScheduler from Coordinator
2. move this framework into cascades and compute cost by dop
3. support StageScheduler, adaptive query execution and DynamicAssignedJob1 parent f04c185 commit 5b68de5
File tree
76 files changed
+4438
-77
lines changed- fe/fe-core/src
- main
- antlr4/org/apache/doris/nereids
- java/org/apache/doris
- common
- profile
- nereids
- parser
- properties
- trees
- expressions/functions/table
- plans
- commands
- distribute
- worker
- job
- util
- planner
- qe
- test/java/org/apache/doris/qe
- regression-test
- data/nereids_syntax_p0/distribute
- suites
- nereids_p0
- nereids_syntax_p0/distribute
Some content is hidden
Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
76 files changed
+4438
-77
lines changedLines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
262 | 262 | | |
263 | 263 | | |
264 | 264 | | |
| 265 | + | |
265 | 266 | | |
266 | 267 | | |
267 | 268 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
25 | 25 | | |
26 | 26 | | |
27 | 27 | | |
28 | | - | |
| 28 | + | |
29 | 29 | | |
30 | 30 | | |
31 | 31 | | |
| |||
62 | 62 | | |
63 | 63 | | |
64 | 64 | | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
65 | 70 | | |
Lines changed: 25 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
249 | 249 | | |
250 | 250 | | |
251 | 251 | | |
| 252 | + | |
| 253 | + | |
| 254 | + | |
| 255 | + | |
| 256 | + | |
| 257 | + | |
| 258 | + | |
| 259 | + | |
| 260 | + | |
| 261 | + | |
| 262 | + | |
| 263 | + | |
| 264 | + | |
| 265 | + | |
| 266 | + | |
| 267 | + | |
| 268 | + | |
| 269 | + | |
| 270 | + | |
| 271 | + | |
| 272 | + | |
| 273 | + | |
| 274 | + | |
| 275 | + | |
| 276 | + | |
252 | 277 | | |
Lines changed: 10 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
20 | 20 | | |
21 | 21 | | |
22 | 22 | | |
| 23 | + | |
| 24 | + | |
23 | 25 | | |
24 | 26 | | |
25 | 27 | | |
| |||
108 | 110 | | |
109 | 111 | | |
110 | 112 | | |
| 113 | + | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
111 | 121 | | |
112 | 122 | | |
113 | 123 | | |
| |||
Lines changed: 13 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
54 | 54 | | |
55 | 55 | | |
56 | 56 | | |
| 57 | + | |
57 | 58 | | |
58 | 59 | | |
59 | 60 | | |
| |||
86 | 87 | | |
87 | 88 | | |
88 | 89 | | |
| 90 | + | |
89 | 91 | | |
90 | 92 | | |
91 | 93 | | |
| |||
109 | 111 | | |
110 | 112 | | |
111 | 113 | | |
| 114 | + | |
112 | 115 | | |
113 | 116 | | |
114 | 117 | | |
| |||
199 | 202 | | |
200 | 203 | | |
201 | 204 | | |
| 205 | + | |
202 | 206 | | |
203 | 207 | | |
204 | 208 | | |
| |||
315 | 319 | | |
316 | 320 | | |
317 | 321 | | |
| 322 | + | |
318 | 323 | | |
319 | 324 | | |
320 | 325 | | |
| |||
419 | 424 | | |
420 | 425 | | |
421 | 426 | | |
| 427 | + | |
| 428 | + | |
| 429 | + | |
| 430 | + | |
422 | 431 | | |
423 | 432 | | |
424 | 433 | | |
| |||
654 | 663 | | |
655 | 664 | | |
656 | 665 | | |
| 666 | + | |
| 667 | + | |
| 668 | + | |
| 669 | + | |
657 | 670 | | |
658 | 671 | | |
659 | 672 | | |
| |||
Lines changed: 54 additions & 6 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
52 | 52 | | |
53 | 53 | | |
54 | 54 | | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
55 | 58 | | |
56 | 59 | | |
57 | 60 | | |
| |||
70 | 73 | | |
71 | 74 | | |
72 | 75 | | |
| 76 | + | |
73 | 77 | | |
74 | 78 | | |
75 | 79 | | |
| |||
102 | 106 | | |
103 | 107 | | |
104 | 108 | | |
| 109 | + | |
105 | 110 | | |
106 | 111 | | |
107 | 112 | | |
| |||
130 | 135 | | |
131 | 136 | | |
132 | 137 | | |
| 138 | + | |
133 | 139 | | |
134 | 140 | | |
135 | 141 | | |
136 | 142 | | |
137 | 143 | | |
138 | 144 | | |
139 | | - | |
140 | | - | |
| 145 | + | |
| 146 | + | |
| 147 | + | |
| 148 | + | |
141 | 149 | | |
142 | | - | |
143 | | - | |
144 | 150 | | |
145 | 151 | | |
146 | 152 | | |
| |||
315 | 321 | | |
316 | 322 | | |
317 | 323 | | |
318 | | - | |
| 324 | + | |
319 | 325 | | |
320 | 326 | | |
321 | 327 | | |
| |||
360 | 366 | | |
361 | 367 | | |
362 | 368 | | |
| 369 | + | |
| 370 | + | |
| 371 | + | |
| 372 | + | |
| 373 | + | |
| 374 | + | |
| 375 | + | |
| 376 | + | |
| 377 | + | |
| 378 | + | |
| 379 | + | |
| 380 | + | |
| 381 | + | |
| 382 | + | |
| 383 | + | |
| 384 | + | |
| 385 | + | |
| 386 | + | |
| 387 | + | |
| 388 | + | |
| 389 | + | |
363 | 390 | | |
364 | 391 | | |
365 | 392 | | |
| |||
498 | 525 | | |
499 | 526 | | |
500 | 527 | | |
| 528 | + | |
| 529 | + | |
| 530 | + | |
| 531 | + | |
| 532 | + | |
| 533 | + | |
| 534 | + | |
| 535 | + | |
| 536 | + | |
| 537 | + | |
| 538 | + | |
501 | 539 | | |
502 | 540 | | |
503 | 541 | | |
| |||
510 | 548 | | |
511 | 549 | | |
512 | 550 | | |
513 | | - | |
| 551 | + | |
| 552 | + | |
| 553 | + | |
| 554 | + | |
| 555 | + | |
| 556 | + | |
| 557 | + | |
514 | 558 | | |
515 | 559 | | |
516 | 560 | | |
| |||
681 | 725 | | |
682 | 726 | | |
683 | 727 | | |
| 728 | + | |
| 729 | + | |
| 730 | + | |
| 731 | + | |
684 | 732 | | |
685 | 733 | | |
686 | 734 | | |
| |||
Lines changed: 3 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
3451 | 3451 | | |
3452 | 3452 | | |
3453 | 3453 | | |
| 3454 | + | |
| 3455 | + | |
| 3456 | + | |
3454 | 3457 | | |
3455 | 3458 | | |
3456 | 3459 | | |
| |||
Lines changed: 32 additions & 20 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
46 | 46 | | |
47 | 47 | | |
48 | 48 | | |
| 49 | + | |
49 | 50 | | |
50 | 51 | | |
51 | 52 | | |
| |||
213 | 214 | | |
214 | 215 | | |
215 | 216 | | |
216 | | - | |
| 217 | + | |
217 | 218 | | |
218 | 219 | | |
219 | 220 | | |
220 | | - | |
221 | | - | |
| 221 | + | |
| 222 | + | |
222 | 223 | | |
223 | 224 | | |
224 | 225 | | |
| |||
303 | 304 | | |
304 | 305 | | |
305 | 306 | | |
306 | | - | |
307 | | - | |
308 | | - | |
309 | | - | |
310 | | - | |
311 | | - | |
312 | | - | |
| 307 | + | |
| 308 | + | |
| 309 | + | |
| 310 | + | |
| 311 | + | |
| 312 | + | |
| 313 | + | |
| 314 | + | |
| 315 | + | |
| 316 | + | |
| 317 | + | |
| 318 | + | |
| 319 | + | |
| 320 | + | |
| 321 | + | |
| 322 | + | |
| 323 | + | |
| 324 | + | |
313 | 325 | | |
314 | 326 | | |
315 | 327 | | |
| |||
537 | 549 | | |
538 | 550 | | |
539 | 551 | | |
540 | | - | |
541 | | - | |
542 | | - | |
543 | | - | |
| 552 | + | |
| 553 | + | |
| 554 | + | |
| 555 | + | |
544 | 556 | | |
545 | 557 | | |
546 | 558 | | |
547 | | - | |
548 | | - | |
549 | | - | |
550 | | - | |
| 559 | + | |
| 560 | + | |
| 561 | + | |
| 562 | + | |
551 | 563 | | |
552 | | - | |
553 | | - | |
| 564 | + | |
| 565 | + | |
554 | 566 | | |
555 | 567 | | |
556 | 568 | | |
| |||
Lines changed: 4 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
31 | 31 | | |
32 | 32 | | |
33 | 33 | | |
| 34 | + | |
34 | 35 | | |
35 | 36 | | |
36 | 37 | | |
| |||
113 | 114 | | |
114 | 115 | | |
115 | 116 | | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
116 | 120 | | |
117 | 121 | | |
118 | 122 | | |
| |||
0 commit comments