Skip to content

Commit 6a78ba1

Browse files
authored
feat: improve runtime filter [Part 3] (#19006)
* init * fix hash * fix * replace ahash
1 parent 8b4a558 commit 6a78ba1

File tree

9 files changed

+254
-72
lines changed

9 files changed

+254
-72
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/hashtable/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod utils;
4444

4545
pub use table0::Entry as HashtableEntry;
4646
pub use traits::hash_join_fast_string_hash;
47+
pub use traits::BloomHash;
4748
pub use traits::EntryMutRefLike as HashtableEntryMutRefLike;
4849
pub use traits::EntryRefLike as HashtableEntryRefLike;
4950
pub use traits::FastHash;

src/common/hashtable/src/traits.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,25 @@ pub trait FastHash {
176176
fn fast_hash(&self) -> u64;
177177
}
178178

179+
pub trait BloomHash {
180+
fn bloom_hash(&self) -> u64;
181+
}
182+
183+
/// Compress a 128-bit value into a 64-bit hash.
184+
///
185+
/// This is the `Hash128to64` function from CityHash, a Murmur-inspired
186+
/// mixing function with good avalanche properties.
187+
#[inline(always)]
188+
fn hash128_to_64(low: u64, high: u64) -> u64 {
189+
const KMUL: u64 = 0x9ddf_ea08_eb38_2d69;
190+
let mut a = (low ^ high).wrapping_mul(KMUL);
191+
a ^= a >> 47;
192+
let mut b = (high ^ a).wrapping_mul(KMUL);
193+
b ^= b >> 47;
194+
b = b.wrapping_mul(KMUL);
195+
b
196+
}
197+
179198
macro_rules! impl_fast_hash_for_primitive_types {
180199
($t: ty) => {
181200
impl FastHash for $t {
@@ -208,6 +227,32 @@ impl_fast_hash_for_primitive_types!(i32);
208227
impl_fast_hash_for_primitive_types!(u64);
209228
impl_fast_hash_for_primitive_types!(i64);
210229

230+
macro_rules! impl_bloom_hash_for_primitive_types {
231+
($t: ty) => {
232+
impl BloomHash for $t {
233+
#[inline(always)]
234+
fn bloom_hash(&self) -> u64 {
235+
let mut hasher = *self as u64;
236+
hasher ^= hasher >> 33;
237+
hasher = hasher.wrapping_mul(0xff51afd7ed558ccd_u64);
238+
hasher ^= hasher >> 33;
239+
hasher = hasher.wrapping_mul(0xc4ceb9fe1a85ec53_u64);
240+
hasher ^= hasher >> 33;
241+
hasher
242+
}
243+
}
244+
};
245+
}
246+
247+
impl_bloom_hash_for_primitive_types!(u8);
248+
impl_bloom_hash_for_primitive_types!(i8);
249+
impl_bloom_hash_for_primitive_types!(u16);
250+
impl_bloom_hash_for_primitive_types!(i16);
251+
impl_bloom_hash_for_primitive_types!(u32);
252+
impl_bloom_hash_for_primitive_types!(i32);
253+
impl_bloom_hash_for_primitive_types!(u64);
254+
impl_bloom_hash_for_primitive_types!(i64);
255+
211256
impl FastHash for u128 {
212257
#[inline(always)]
213258
fn fast_hash(&self) -> u64 {
@@ -229,13 +274,29 @@ impl FastHash for u128 {
229274
}
230275
}
231276

277+
impl BloomHash for u128 {
278+
#[inline(always)]
279+
fn bloom_hash(&self) -> u64 {
280+
let low = *self as u64;
281+
let high = (self >> 64) as u64;
282+
hash128_to_64(low, high)
283+
}
284+
}
285+
232286
impl FastHash for i128 {
233287
#[inline(always)]
234288
fn fast_hash(&self) -> u64 {
235289
(*self as u128).fast_hash()
236290
}
237291
}
238292

293+
impl BloomHash for i128 {
294+
#[inline(always)]
295+
fn bloom_hash(&self) -> u64 {
296+
(*self as u128).bloom_hash()
297+
}
298+
}
299+
239300
impl FastHash for i256 {
240301
#[inline(always)]
241302
fn fast_hash(&self) -> u64 {
@@ -263,6 +324,20 @@ impl FastHash for i256 {
263324
}
264325
}
265326

327+
impl BloomHash for i256 {
328+
#[inline(always)]
329+
fn bloom_hash(&self) -> u64 {
330+
let mut low = 0_u64;
331+
let mut high = 0_u64;
332+
for x in self.0 {
333+
let v = x as u128;
334+
low ^= v as u64;
335+
high ^= (v >> 64) as u64;
336+
}
337+
hash128_to_64(low, high)
338+
}
339+
}
340+
266341
impl FastHash for U256 {
267342
#[inline(always)]
268343
fn fast_hash(&self) -> u64 {
@@ -290,13 +365,34 @@ impl FastHash for U256 {
290365
}
291366
}
292367

368+
impl BloomHash for U256 {
369+
#[inline(always)]
370+
fn bloom_hash(&self) -> u64 {
371+
let mut low = 0_u64;
372+
let mut high = 0_u64;
373+
for x in self.0 {
374+
let v = x;
375+
low ^= v as u64;
376+
high ^= (v >> 64) as u64;
377+
}
378+
hash128_to_64(low, high)
379+
}
380+
}
381+
293382
impl FastHash for bool {
294383
#[inline(always)]
295384
fn fast_hash(&self) -> u64 {
296385
(*self as u8).fast_hash()
297386
}
298387
}
299388

389+
impl BloomHash for bool {
390+
#[inline(always)]
391+
fn bloom_hash(&self) -> u64 {
392+
(*self as u8).bloom_hash()
393+
}
394+
}
395+
300396
impl FastHash for OrderedFloat<f32> {
301397
#[inline(always)]
302398
fn fast_hash(&self) -> u64 {
@@ -308,6 +404,17 @@ impl FastHash for OrderedFloat<f32> {
308404
}
309405
}
310406

407+
impl BloomHash for OrderedFloat<f32> {
408+
#[inline(always)]
409+
fn bloom_hash(&self) -> u64 {
410+
if self.is_nan() {
411+
f32::NAN.to_bits().bloom_hash()
412+
} else {
413+
self.to_bits().bloom_hash()
414+
}
415+
}
416+
}
417+
311418
impl FastHash for OrderedFloat<f64> {
312419
#[inline(always)]
313420
fn fast_hash(&self) -> u64 {
@@ -319,6 +426,17 @@ impl FastHash for OrderedFloat<f64> {
319426
}
320427
}
321428

429+
impl BloomHash for OrderedFloat<f64> {
430+
#[inline(always)]
431+
fn bloom_hash(&self) -> u64 {
432+
if self.is_nan() {
433+
f64::NAN.to_bits().bloom_hash()
434+
} else {
435+
self.to_bits().bloom_hash()
436+
}
437+
}
438+
}
439+
322440
// To avoid RUSTFLAGS="-C target-feature=+sse4.2" warning.
323441
#[allow(dead_code)]
324442
const SEEDS: [u64; 4] = [1, 1949, 2009, 9527];
@@ -358,13 +476,33 @@ impl FastHash for [u8] {
358476
}
359477
}
360478

479+
impl BloomHash for [u8] {
480+
#[inline(always)]
481+
fn bloom_hash(&self) -> u64 {
482+
use std::hash::BuildHasher;
483+
use std::hash::Hasher;
484+
485+
let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
486+
let mut hasher = state.build_hasher();
487+
hasher.write(self);
488+
hasher.finish()
489+
}
490+
}
491+
361492
impl FastHash for str {
362493
#[inline(always)]
363494
fn fast_hash(&self) -> u64 {
364495
self.as_bytes().fast_hash()
365496
}
366497
}
367498

499+
impl BloomHash for str {
500+
#[inline(always)]
501+
fn bloom_hash(&self) -> u64 {
502+
self.as_bytes().bloom_hash()
503+
}
504+
}
505+
368506
// trick for unsized_hashtable
369507
impl<const N: usize> FastHash for ([u64; N], NonZeroU64) {
370508
#[inline(always)]

src/query/catalog/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ serde = { workspace = true }
3939
serde_json = { workspace = true }
4040
sha2 = { workspace = true }
4141
thrift = { workspace = true }
42-
twox-hash = { workspace = true }
4342
typetag = { workspace = true }
4443

4544
[dev-dependencies]

src/query/catalog/src/sbbf.rs

Lines changed: 8 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,6 @@
7373
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
7474
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf
7575
76-
use std::hash::Hasher;
77-
78-
use twox_hash::XxHash64;
79-
8076
/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
8177
const SALT: [u32; 8] = [
8278
0x47b6137b_u32,
@@ -223,32 +219,20 @@ impl Sbbf {
223219
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
224220
}
225221

226-
/// Insert a hash into the filter
227-
fn insert_hash(&mut self, hash: u64) {
222+
/// Insert a hash into the filter. The caller must provide a well-distributed 64-bit hash.
223+
pub fn insert_hash(&mut self, hash: u64) {
228224
let block_index = self.hash_to_block_index(hash);
229225
self.0[block_index].insert(hash as u32)
230226
}
231227

232228
/// Check if a hash is in the filter. May return
233229
/// true for values that was never inserted ("false positive")
234230
/// but will always return false if a hash has not been inserted.
235-
fn check_hash(&self, hash: u64) -> bool {
231+
pub fn check_hash(&self, hash: u64) -> bool {
236232
let block_index = self.hash_to_block_index(hash);
237233
self.0[block_index].check(hash as u32)
238234
}
239235

240-
/// Insert a digest (u64 hash value) into the filter
241-
pub fn insert_digest(&mut self, digest: u64) {
242-
let hash = hash_u64(digest);
243-
self.insert_hash(hash)
244-
}
245-
246-
/// Check if a digest is probably present or definitely absent in the filter
247-
pub fn check_digest(&self, digest: u64) -> bool {
248-
let hash = hash_u64(digest);
249-
self.check_hash(hash)
250-
}
251-
252236
/// Merge another bloom filter into this one (bitwise OR operation)
253237
/// Panics if the filters have different sizes
254238
pub fn union(&mut self, other: &Self) {
@@ -271,30 +255,10 @@ impl Sbbf {
271255
}
272256
}
273257

274-
/// Per spec we use xxHash with seed=0
275-
const SEED: u64 = 0;
276-
277-
/// Hash a u64 value using XxHash64
278-
#[inline]
279-
fn hash_u64(value: u64) -> u64 {
280-
let mut hasher = XxHash64::with_seed(SEED);
281-
hasher.write_u64(value);
282-
hasher.finish()
283-
}
284-
285258
#[cfg(test)]
286259
mod tests {
287260
use super::*;
288261

289-
#[test]
290-
fn test_hash_u64() {
291-
let digest1 = 12345u64;
292-
let digest2 = 67890u64;
293-
assert_ne!(hash_u64(digest1), digest1);
294-
assert_ne!(hash_u64(digest2), digest2);
295-
assert_ne!(hash_u64(digest1), hash_u64(digest2));
296-
}
297-
298262
#[test]
299263
fn test_mask_set_quick_check() {
300264
for i in 0..1_000_000 {
@@ -316,27 +280,27 @@ mod tests {
316280
fn test_sbbf_insert_and_check() {
317281
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
318282
for i in 0..1_000_000 {
319-
sbbf.insert_digest(i);
320-
assert!(sbbf.check_digest(i));
283+
sbbf.insert_hash(i);
284+
assert!(sbbf.check_hash(i));
321285
}
322286
}
323287

324288
#[test]
325289
fn test_sbbf_union() {
326290
let mut filter1 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
327291
for i in 0..50 {
328-
filter1.insert_digest(i);
292+
filter1.insert_hash(i);
329293
}
330294

331295
let mut filter2 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
332296
for i in 50..100 {
333-
filter2.insert_digest(i);
297+
filter2.insert_hash(i);
334298
}
335299

336300
filter1.union(&filter2);
337301

338302
for i in 0..100 {
339-
assert!(filter1.check_digest(i));
303+
assert!(filter1.check_hash(i));
340304
}
341305
}
342306

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use super::packet::JoinRuntimeFilterPacket;
3333
use super::packet::RuntimeFilterPacket;
3434
use super::packet::SerializableDomain;
3535
use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
36-
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method;
36+
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method_for_bloom;
3737

3838
struct JoinRuntimeFilterPacketBuilder<'a> {
3939
build_key_column: Column,
@@ -156,7 +156,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
156156
let method = DataBlock::choose_hash_method_with_types(&[data_type.clone()])?;
157157
let mut hashes = Vec::with_capacity(num_rows);
158158
let key_columns = &[self.build_key_column.clone().into()];
159-
hash_by_method(&method, key_columns.into(), num_rows, &mut hashes)?;
159+
hash_by_method_for_bloom(&method, key_columns.into(), num_rows, &mut hashes)?;
160160
Ok(hashes)
161161
}
162162

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,8 @@ async fn build_bloom_filter(
256256
if total_items < 50000 {
257257
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
258258
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
259-
for digest in bloom {
260-
filter.insert_digest(digest);
259+
for hash in bloom {
260+
filter.insert_hash(hash);
261261
}
262262
return Ok(RuntimeFilterBloom {
263263
column_name,
@@ -279,8 +279,8 @@ async fn build_bloom_filter(
279279
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
280280
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
281281

282-
for digest in chunk {
283-
filter.insert_digest(digest);
282+
for hash in chunk {
283+
filter.insert_hash(hash);
284284
}
285285
Ok::<Sbbf, ErrorCode>(filter)
286286
})

0 commit comments

Comments
 (0)