Skip to content

Commit 8adfcfa

Browse files
authored
refactor(query): add left join for experimental new hash join (#18814)
* refactor(query): refactor code struct * refactor(query): refactor code struct * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): enable experimental new hash join setting * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join * refactor(query): refactor left outer join to new join
1 parent 654d822 commit 8adfcfa

File tree

30 files changed

+2209
-1110
lines changed

30 files changed

+2209
-1110
lines changed

src/common/base/src/hints/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,17 @@
1414

1515
#[inline]
1616
pub fn assume(condition: bool) {
17-
if !condition {
18-
unsafe { std::hint::unreachable_unchecked() }
17+
#[cfg(debug_assertions)]
18+
{
19+
if !condition {
20+
panic!("assume condition must be true");
21+
}
22+
}
23+
24+
#[cfg(not(debug_assertions))]
25+
{
26+
if !condition {
27+
unsafe { std::hint::unreachable_unchecked() }
28+
}
1929
}
2030
}

src/common/hashtable/src/hashjoin_hashtable.rs

Lines changed: 38 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::marker::PhantomData;
1717
use std::sync::atomic::AtomicU64;
1818
use std::sync::atomic::Ordering;
1919

20+
use databend_common_base::hints::assume;
2021
use databend_common_base::mem_allocator::DefaultAllocator;
2122
use databend_common_column::bitmap::Bitmap;
2223

@@ -215,25 +216,19 @@ where
215216
&self,
216217
hashes: &mut [u64],
217218
bitmap: Option<Bitmap>,
218-
matched_selection: &mut [u32],
219-
unmatched_selection: &mut [u32],
219+
matched_selection: &mut Vec<u32>,
220+
unmatched_selection: &mut Vec<u32>,
220221
) -> (usize, usize) {
221222
let mut valids = None;
222223
if let Some(bitmap) = bitmap {
223224
if bitmap.null_count() == bitmap.len() {
224-
unmatched_selection
225-
.iter_mut()
226-
.enumerate()
227-
.for_each(|(idx, val)| {
228-
*val = idx as u32;
229-
});
225+
unmatched_selection.extend(0..bitmap.null_count() as u32);
230226
return (0, hashes.len());
231227
} else if bitmap.null_count() > 0 {
232228
valids = Some(bitmap);
233229
}
234230
}
235-
let mut matched_idx = 0;
236-
let mut unmatched_idx = 0;
231+
237232
match valids {
238233
Some(valids) => {
239234
valids.iter().zip(hashes.iter_mut().enumerate()).for_each(
@@ -242,22 +237,15 @@ where
242237
let header = self.pointers[(*hash >> self.hash_shift) as usize];
243238
if header != 0 && early_filtering(header, *hash) {
244239
*hash = remove_header_tag(header);
245-
unsafe {
246-
*matched_selection.get_unchecked_mut(matched_idx) = idx as u32
247-
};
248-
matched_idx += 1;
240+
assume(matched_selection.len() < matched_selection.capacity());
241+
matched_selection.push(idx as u32);
249242
} else {
250-
unsafe {
251-
*unmatched_selection.get_unchecked_mut(unmatched_idx) =
252-
idx as u32
253-
};
254-
unmatched_idx += 1;
243+
assume(unmatched_selection.len() < unmatched_selection.capacity());
244+
unmatched_selection.push(idx as u32);
255245
}
256246
} else {
257-
unsafe {
258-
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
259-
};
260-
unmatched_idx += 1;
247+
assume(unmatched_selection.len() < unmatched_selection.capacity());
248+
unmatched_selection.push(idx as u32);
261249
}
262250
},
263251
);
@@ -267,72 +255,60 @@ where
267255
let header = self.pointers[(*hash >> self.hash_shift) as usize];
268256
if header != 0 && early_filtering(header, *hash) {
269257
*hash = remove_header_tag(header);
270-
unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 };
271-
matched_idx += 1;
258+
assume(matched_selection.len() < matched_selection.capacity());
259+
matched_selection.push(idx as u32);
272260
} else {
273-
unsafe {
274-
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
275-
};
276-
unmatched_idx += 1;
261+
assume(unmatched_selection.len() < unmatched_selection.capacity());
262+
unmatched_selection.push(idx as u32);
277263
}
278264
});
279265
}
280266
}
281-
(matched_idx, unmatched_idx)
267+
(matched_selection.len(), unmatched_selection.len())
282268
}
283269

