Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ class ColumnPredicate {
return false;
}

virtual bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const {
return true;
}

virtual bool evaluate_and(const BloomFilter* bf) const { return true; }

virtual bool evaluate_and(const StringRef* dict_words, const size_t dict_count) const {
Expand Down
71 changes: 59 additions & 12 deletions be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,30 @@ class ComparisonPredicateBase : public ColumnPredicate {
}

bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
if (!(*statistic->get_stat_func)(statistic, column_id())) {
return true;
bool result = true;
if ((*statistic->get_stat_func)(statistic, column_id())) {
vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) [[unlikely]] {
result = true;
} else {
result = camp_field(min_field, max_field);
}
}

vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) [[unlikely]] {
return true;
};

return camp_field(min_field, max_field);
if constexpr (PT == PredicateType::EQ) {
if (result && statistic->get_bloom_filter_func != nullptr &&
(*statistic->get_bloom_filter_func)(statistic, column_id())) {
if (!statistic->bloom_filter) {
return result;
}
return evaluate_and(statistic->bloom_filter.get());
}
}
return result;
}

bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
Expand Down Expand Up @@ -318,6 +328,43 @@ class ComparisonPredicateBase : public ColumnPredicate {
return PT == PredicateType::EQ && !ngram;
}

bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
if constexpr (PT == PredicateType::EQ) {
auto test_bytes = [&]<typename V>(const V& value) {
return bf->test_bytes(const_cast<char*>(reinterpret_cast<const char*>(&value)),
sizeof(V));
};

// Only support Parquet native types where physical == logical representation
// BOOLEAN -> hash as int32 (Parquet bool stored as int32)
if constexpr (Type == PrimitiveType::TYPE_BOOLEAN) {
int32_t int32_value = static_cast<int32_t>(_value);
return test_bytes(int32_value);
} else if constexpr (Type == PrimitiveType::TYPE_INT) {
// INT -> hash as int32
return test_bytes(_value);
} else if constexpr (Type == PrimitiveType::TYPE_BIGINT) {
// BIGINT -> hash as int64
return test_bytes(_value);
} else if constexpr (Type == PrimitiveType::TYPE_FLOAT) {
// FLOAT -> hash as float
return test_bytes(_value);
} else if constexpr (Type == PrimitiveType::TYPE_DOUBLE) {
// DOUBLE -> hash as double
return test_bytes(_value);
} else if constexpr (std::is_same_v<T, StringRef>) {
// VARCHAR/STRING -> hash bytes
return bf->test_bytes(_value.data, _value.size);
} else {
// Unsupported types: return true (accept)
return true;
}
} else {
LOG(FATAL) << "Bloom filter is not supported by predicate type.";
return true;
}
}

void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
bool* flags) const override {
_evaluate_bit<false>(column, sel, size, flags);
Expand Down
86 changes: 74 additions & 12 deletions be/src/olap/in_list_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,20 +272,30 @@ class InListPredicateBase : public ColumnPredicate {
}

bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
if (!(*statistic->get_stat_func)(statistic, column_id())) {
return true;
bool result = true;
if ((*statistic->get_stat_func)(statistic, column_id())) {
vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) [[unlikely]] {
result = true;
} else {
result = camp_field(min_field, max_field);
}
}

vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::parse_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) [[unlikely]] {
return true;
};

return camp_field(min_field, max_field);
if constexpr (PT == PredicateType::IN_LIST) {
if (result && statistic->get_bloom_filter_func != nullptr &&
(*statistic->get_bloom_filter_func)(statistic, column_id())) {
if (!statistic->bloom_filter) {
return result;
}
return evaluate_and(statistic->bloom_filter.get());
}
}
return result;
}

bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
Expand Down Expand Up @@ -404,6 +414,58 @@ class InListPredicateBase : public ColumnPredicate {
return get_in_list_ignore_thredhold(_values->size());
}

bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
if constexpr (PT == PredicateType::IN_LIST) {
HybridSetBase::IteratorBase* iter = _values->begin();
while (iter->has_next()) {
const T* value = (const T*)(iter->get_value());

auto test_bytes = [&]<typename V>(const V& val) {
return bf->test_bytes(const_cast<char*>(reinterpret_cast<const char*>(&val)),
sizeof(V));
};

// Small integers (TINYINT, SMALLINT, INTEGER) -> hash as int32
if constexpr (Type == PrimitiveType::TYPE_TINYINT ||
Type == PrimitiveType::TYPE_SMALLINT ||
Type == PrimitiveType::TYPE_INT) {
int32_t int32_value = static_cast<int32_t>(*value);
if (test_bytes(int32_value)) {
return true;
}
} else if constexpr (Type == PrimitiveType::TYPE_BIGINT) {
// BIGINT -> hash as int64
if (test_bytes(*value)) {
return true;
}
} else if constexpr (Type == PrimitiveType::TYPE_DOUBLE) {
// DOUBLE -> hash as double
if (test_bytes(*value)) {
return true;
}
} else if constexpr (Type == PrimitiveType::TYPE_FLOAT) {
// FLOAT -> hash as float
if (test_bytes(*value)) {
return true;
}
} else if constexpr (std::is_same_v<T, StringRef>) {
// VARCHAR/STRING -> hash bytes
if (bf->test_bytes(value->data, value->size)) {
return true;
}
} else {
// Unsupported types: return true (accept)
return true;
}
iter->next();
}
return false;
} else {
LOG(FATAL) << "Bloom filter is not supported by predicate type.";
return true;
}
}

