Skip to content

Commit 2f8f463

Browse files
authored
fix: use full join keys for hash shuffle to avoid skew (#19198)
* Use full join keys for hash shuffle * fix: test
1 parent 91da76f commit 2f8f463

File tree

4 files changed

+52
-49
lines changed

4 files changed

+52
-49
lines changed

src/query/sql/src/planner/plans/join.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -793,17 +793,28 @@ impl Operator for Join {
793793

794794
let settings = ctx.get_settings();
795795
if !matches!(self.join_type, JoinType::Cross) && !settings.get_enforce_broadcast_join()? {
796-
// (Hash, Hash)
797-
children_required.extend(self.equi_conditions.iter().map(|condition| {
798-
vec![
796+
// (Hash, Hash) – use full equi-join key set to avoid single-column hash shuffle
797+
let left_keys: Vec<_> = self
798+
.equi_conditions
799+
.iter()
800+
.map(|condition| condition.left.clone())
801+
.collect();
802+
let right_keys: Vec<_> = self
803+
.equi_conditions
804+
.iter()
805+
.map(|condition| condition.right.clone())
806+
.collect();
807+
808+
if !left_keys.is_empty() {
809+
children_required.push(vec![
799810
RequiredProperty {
800-
distribution: Distribution::NodeToNodeHash(vec![condition.left.clone()]),
811+
distribution: Distribution::NodeToNodeHash(left_keys),
801812
},
802813
RequiredProperty {
803-
distribution: Distribution::NodeToNodeHash(vec![condition.right.clone()]),
814+
distribution: Distribution::NodeToNodeHash(right_keys),
804815
},
805-
]
806-
}));
816+
]);
817+
}
807818
}
808819

809820
if !matches!(

tests/sqllogictests/suites/mode/cluster/memo/aggregate_property.test

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ where t_10.a = t_1000.a and t_100.a = t_1000.a
2626
----
2727
Memo
2828
├── root group: #8
29-
├── estimated memory: 10.69 KiB
29+
├── estimated memory: 10.09 KiB
3030
├── Group #0
3131
│ ├── Best properties
3232
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
33-
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
33+
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
3434
│ ├── #0 Scan []
35-
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#0]
35+
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#0]
3636
├── Group #1
3737
│ ├── Best properties
3838
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
@@ -51,12 +51,10 @@ Memo
5151
│ ├── Best properties
5252
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
5353
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
54-
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
55-
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#0)::Int32 NULL) }]
54+
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: 1820.000, children: [{ dist: Any }]
5655
│ ├── #0 Join [#1, #2]
5756
│ ├── #1 Exchange: (Broadcast) [#3]
58-
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
59-
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
57+
│ └── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
6058
├── Group #4
6159
│ ├── Best properties
6260
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
@@ -89,13 +87,13 @@ group by t_10.a, t_100.a
8987
----
9088
Memo
9189
├── root group: #8
92-
├── estimated memory: 26.72 KiB
90+
├── estimated memory: 26.12 KiB
9391
├── Group #0
9492
│ ├── Best properties
9593
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
96-
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
94+
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
9795
│ ├── #0 Scan []
98-
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#0]
96+
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#0]
9997
├── Group #1
10098
│ ├── Best properties
10199
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
@@ -114,12 +112,12 @@ Memo
114112
│ ├── Best properties
115113
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
116114
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
117-
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
118-
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#0)::Int32 NULL) }]
115+
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #3, cost: <slt:ignore>, children: [{ dist: Any }]
116+
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
119117
│ ├── #0 Join [#1, #2]
120118
│ ├── #1 Exchange: (Broadcast) [#3]
121-
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
122-
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
119+
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
120+
│ └── #3 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
123121
├── Group #4
124122
│ ├── Best properties
125123
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
@@ -160,9 +158,9 @@ Memo
160158
├── Group #11
161159
│ ├── Best properties
162160
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
163-
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
161+
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
164162
│ ├── #0 Aggregate [#10]
165-
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#11]
163+
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#11]
166164
├── Group #12
167165
│ ├── Best properties
168166
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
@@ -184,13 +182,11 @@ Memo
184182
├── Group #16
185183
│ ├── Best properties
186184
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
187-
│ │ ├── { dist: Broadcast }: expr: #3, cost: <slt:ignore>, children: [{ dist: Any }]
188-
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
189-
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
185+
│ │ ├── { dist: Broadcast }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
186+
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
190187
│ ├── #0 Aggregate [#15]
191-
│ ├── #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#16]
192-
│ ├── #2 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#16]
193-
│ └── #3 Exchange: (Broadcast) [#16]
188+
│ ├── #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#16]
189+
│ └── #2 Exchange: (Broadcast) [#16]
194190
├── Group #17
195191
│ ├── Best properties
196192
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]

tests/sqllogictests/suites/mode/cluster/memo/join_property.test

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ select * from t_10, t_100, t_1000 where t_10.a = t_1000.a and t_100.a = t_1000.a
2525
----
2626
Memo
2727
├── root group: #5
28-
├── estimated memory: 8.31 KiB
28+
├── estimated memory: 7.72 KiB
2929
├── Group #0
3030
│ ├── Best properties
3131
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
32-
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
32+
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
3333
│ ├── #0 Scan []
34-
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#0]
34+
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#0]
3535
├── Group #1
3636
│ ├── Best properties
3737
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
@@ -50,12 +50,10 @@ Memo
5050
│ ├── Best properties
5151
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
5252
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
53-
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
54-
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#0)::Int32 NULL) }]
53+
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
5554
│ ├── #0 Join [#1, #2]
5655
│ ├── #1 Exchange: (Broadcast) [#3]
57-
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
58-
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
56+
│ └── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
5957
├── Group #4
6058
│ ├── Best properties
6159
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]

tests/sqllogictests/suites/mode/cluster/memo/mix_property.test

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ limit 10
2929
----
3030
Memo
3131
├── root group: #10
32-
├── estimated memory: 27.91 KiB
32+
├── estimated memory: 27.31 KiB
3333
├── Group #0
3434
│ ├── Best properties
3535
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
36-
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
36+
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL, t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
3737
│ ├── #0 Scan []
38-
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL)) [#0]
38+
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL,t_1000.a (#0)::Int32 NULL)) [#0]
3939
├── Group #1
4040
│ ├── Best properties
4141
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
@@ -54,11 +54,11 @@ Memo
5454
│ ├── Best properties
5555
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
5656
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
57-
│ │ ├── { dist: Hash(t_10.a (#2)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
57+
│ │ ├── { dist: Hash(t_10.a (#2)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
5858
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#2)::Int32 NULL) }]
5959
│ ├── #0 Join [#1, #2]
6060
│ ├── #1 Exchange: (Broadcast) [#3]
61-
│ ├── #2 Exchange: (Hash(t_10.a (#2)::Int32 NULL)) [#3]
61+
│ ├── #2 Exchange: (Hash(t_10.a (#2)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
6262
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
6363
├── Group #4
6464
│ ├── Best properties
@@ -108,9 +108,9 @@ Memo
108108
├── Group #13
109109
│ ├── Best properties
110110
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
111-
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
111+
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL, t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: 111816.000, children: [{ dist: Any }]
112112
│ ├── #0 Aggregate [#12]
113-
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL)) [#13]
113+
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL,t_1000.a (#0)::Int32 NULL)) [#13]
114114
├── Group #14
115115
│ ├── Best properties
116116
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
@@ -132,13 +132,11 @@ Memo
132132
├── Group #18
133133
│ ├── Best properties
134134
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
135-
│ │ ├── { dist: Broadcast }: expr: #3, cost: <slt:ignore>, children: [{ dist: Any }]
136-
│ │ ├── { dist: Hash(t_10.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
137-
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
135+
│ │ ├── { dist: Broadcast }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
136+
│ │ └── { dist: Hash(t_10.a (#2)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
138137
│ ├── #0 Aggregate [#17]
139-
│ ├── #1 Exchange: (Hash(t_10.a (#2)::Int32 NULL)) [#18]
140-
│ ├── #2 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#18]
141-
│ └── #3 Exchange: (Broadcast) [#18]
138+
│ ├── #1 Exchange: (Hash(t_10.a (#2)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#18]
139+
│ └── #2 Exchange: (Broadcast) [#18]
142140
├── Group #19
143141
│ ├── Best properties
144142
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]

0 commit comments

Comments
 (0)