284270
// Perform an early-filtering probe, store the matched indexes in `selection`, and return the number of matched indexes.
285271
fn early_filtering_matched_probe(
286272
&self,
287273
hashes: &mut [u64],
288274
bitmap: Option<Bitmap>,
289-
selection: &mut [u32],
275+
selection: &mut Vec<u32>,
290276
) -> usize {
291277
let mut valids = None;
278+
292279
if let Some(bitmap) = bitmap {
293280
if bitmap.null_count() == bitmap.len() {
294-
hashes.iter_mut().for_each(|hash| {
295-
*hash = 0;
296-
});
297281
return 0;
298282
} else if bitmap.null_count() > 0 {
299283
valids = Some(bitmap);
300284
}
301285
}
302-
let mut count = 0;
303-
match valids {
304-
Some(valids) => {
305-
valids.iter().zip(hashes.iter_mut().enumerate()).for_each(
306-
|(valid, (idx, hash))| {
307-
if valid {
308-
let header = self.pointers[(*hash >> self.hash_shift) as usize];
309-
if header != 0 && early_filtering(header, *hash) {
310-
*hash = remove_header_tag(header);
311-
unsafe { *selection.get_unchecked_mut(count) = idx as u32 };
312-
count += 1;
313-
} else {
314-
*hash = 0;
315-
}
316-
} else {
317-
*hash = 0;
318-
}
319-
},
320-
);
321-
}
322-
None => {
323-
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
286+
287+
if let Some(valids) = valids {
288+
for (valid, (idx, hash)) in valids.iter().zip(hashes.iter_mut().enumerate()) {
289+
if valid {
324290
let header = self.pointers[(*hash >> self.hash_shift) as usize];
325291
if header != 0 && early_filtering(header, *hash) {
326292
*hash = remove_header_tag(header);
327-
unsafe { *selection.get_unchecked_mut(count) = idx as u32 };
328-
count += 1;
329-
} else {
330-
*hash = 0;
293+
assume(selection.len() < selection.capacity());
294+
selection.push(idx as u32);
331295
}
332-
});
296+
}
333297
}
298+
299+
return selection.len();
334300
}
335-
count
301+
302+
for (idx, hash) in hashes.iter_mut().enumerate() {
303+
let header = self.pointers[(*hash >> self.hash_shift) as usize];
304+
if header != 0 && early_filtering(header, *hash) {
305+
*hash = remove_header_tag(header);
306+
assume(selection.len() < selection.capacity());
307+
selection.push(idx as u32);
308+
}
309+
}
310+
311+
selection.len()
336312
}
337313

338314
fn next_contains(&self, key: &Self::Key, mut ptr: u64) -> bool {

src/common/hashtable/src/hashjoin_string_hashtable.rs

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::alloc::Allocator;
1616
use std::sync::atomic::AtomicU64;
1717
use std::sync::atomic::Ordering;
1818

19+
use databend_common_base::hints::assume;
1920
use databend_common_base::mem_allocator::DefaultAllocator;
2021
use databend_common_column::bitmap::Bitmap;
2122

@@ -144,47 +145,35 @@ where A: Allocator + Clone + 'static
144145
&self,
145146
hashes: &mut [u64],
146147
bitmap: Option<Bitmap>,
147-
matched_selection: &mut [u32],
148-
unmatched_selection: &mut [u32],
148+
matched_selection: &mut Vec<u32>,
149+
unmatched_selection: &mut Vec<u32>,
149150
) -> (usize, usize) {
150151
let mut valids = None;
151152
if let Some(bitmap) = bitmap {
152153
if bitmap.null_count() == bitmap.len() {
153-
unmatched_selection
154-
.iter_mut()
155-
.enumerate()
156-
.for_each(|(idx, val)| {
157-
*val = idx as u32;
158-
});
154+
unmatched_selection.extend(0..bitmap.null_count() as u32);
159155
return (0, hashes.len());
160156
} else if bitmap.null_count() > 0 {
161157
valids = Some(bitmap);
162158
}
163159
}
164-
let mut matched_idx = 0;
165-
let mut unmatched_idx = 0;
160+
166161
match valids {
167162
Some(valids) => {
168163
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
169164
if unsafe { valids.get_bit_unchecked(idx) } {
170165
let header = self.pointers[(*hash >> self.hash_shift) as usize];
171166
if header != 0 && early_filtering(header, *hash) {
172167
*hash = remove_header_tag(header);
173-
unsafe {
174-
*matched_selection.get_unchecked_mut(matched_idx) = idx as u32
175-
};
176-
matched_idx += 1;
168+
assume(matched_selection.len() < matched_selection.capacity());
169+
matched_selection.push(idx as u32);
177170
} else {
178-
unsafe {
179-
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
180-
};
181-
unmatched_idx += 1;
171+
assume(unmatched_selection.len() < unmatched_selection.capacity());
172+
unmatched_selection.push(idx as u32);
182173
}
183174
} else {
184-
unsafe {
185-
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
186-
};
187-
unmatched_idx += 1;
175+
assume(unmatched_selection.len() < unmatched_selection.capacity());
176+
unmatched_selection.push(idx as u32);
188177
}
189178
});
190179
}
@@ -193,53 +182,44 @@ where A: Allocator + Clone + 'static
193182
let header = self.pointers[(*hash >> self.hash_shift) as usize];
194183
if header != 0 && early_filtering(header, *hash) {
195184
*hash = remove_header_tag(header);
196-
unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 };
197-
matched_idx += 1;
185+
assume(matched_selection.len() < matched_selection.capacity());
186+
matched_selection.push(idx as u32);
198187
} else {
199-
unsafe {
200-
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
201-
};
202-
unmatched_idx += 1;
188+
assume(unmatched_selection.len() < unmatched_selection.capacity());
189+
unmatched_selection.push(idx as u32);
203190
}
204191
});
205192
}
206193
}
207-
(matched_idx, unmatched_idx)
194+
(matched_selection.len(), unmatched_selection.len())
208195
}
209196