private:
uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
uint16_t size) const override {
Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/rowset/segment_v2/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ class BloomFilter {
return this->init(optimal_bit_num(n, fpp) / 8, strategy);
}

Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
virtual Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }

Status init(uint64_t filter_size, HashStrategyPB strategy) {
virtual Status init(uint64_t filter_size, HashStrategyPB strategy) {
if (strategy == HASH_MURMUR3_X64_64) {
_hash_func = murmur_hash3_x64_64;
} else {
Expand Down Expand Up @@ -182,7 +182,7 @@ class BloomFilter {
add_hash(code);
}

bool test_bytes(const char* buf, size_t size) const {
virtual bool test_bytes(const char* buf, size_t size) const {
if (buf == nullptr) {
return *_has_null;
}
Expand All @@ -200,7 +200,7 @@ class BloomFilter {

virtual size_t size() const { return _size; }

void set_has_null(bool has_null) { *_has_null = has_null; }
virtual void set_has_null(bool has_null) { *_has_null = has_null; }

virtual bool has_null() const { return *_has_null; }

Expand Down Expand Up @@ -239,7 +239,6 @@ class BloomFilter {
// is this bf used for write
bool _is_write = false;

private:
std::function<void(const void*, const int64_t, const uint64_t, void*)> _hash_func;
};

Expand Down
10 changes: 10 additions & 0 deletions be/src/util/hash_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <crc32c/crc32c.h>
#include <gen_cpp/Types_types.h>
#include <xxh3.h>
#include <xxhash.h>
#include <zlib.h>

#include <bit>
Expand Down Expand Up @@ -380,6 +381,15 @@ class HashUtil {
return XXH3_64bits_withSeed(reinterpret_cast<const char*>(&INT_VALUE), sizeof(int), seed);
}

static xxh_u64 xxhash64_compat_with_seed(const char* s, size_t len, xxh_u64 seed) {
return XXH64(reinterpret_cast<const void*>(s), len, seed);
}

static xxh_u64 xxhash64_compat_null_with_seed(xxh_u64 seed) {
static const int INT_VALUE = 0;
return XXH64(reinterpret_cast<const void*>(&INT_VALUE), sizeof(int), seed);
}

#if defined(__clang__)
#pragma clang diagnostic pop
#endif
Expand Down
134 changes: 134 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <glog/logging.h>

#include <cstring>

#include "vec/exec/format/parquet/vparquet_column_reader.h"

namespace doris {
namespace vectorized {

// for write
Status ParquetBlockSplitBloomFilter::init(uint64_t filter_size,
segment_v2::HashStrategyPB strategy) {
if (strategy == XX_HASH_64) {
_hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
auto h =
HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf), len, 0);
*reinterpret_cast<uint64_t*>(out) = h;
};
} else {
return Status::InvalidArgument("invalid strategy:{}", strategy);
}
_num_bytes = filter_size;
_size = _num_bytes;
_data = new char[_size];
memset(_data, 0, _size);
_has_null = nullptr;
_is_write = true;
g_write_bloom_filter_num << 1;
g_write_bloom_filter_total_bytes << _size;
g_total_bloom_filter_total_bytes << _size;
return Status::OK();
}

// for read
// use deep copy to acquire the data
Status ParquetBlockSplitBloomFilter::init(const char* buf, size_t size,
segment_v2::HashStrategyPB strategy) {
if (size <= 1) {
return Status::InvalidArgument("invalid size:{}", size);
}
DCHECK(size > 1);
if (strategy == XX_HASH_64) {
_hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
auto h =
HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf), len, 0);
*reinterpret_cast<uint64_t*>(out) = h;
};
} else {
return Status::InvalidArgument("invalid strategy:{}", strategy);
}
if (buf == nullptr) {
return Status::InvalidArgument("buf is nullptr");
}

_data = new char[size];
memcpy(_data, buf, size);
_size = size;
_num_bytes = _size;
_has_null = nullptr;
g_read_bloom_filter_num << 1;
g_read_bloom_filter_total_bytes << _size;
g_total_bloom_filter_total_bytes << _size;
return Status::OK();
}

void ParquetBlockSplitBloomFilter::add_bytes(const char* buf, size_t size) {
DCHECK(buf != nullptr) << "Parquet bloom filter does not track nulls";
uint64_t code = hash(buf, size);
add_hash(code);
}

bool ParquetBlockSplitBloomFilter::test_bytes(const char* buf, size_t size) const {
uint64_t code = hash(buf, size);
return test_hash(code);
}

void ParquetBlockSplitBloomFilter::set_has_null(bool has_null) {
DCHECK(!has_null) << "Parquet bloom filter does not track nulls";
}

void ParquetBlockSplitBloomFilter::add_hash(uint64_t hash) {
DCHECK(_num_bytes >= BYTES_PER_BLOCK);
const uint32_t bucket_index =
static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
uint32_t key = static_cast<uint32_t>(hash);
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);

// Calculate mask for bucket.
BlockMask block_mask;
_set_masks(key, block_mask);

for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
bitset32[bucket_index * BITS_SET_PER_BLOCK + i] |= block_mask.item[i];
}
}

bool ParquetBlockSplitBloomFilter::test_hash(uint64_t hash) const {
const uint32_t bucket_index =
static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
uint32_t key = static_cast<uint32_t>(hash);
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);

// Calculate masks for bucket.
BlockMask block_mask;
_set_masks(key, block_mask);

for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
uint32_t bit_val = bitset32[BITS_SET_PER_BLOCK * bucket_index + i];
if (0 == (bit_val & block_mask.item[i])) {
return false;
}
}
return true;
}

} // namespace vectorized
} // namespace doris
Loading
Loading