Skip to content

Commit cd00116

Browse files
committed
[optimize](parquet-reader) Optimize performance by parquet bloom filter. (#57959)
### What problem does this PR solve? Problem Summary: ### Release note Optimize performance by reading parquet bloom filter. parquet bloom filter: https://parquet.apache.org/docs/file-format/bloomfilter/ ### Query Performance Test Results SQL Query | Optimized Version (time(s)) | Original Version (time(s)) -- | -- | -- SELECT * FROM cqtest.bloom_filter_perf_parquet_duckdb WHERE uuid_string = 'cfcd2084-cfcd-cfcd-cfcd-cfcd208495d4'; | 0.02 | 0.23 SELECT * FROM cqtest.bloom_filter_perf_parquet_duckdb WHERE uuid_string IN ('cfcd2084-cfcd-cfcd-cfcd-cfcd208495d6', 'cfcd2084-cfcd-cfcd-cfcd-cfcd208495d7'); | 0.04 | 0.24
1 parent 08d3052 commit cd00116

File tree

22 files changed

+2155
-213
lines changed

22 files changed

+2155
-213
lines changed

be/src/olap/column_predicate.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ class ColumnPredicate {
215215
return false;
216216
}
217217

218+
// Default hook for Parquet block-split bloom filters: predicates that do not
// override this cannot prune with a bloom filter, so the row group is kept.
virtual bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const {
    return true;
}
221+
218222
virtual bool evaluate_and(const BloomFilter* bf) const { return true; }
219223

220224
virtual bool evaluate_and(const StringRef* dict_words, const size_t dict_count) const {

be/src/olap/comparison_predicate.h

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,30 @@ class ComparisonPredicateBase : public ColumnPredicate {
180180
}
181181

182182
// Row-group pruning from Parquet column statistics; for EQ predicates the
// min/max verdict is further refined with the column's bloom filter when one
// is available. Returns false only when the row group provably matches no row.
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
    bool keep = true;
    if ((*statistic->get_stat_func)(statistic, column_id())) {
        vectorized::Field min_field;
        vectorized::Field max_field;
        auto st = vectorized::ParquetPredicate::parse_min_max_value(
                statistic->col_schema, statistic->encoded_min_value,
                statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field);
        if (st.ok()) {
            keep = camp_field(min_field, max_field);
        }
        // On a parse failure we conservatively keep the row group (keep stays true).
    }
    if constexpr (PT == PredicateType::EQ) {
        // Min/max could not prune; try the bloom filter.
        if (keep && statistic->get_bloom_filter_func != nullptr &&
            (*statistic->get_bloom_filter_func)(statistic, column_id())) {
            if (statistic->bloom_filter == nullptr) {
                return keep;
            }
            return evaluate_and(statistic->bloom_filter.get());
        }
    }
    return keep;
}
198208

199209
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
@@ -318,6 +328,43 @@ class ComparisonPredicateBase : public ColumnPredicate {
318328
return PT == PredicateType::EQ && !ngram;
319329
}
320330

331+
// Evaluate an EQ predicate against a Parquet block-split bloom filter.
// Returns false only when the filter proves the value is absent; returns true
// otherwise (bloom filters give false positives, never false negatives).
bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
    if constexpr (PT == PredicateType::EQ) {
        // test_bytes takes const char*, so no const_cast is required.
        auto test_bytes = [&]<typename V>(const V& value) {
            return bf->test_bytes(reinterpret_cast<const char*>(&value), sizeof(V));
        };

        // Only Parquet native types whose physical representation matches the
        // logical one are supported.
        if constexpr (Type == PrimitiveType::TYPE_BOOLEAN) {
            // BOOLEAN is hashed through its int32 widening, matching the
            // writer-side convention used here — TODO confirm against spec.
            return test_bytes(static_cast<int32_t>(_value));
        } else if constexpr (Type == PrimitiveType::TYPE_INT ||
                             Type == PrimitiveType::TYPE_BIGINT ||
                             Type == PrimitiveType::TYPE_FLOAT ||
                             Type == PrimitiveType::TYPE_DOUBLE) {
            // int32/int64/float/double: hash the raw value bytes as-is.
            return test_bytes(_value);
        } else if constexpr (std::is_same_v<T, StringRef>) {
            // VARCHAR/STRING: hash the string bytes directly.
            return bf->test_bytes(_value.data, _value.size);
        } else {
            // Unsupported types: cannot prune, accept the row group.
            return true;
        }
    } else {
        LOG(FATAL) << "Bloom filter is not supported by predicate type.";
        return true;
    }
}
367+
321368
void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
322369
bool* flags) const override {
323370
_evaluate_bit<false>(column, sel, size, flags);

be/src/olap/in_list_predicate.h

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -272,20 +272,30 @@ class InListPredicateBase : public ColumnPredicate {
272272
}
273273

274274
// Row-group pruning from Parquet column statistics; for IN-list predicates
// the min/max verdict is further refined with the column's bloom filter when
// one is available. Returns false only when no listed value can be present.
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
    bool keep = true;
    if ((*statistic->get_stat_func)(statistic, column_id())) {
        vectorized::Field min_field;
        vectorized::Field max_field;
        auto st = vectorized::ParquetPredicate::parse_min_max_value(
                statistic->col_schema, statistic->encoded_min_value,
                statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field);
        if (st.ok()) {
            keep = camp_field(min_field, max_field);
        }
        // On a parse failure we conservatively keep the row group (keep stays true).
    }
    if constexpr (PT == PredicateType::IN_LIST) {
        // Min/max could not prune; try the bloom filter.
        if (keep && statistic->get_bloom_filter_func != nullptr &&
            (*statistic->get_bloom_filter_func)(statistic, column_id())) {
            if (statistic->bloom_filter == nullptr) {
                return keep;
            }
            return evaluate_and(statistic->bloom_filter.get());
        }
    }
    return keep;
}
290300