210197
// Perform an early-filtering probe, store the matched indexes in `selection`, and return the number of matched indexes.
211198
fn early_filtering_matched_probe(
212199
&self,
213200
hashes: &mut [u64],
214201
bitmap: Option<Bitmap>,
215-
selection: &mut [u32],
202+
selection: &mut Vec<u32>,
216203
) -> usize {
217204
let mut valids = None;
218205
if let Some(bitmap) = bitmap {
219206
if bitmap.null_count() == bitmap.len() {
220-
hashes.iter_mut().for_each(|hash| {
221-
*hash = 0;
222-
});
223207
return 0;
224208
} else if bitmap.null_count() > 0 {
225209
valids = Some(bitmap);
226210
}
227211
}
228-
let mut count = 0;
212+
229213
match valids {
230214
Some(valids) => {
231215
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
232216
if unsafe { valids.get_bit_unchecked(idx) } {
233217
let header = self.pointers[(*hash >> self.hash_shift) as usize];
234218
if header != 0 && early_filtering(header, *hash) {
235219
*hash = remove_header_tag(header);
236-
unsafe { *selection.get_unchecked_mut(count) = idx as u32 };
237-
count += 1;
238-
} else {
239-
*hash = 0;
220+
assume(selection.len() < selection.capacity());
221+
selection.push(idx as u32);
240222
}
241-
} else {
242-
*hash = 0;
243223
}
244224
});
245225
}
@@ -248,15 +228,13 @@ where A: Allocator + Clone + 'static
248228
let header = self.pointers[(*hash >> self.hash_shift) as usize];
249229
if header != 0 && early_filtering(header, *hash) {
250230
*hash = remove_header_tag(header);
251-
unsafe { *selection.get_unchecked_mut(count) = idx as u32 };
252-
count += 1;
253-
} else {
254-
*hash = 0;
231+
assume(selection.len() < selection.capacity());
232+
selection.push(idx as u32);
255233
}
256234
});
257235
}
258236
}
259-
count
237+
selection.len()
260238
}
261239

262240
fn next_contains(&self, key: &Self::Key, mut ptr: u64) -> bool {

src/common/hashtable/src/traits.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -533,16 +533,16 @@ pub trait HashJoinHashtableLike {
533533
&self,
534534
hashes: &mut [u64],
535535
valids: Option<Bitmap>,
536-
matched_selection: &mut [u32],
537-
unmatched_selection: &mut [u32],
536+
matched_selection: &mut Vec<u32>,
537+
unmatched_selection: &mut Vec<u32>,
538538
) -> (usize, usize);
539539

540540
// Perform an early-filtering probe, store the matched indexes in `selection`, and return the number of matched indexes.
541541
fn early_filtering_matched_probe(
542542
&self,
543543
hashes: &mut [u64],
544544
valids: Option<Bitmap>,
545-
selection: &mut [u32],
545+
selection: &mut Vec<u32>,
546546
) -> usize;
547547

548548
// we use `next_contains` to see whether we can find a matched row in the link.

0 commit comments

Comments
 (0)