291301
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
@@ -404,6 +414,58 @@ class InListPredicateBase : public ColumnPredicate {
404414
return get_in_list_ignore_thredhold(_values->size());
405415
}
406416

417+
// Evaluate an IN-list predicate against a Parquet block-split bloom filter.
// Returns true as soon as any candidate value may be present; returns false
// only when the filter rules out every value in the list (false positives are
// possible, false negatives are not).
bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
    if constexpr (PT == PredicateType::IN_LIST) {
        // Hoisted out of the loop (the original rebuilt it per iteration).
        // test_bytes takes const char*, so no const_cast is required.
        auto test_bytes = [&]<typename V>(const V& val) {
            return bf->test_bytes(reinterpret_cast<const char*>(&val), sizeof(V));
        };

        HybridSetBase::IteratorBase* iter = _values->begin();
        while (iter->has_next()) {
            const T* value = (const T*)(iter->get_value());

            if constexpr (Type == PrimitiveType::TYPE_TINYINT ||
                          Type == PrimitiveType::TYPE_SMALLINT ||
                          Type == PrimitiveType::TYPE_INT) {
                // Small integers (TINYINT, SMALLINT, INTEGER) are hashed
                // through their int32 widening.
                if (test_bytes(static_cast<int32_t>(*value))) {
                    return true;
                }
            } else if constexpr (Type == PrimitiveType::TYPE_BIGINT ||
                                 Type == PrimitiveType::TYPE_DOUBLE ||
                                 Type == PrimitiveType::TYPE_FLOAT) {
                // int64/double/float: hash the raw value bytes as-is.
                if (test_bytes(*value)) {
                    return true;
                }
            } else if constexpr (std::is_same_v<T, StringRef>) {
                // VARCHAR/STRING: hash the string bytes directly.
                if (bf->test_bytes(value->data, value->size)) {
                    return true;
                }
            } else {
                // Unsupported types: cannot prune, accept the row group.
                return true;
            }
            iter->next();
        }
        return false;
    } else {
        LOG(FATAL) << "Bloom filter is not supported by predicate type.";
        return true;
    }
}
468+
407469
private:
408470
uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
409471
uint16_t size) const override {

be/src/olap/rowset/segment_v2/bloom_filter.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ class BloomFilter {
102102
return this->init(optimal_bit_num(n, fpp) / 8, strategy);
103103
}
104104

105-
Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
105+
virtual Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
106106

107-
Status init(uint64_t filter_size, HashStrategyPB strategy) {
107+
virtual Status init(uint64_t filter_size, HashStrategyPB strategy) {
108108
if (strategy == HASH_MURMUR3_X64_64) {
109109
_hash_func = murmur_hash3_x64_64;
110110
} else {
@@ -182,7 +182,7 @@ class BloomFilter {
182182
add_hash(code);
183183
}
184184

185-
bool test_bytes(const char* buf, size_t size) const {
185+
virtual bool test_bytes(const char* buf, size_t size) const {
186186
if (buf == nullptr) {
187187
return *_has_null;
188188
}
@@ -200,7 +200,7 @@ class BloomFilter {
200200

201201
virtual size_t size() const { return _size; }
202202

203-
void set_has_null(bool has_null) { *_has_null = has_null; }
203+
virtual void set_has_null(bool has_null) { *_has_null = has_null; }
204204

205205
virtual bool has_null() const { return *_has_null; }
206206

@@ -239,7 +239,6 @@ class BloomFilter {
239239
// is this bf used for write
240240
bool _is_write = false;
241241

242-
private:
243242
std::function<void(const void*, const int64_t, const uint64_t, void*)> _hash_func;
244243
};
245244

be/src/util/hash_util.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <crc32c/crc32c.h>
2424
#include <gen_cpp/Types_types.h>
2525
#include <xxh3.h>
26+
#include <xxhash.h>
2627
#include <zlib.h>
2728

2829
#include <bit>
@@ -380,6 +381,15 @@ class HashUtil {
380381
return XXH3_64bits_withSeed(reinterpret_cast<const char*>(&INT_VALUE), sizeof(int), seed);
381382
}
382383

384+
/// Classic xxHash64 (XXH64, the Parquet-compatible variant, distinct from
/// XXH3) of `len` bytes at `s`, using `seed`.
static xxh_u64 xxhash64_compat_with_seed(const char* s, size_t len, xxh_u64 seed) {
    return XXH64(reinterpret_cast<const void*>(s), len, seed);
}
387+
388+
/// Classic xxHash64 hash used to represent a null value: hashes a zero int
/// with `seed`, mirroring xxhash3_64_null_with_seed above.
static xxh_u64 xxhash64_compat_null_with_seed(xxh_u64 seed) {
    static constexpr int INT_VALUE = 0;
    return XXH64(reinterpret_cast<const void*>(&INT_VALUE), sizeof(int), seed);
}
392+
383393
#if defined(__clang__)
384394
#pragma clang diagnostic pop
385395
#endif
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <glog/logging.h>
19+
20+
#include <cstring>
21+
22+
#include "vec/exec/format/parquet/vparquet_column_reader.h"
23+
24+
namespace doris {
25+
namespace vectorized {
26+
27+
// for write
28+
Status ParquetBlockSplitBloomFilter::init(uint64_t filter_size,
29+
segment_v2::HashStrategyPB strategy) {
30+
if (strategy == XX_HASH_64) {
31+
_hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
32+
auto h =
33+
HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf), len, 0);
34+
*reinterpret_cast<uint64_t*>(out) = h;
35+
};
36+
} else {
37+
return Status::InvalidArgument("invalid strategy:{}", strategy);
38+
}
39+
_num_bytes = filter_size;
40+
_size = _num_bytes;
41+
_data = new char[_size];
42+
memset(_data, 0, _size);
43+
_has_null = nullptr;
44+
_is_write = true;
45+
g_write_bloom_filter_num << 1;
46+
g_write_bloom_filter_total_bytes << _size;
47+
g_total_bloom_filter_total_bytes << _size;
48+
return Status::OK();
49+
}
50+
51+
// for read
52+
// use deep copy to acquire the data
53+
Status ParquetBlockSplitBloomFilter::init(const char* buf, size_t size,
54+
segment_v2::HashStrategyPB strategy) {
55+
if (size <= 1) {
56+
return Status::InvalidArgument("invalid size:{}", size);
57+
}
58+
DCHECK(size > 1);
59+
if (strategy == XX_HASH_64) {
60+
_hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
61+
auto h =
62+
HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf), len, 0);
63+
*reinterpret_cast<uint64_t*>(out) = h;
64+
};
65+
} else {
66+
return Status::InvalidArgument("invalid strategy:{}", strategy);
67+
}
68+
if (buf == nullptr) {
69+
return Status::InvalidArgument("buf is nullptr");
70+
}
71+
72+
_data = new char[size];
73+
memcpy(_data, buf, size);
74+
_size = size;
75+
_num_bytes = _size;
76+
_has_null = nullptr;
77+
g_read_bloom_filter_num << 1;
78+
g_read_bloom_filter_total_bytes << _size;
79+
g_total_bloom_filter_total_bytes << _size;
80+
return Status::OK();
81+
}
82+
83+
void ParquetBlockSplitBloomFilter::add_bytes(const char* buf, size_t size) {
84+
DCHECK(buf != nullptr) << "Parquet bloom filter does not track nulls";
85+
uint64_t code = hash(buf, size);
86+
add_hash(code);
87+
}
88+
89+
bool ParquetBlockSplitBloomFilter::test_bytes(const char* buf, size_t size) const {
90+
uint64_t code = hash(buf, size);
91+
return test_hash(code);
92+
}
93+
94+
// No-op override: the Parquet bloom-filter format has no null flag, so the
// only legal request is has_null == false.
void ParquetBlockSplitBloomFilter::set_has_null(bool has_null) {
    DCHECK(!has_null) << "Parquet bloom filter does not track nulls";
}
97+
98+
// Sets this hash's mask bits inside its block (block-split scheme: every hash
// touches exactly one cache-line-sized block of the bit set).
void ParquetBlockSplitBloomFilter::add_hash(uint64_t hash) {
    DCHECK(_num_bytes >= BYTES_PER_BLOCK);
    // The upper 32 bits select the block via a multiply-shift range reduction.
    const uint32_t bucket_index =
            static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
    // The lower 32 bits drive the per-word mask generation.
    const uint32_t key = static_cast<uint32_t>(hash);

    BlockMask block_mask;
    _set_masks(key, block_mask);

    uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
        bitset32[bucket_index * BITS_SET_PER_BLOCK + i] |= block_mask.item[i];
    }
}
113+
114+
bool ParquetBlockSplitBloomFilter::test_hash(uint64_t hash) const {
115+
const uint32_t bucket_index =
116+
static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
117+
uint32_t key = static_cast<uint32_t>(hash);
118+
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
119+
120+
// Calculate masks for bucket.
121+
BlockMask block_mask;
122+
_set_masks(key, block_mask);
123+
124+
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
125+
uint32_t bit_val = bitset32[BITS_SET_PER_BLOCK * bucket_index + i];
126+
if (0 == (bit_val & block_mask.item[i])) {
127+
return false;
128+
}
129+
}
130+
return true;
131+
}
132+
133+
} // namespace vectorized
134+
} // namespace doris

0 commit comments

Comments
 (0)