diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 0c12bc481ab394..863c064ff86fef 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -215,6 +215,10 @@ class ColumnPredicate {
         return false;
     }
 
+    virtual bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const {
+        return true;
+    }
+
     virtual bool evaluate_and(const BloomFilter* bf) const { return true; }
 
     virtual bool evaluate_and(const StringRef* dict_words, const size_t dict_count) const {
diff --git a/be/src/olap/comparison_predicate.h b/be/src/olap/comparison_predicate.h
index 0242f0f15b4abc..12db94a5f716dc 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -180,20 +180,30 @@ class ComparisonPredicateBase : public ColumnPredicate {
     }
 
     bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
-        if (!(*statistic->get_stat_func)(statistic, column_id())) {
-            return true;
+        bool result = true;
+        if ((*statistic->get_stat_func)(statistic, column_id())) {
+            vectorized::Field min_field;
+            vectorized::Field max_field;
+            if (!vectorized::ParquetPredicate::parse_min_max_value(
+                         statistic->col_schema, statistic->encoded_min_value,
+                         statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
+                         .ok()) [[unlikely]] {
+                result = true;
+            } else {
+                result = camp_field(min_field, max_field);
+            }
         }
-        vectorized::Field min_field;
-        vectorized::Field max_field;
-        if (!vectorized::ParquetPredicate::parse_min_max_value(
-                     statistic->col_schema, statistic->encoded_min_value,
-                     statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
-                     .ok()) [[unlikely]] {
-            return true;
-        };
-
-        return camp_field(min_field, max_field);
+        if constexpr (PT == PredicateType::EQ) {
+            if (result && statistic->get_bloom_filter_func != nullptr &&
+                (*statistic->get_bloom_filter_func)(statistic, column_id())) {
+                if (!statistic->bloom_filter) {
+                    return result;
+                }
+                return evaluate_and(statistic->bloom_filter.get());
+            }
+        }
+        return result;
     }
 
     bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
@@ -318,6 +328,43 @@
         return PT == PredicateType::EQ && !ngram;
     }
 
+    bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
+        if constexpr (PT == PredicateType::EQ) {
+            auto test_bytes = [&]<typename V>(const V& value) {
+                return bf->test_bytes(const_cast<char*>(reinterpret_cast<const char*>(&value)),
+                                      sizeof(V));
+            };
+
+            // Only support Parquet native types where physical == logical representation
+            // BOOLEAN -> hash as int32 (Parquet bool stored as int32)
+            if constexpr (Type == PrimitiveType::TYPE_BOOLEAN) {
+                int32_t int32_value = static_cast<int32_t>(_value);
+                return test_bytes(int32_value);
+            } else if constexpr (Type == PrimitiveType::TYPE_INT) {
+                // INT -> hash as int32
+                return test_bytes(_value);
+            } else if constexpr (Type == PrimitiveType::TYPE_BIGINT) {
+                // BIGINT -> hash as int64
+                return test_bytes(_value);
+            } else if constexpr (Type == PrimitiveType::TYPE_FLOAT) {
+                // FLOAT -> hash as float
+                return test_bytes(_value);
+            } else if constexpr (Type == PrimitiveType::TYPE_DOUBLE) {
+                // DOUBLE -> hash as double
+                return test_bytes(_value);
+            } else if constexpr (std::is_same_v<T, StringRef>) {
+                // VARCHAR/STRING -> hash bytes
+                return bf->test_bytes(_value.data, _value.size);
+            } else {
+                // Unsupported types: return true (accept)
+                return true;
+            }
+        } else {
+            LOG(FATAL) << "Bloom filter is not supported by predicate type.";
+            return true;
+        }
+    }
+
     void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
                      bool* flags) const override {
         _evaluate_bit(column, sel, size, flags);
diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h
index 4b9f3e83b0e243..2246d0e2fccc15 100644
--- a/be/src/olap/in_list_predicate.h
+++ b/be/src/olap/in_list_predicate.h
@@ -272,20 +272,30 @@ class InListPredicateBase : public ColumnPredicate {
     }
 
     bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
-        if (!(*statistic->get_stat_func)(statistic, column_id())) {
-            return true;
+        bool result = true;
+        if ((*statistic->get_stat_func)(statistic, column_id())) {
+            vectorized::Field min_field;
+            vectorized::Field max_field;
+            if (!vectorized::ParquetPredicate::parse_min_max_value(
+                         statistic->col_schema, statistic->encoded_min_value,
+                         statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
+                         .ok()) [[unlikely]] {
+                result = true;
+            } else {
+                result = camp_field(min_field, max_field);
+            }
         }
-        vectorized::Field min_field;
-        vectorized::Field max_field;
-        if (!vectorized::ParquetPredicate::parse_min_max_value(
-                     statistic->col_schema, statistic->encoded_min_value,
-                     statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
-                     .ok()) [[unlikely]] {
-            return true;
-        };
-
-        return camp_field(min_field, max_field);
+        if constexpr (PT == PredicateType::IN_LIST) {
+            if (result && statistic->get_bloom_filter_func != nullptr &&
+                (*statistic->get_bloom_filter_func)(statistic, column_id())) {
+                if (!statistic->bloom_filter) {
+                    return result;
+                }
+                return evaluate_and(statistic->bloom_filter.get());
+            }
+        }
+        return result;
     }
 
     bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
@@ -404,6 +414,58 @@
         return get_in_list_ignore_thredhold(_values->size());
     }
 
+    bool evaluate_and(const vectorized::ParquetBlockSplitBloomFilter* bf) const override {
+        if constexpr (PT == PredicateType::IN_LIST) {
+            HybridSetBase::IteratorBase* iter = _values->begin();
+            while (iter->has_next()) {
+                const T* value = (const T*)(iter->get_value());
+
+                auto test_bytes = [&]<typename V>(const V& val) {
+                    return bf->test_bytes(const_cast<char*>(reinterpret_cast<const char*>(&val)),
+                                          sizeof(V));
+                };
+
+                // Small integers (TINYINT, SMALLINT, INTEGER) -> hash as int32
+                if constexpr (Type == PrimitiveType::TYPE_TINYINT ||
+                              Type == PrimitiveType::TYPE_SMALLINT ||
+                              Type == PrimitiveType::TYPE_INT) {
+                    int32_t int32_value = static_cast<int32_t>(*value);
+                    if (test_bytes(int32_value)) {
+                        return true;
+                    }
+                } else if constexpr (Type == PrimitiveType::TYPE_BIGINT) {
+                    // BIGINT -> hash as int64
+                    if (test_bytes(*value)) {
+                        return true;
+                    }
+                } else if constexpr (Type == PrimitiveType::TYPE_DOUBLE) {
+                    // DOUBLE -> hash as double
+                    if (test_bytes(*value)) {
+                        return true;
+                    }
+                } else if constexpr (Type == PrimitiveType::TYPE_FLOAT) {
+                    // FLOAT -> hash as float
+                    if (test_bytes(*value)) {
+                        return true;
+                    }
+                } else if constexpr (std::is_same_v<T, StringRef>) {
+                    // VARCHAR/STRING -> hash bytes
+                    if (bf->test_bytes(value->data, value->size)) {
+                        return true;
+                    }
+                } else {
+                    // Unsupported types: return true (accept)
+                    return true;
+                }
+                iter->next();
+            }
+            return false;
+        } else {
+            LOG(FATAL) << "Bloom filter is not supported by predicate type.";
+            return true;
+        }
+    }
+
 private:
     uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
                              uint16_t size) const override {
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.h b/be/src/olap/rowset/segment_v2/bloom_filter.h
index d9df2e323fd5cd..7ca6fdac634695 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter.h
+++ b/be/src/olap/rowset/segment_v2/bloom_filter.h
@@ -102,9 +102,9 @@ class BloomFilter {
         return this->init(optimal_bit_num(n, fpp) / 8, strategy);
     }
 
-    Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
+    virtual Status init(uint64_t filter_size) { return init(filter_size, HASH_MURMUR3_X64_64); }
 
-    Status init(uint64_t filter_size, HashStrategyPB strategy) {
+    virtual Status init(uint64_t filter_size, HashStrategyPB strategy) {
         if (strategy == HASH_MURMUR3_X64_64) {
             _hash_func = murmur_hash3_x64_64;
         } else {
@@ -182,7 +182,7 @@ class BloomFilter {
         add_hash(code);
     }
 
-    bool test_bytes(const char* buf, size_t size) const {
+    virtual bool test_bytes(const char* buf, size_t size) const {
         if (buf == nullptr) {
             return *_has_null;
         }
@@ -200,7 +200,7 @@ class BloomFilter {
 
     virtual size_t size() const { return _size; }
 
-    void set_has_null(bool has_null) { *_has_null = has_null; }
+    virtual void set_has_null(bool has_null) { *_has_null = has_null; }
 
     virtual bool has_null() const { return *_has_null; }
 
@@ -239,7 +239,6 @@ class BloomFilter {
     // is this bf used for write
     bool _is_write = false;
 
-private:
    std::function<void(const void*, const int64_t, const uint64_t, void*)> _hash_func;
 };
diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp
index 359d9951fc1807..9c5d4ef3aca539 100644
--- a/be/src/util/hash_util.hpp
+++ b/be/src/util/hash_util.hpp
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <xxhash.h>
 #include
 #include
@@ -380,6 +381,15 @@ class HashUtil {
         return XXH3_64bits_withSeed(reinterpret_cast<const void*>(&INT_VALUE), sizeof(int), seed);
     }
 
+    static xxh_u64 xxhash64_compat_with_seed(const char* s, size_t len, xxh_u64 seed) {
+        return XXH64(reinterpret_cast<const void*>(s), len, seed);
+    }
+
+    static xxh_u64 xxhash64_compat_null_with_seed(xxh_u64 seed) {
+        static const int INT_VALUE = 0;
+        return XXH64(reinterpret_cast<const void*>(&INT_VALUE), sizeof(int), seed);
+    }
+
 #if defined(__clang__)
 #pragma clang diagnostic pop
 #endif
diff --git a/be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.cpp b/be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.cpp
new file mode 100644
index 00000000000000..626798deeeaa0f
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.cpp
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
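+//
+// Reviewer sketch (not part of the original change): the Parquet split-block
+// bloom filter spec mandates xxHash64 with seed 0, which is why the
+// _hash_func lambdas installed below ignore their seed argument and always
+// hash with 0. A minimal compatibility check against the raw xxhash API
+// (assuming <xxhash.h> is available) would be:
+//
+//   const char* key = "abc";
+//   uint64_t h = HashUtil::xxhash64_compat_with_seed(key, 3, /*seed=*/0);
+//   DCHECK_EQ(h, XXH64(key, 3, 0)); // the wrapper simply forwards to XXH64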
+
+#include <cstring>
+
+#include <glog/logging.h>
+
+#include "vec/exec/format/parquet/vparquet_column_reader.h"
+
+namespace doris {
+namespace vectorized {
+
+// for write
+Status ParquetBlockSplitBloomFilter::init(uint64_t filter_size,
+                                          segment_v2::HashStrategyPB strategy) {
+    if (strategy == XX_HASH_64) {
+        _hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
+            auto h = HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf),
+                                                         len, 0);
+            *reinterpret_cast<uint64_t*>(out) = h;
+        };
+    } else {
+        return Status::InvalidArgument("invalid strategy:{}", strategy);
+    }
+    _num_bytes = filter_size;
+    _size = _num_bytes;
+    _data = new char[_size];
+    memset(_data, 0, _size);
+    _has_null = nullptr;
+    _is_write = true;
+    g_write_bloom_filter_num << 1;
+    g_write_bloom_filter_total_bytes << _size;
+    g_total_bloom_filter_total_bytes << _size;
+    return Status::OK();
+}
+
+// for read
+// use deep copy to acquire the data
+Status ParquetBlockSplitBloomFilter::init(const char* buf, size_t size,
+                                          segment_v2::HashStrategyPB strategy) {
+    if (size <= 1) {
+        return Status::InvalidArgument("invalid size:{}", size);
+    }
+    DCHECK(size > 1);
+    if (strategy == XX_HASH_64) {
+        _hash_func = [](const void* buf, const int64_t len, const uint64_t seed, void* out) {
+            auto h = HashUtil::xxhash64_compat_with_seed(reinterpret_cast<const char*>(buf),
+                                                         len, 0);
+            *reinterpret_cast<uint64_t*>(out) = h;
+        };
+    } else {
+        return Status::InvalidArgument("invalid strategy:{}", strategy);
+    }
+    if (buf == nullptr) {
+        return Status::InvalidArgument("buf is nullptr");
+    }
+
+    _data = new char[size];
+    memcpy(_data, buf, size);
+    _size = size;
+    _num_bytes = _size;
+    _has_null = nullptr;
+    g_read_bloom_filter_num << 1;
+    g_read_bloom_filter_total_bytes << _size;
+    g_total_bloom_filter_total_bytes << _size;
+    return Status::OK();
+}
+
+void ParquetBlockSplitBloomFilter::add_bytes(const char* buf, size_t size) {
+    DCHECK(buf != nullptr) << "Parquet bloom filter does not track nulls";
+    uint64_t code = hash(buf, size);
+    add_hash(code);
+}
+
+bool ParquetBlockSplitBloomFilter::test_bytes(const char* buf, size_t size) const {
+    uint64_t code = hash(buf, size);
+    return test_hash(code);
+}
+
+void ParquetBlockSplitBloomFilter::set_has_null(bool has_null) {
+    DCHECK(!has_null) << "Parquet bloom filter does not track nulls";
+}
+
+void ParquetBlockSplitBloomFilter::add_hash(uint64_t hash) {
+    DCHECK(_num_bytes >= BYTES_PER_BLOCK);
+    const uint32_t bucket_index =
+            static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
+    uint32_t key = static_cast<uint32_t>(hash);
+    uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
+
+    // Calculate mask for bucket.
+    BlockMask block_mask;
+    _set_masks(key, block_mask);
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+        bitset32[bucket_index * BITS_SET_PER_BLOCK + i] |= block_mask.item[i];
+    }
+}
+
+bool ParquetBlockSplitBloomFilter::test_hash(uint64_t hash) const {
+    const uint32_t bucket_index =
+            static_cast<uint32_t>((hash >> 32) * (_num_bytes / BYTES_PER_BLOCK) >> 32);
+    uint32_t key = static_cast<uint32_t>(hash);
+    uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
+
+    // Calculate masks for bucket.
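+    // (Illustration added in review, not part of the original change.)
+    // Bucket selection above is a modulo-free range reduction: with
+    // num_blocks = _num_bytes / BYTES_PER_BLOCK, the high 32 bits of the
+    // 64-bit hash map uniformly onto [0, num_blocks) via
+    // (hi32 * num_blocks) >> 32. The low 32 bits ("key") then derive one bit
+    // per 32-bit word of the block in _set_masks():
+    //   bit_i = 1u << ((key * SALT[i]) >> 27) // top 5 bits select bit 0..31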
+    BlockMask block_mask;
+    _set_masks(key, block_mask);
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+        uint32_t bit_val = bitset32[BITS_SET_PER_BLOCK * bucket_index + i];
+        if (0 == (bit_val & block_mask.item[i])) {
+            return false;
+        }
+    }
+    return true;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.h b/be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.h
new file mode 100644
index 00000000000000..abfa43ba8d585e
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_block_split_bloom_filter.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "olap/rowset/segment_v2/bloom_filter.h"
+
+namespace doris {
+namespace vectorized {
+
+// The Parquet block bloom filter implements the block-based Bloom filter
+// algorithm from Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom
+// Filters". The basic idea is to hash each item into a tiny Bloom filter
+// that fits in a single cache line or less. This implementation sets 8 bits
+// in each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take
+// advantage of 32-byte SIMD instructions.
+//
+// Note: The main reasons for overriding the parent class methods are:
+// 1. The Parquet bloom filter does not include null data.
+// 2. The Parquet bloom filter does not assume the number of blocks is a
+//    power of two, while Doris relies on this assumption.
+// https://parquet.apache.org/docs/file-format/bloomfilter/
+class ParquetBlockSplitBloomFilter : public segment_v2::BloomFilter {
+public:
+    Status init(uint64_t filter_size, segment_v2::HashStrategyPB strategy) override;
+    Status init(const char* buf, size_t size, segment_v2::HashStrategyPB strategy) override;
+    void add_bytes(const char* buf, size_t size) override;
+    bool test_bytes(const char* buf, size_t size) const override;
+    void set_has_null(bool has_null) override;
+    bool has_null() const override { return false; }
+
+    void add_hash(uint64_t hash) override;
+
+    bool test_hash(uint64_t hash) const override;
+
+private:
+    // Bytes in a tiny Bloom filter block.
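+    // (Note added in review.) 32 bytes = 256 bits, so a block matches one
+    // 32-byte SIMD register and fits comfortably in a cache line; the SALT
+    // multipliers below are the fixed constants from the parquet-format
+    // split-block bloom filter specification, which keeps filters written
+    // here readable by other Parquet implementations.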
+    static constexpr int BYTES_PER_BLOCK = 32;
+    // The number of bits to set in a tiny Bloom filter block
+    static constexpr int BITS_SET_PER_BLOCK = 8;
+
+    static constexpr uint32_t SALT[BITS_SET_PER_BLOCK] = {0x47b6137bU, 0x44974d91U, 0x8824ad5bU,
+                                                          0xa2b7289dU, 0x705495c7U, 0x2df1424bU,
+                                                          0x9efc4947U, 0x5c6bfb31U};
+
+    struct BlockMask {
+        uint32_t item[BITS_SET_PER_BLOCK];
+    };
+
+private:
+    void _set_masks(uint32_t key, BlockMask& block_mask) const {
+        for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+            block_mask.item[i] = key * SALT[i];
+        }
+
+        for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+            block_mask.item[i] = block_mask.item[i] >> 27;
+        }
+
+        for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+            block_mask.item[i] = uint32_t(0x1) << block_mask.item[i];
+        }
+    }
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index 59e831b455e572..7a97e057da2334 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -27,10 +27,12 @@
 #include "exec/olap_common.h"
 #include "olap/rowset/segment_v2/row_ranges.h"
 #include "parquet_common.h"
+#include "runtime/primitive_type.h"
 #include "util/timezone_utils.h"
 #include "vec/common/endian.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/exec/format/format_common.h"
+#include "vec/exec/format/parquet/parquet_block_split_bloom_filter.h"
 #include "vec/exec/format/parquet/parquet_column_convert.h"
 #include "vec/exec/format/parquet/schema_desc.h"
@@ -152,6 +154,7 @@
     }
 
 public:
+    static constexpr int BLOOM_FILTER_MAX_HEADER_LENGTH = 64;
     struct ColumnStat {
         std::string encoded_min_value;
         std::string encoded_max_value;
@@ -159,9 +162,34 @@
         bool is_all_null;
         const FieldSchema* col_schema;
         const cctz::time_zone* ctz;
-        std::function<bool(ColumnStat*, int)>* get_stat_func;
+        std::unique_ptr<ParquetBlockSplitBloomFilter> bloom_filter;
+        std::function<bool(ColumnStat*, int)>* get_stat_func = nullptr;
+        std::function<bool(ColumnStat*, int)>* get_bloom_filter_func = nullptr;
     };
 
+    static bool bloom_filter_supported(PrimitiveType type) {
+        // Only support types where physical type == logical type (no conversion needed).
+        // For types like DATEV2, DATETIMEV2, and DECIMAL, Parquet stores values in a
+        // physical format (INT32, INT64, etc.) while Doris uses different internal
+        // representations. The bloom filter works on physical bytes, but we only have
+        // logical-type values here, and there is no reverse (logical -> physical)
+        // conversion available. TINYINT/SMALLINT also need conversion via
+        // LittleIntPhysicalConverter.
+        switch (type) {
+        case TYPE_BOOLEAN:
+        case TYPE_INT:
+        case TYPE_BIGINT:
+        case TYPE_FLOAT:
+        case TYPE_DOUBLE:
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+            return true;
+        default:
+            return false;
+        }
+    }
+
     struct PageIndexStat {
         // Indicates whether the page index information in this column can be used.
         bool available = false;
@@ -410,6 +438,52 @@
         return Status::OK();
     }
 
+    static Status read_bloom_filter(const tparquet::ColumnMetaData& column_meta_data,
+                                    io::FileReaderSPtr file_reader, io::IOContext* io_ctx,
+                                    ColumnStat* ans_stat) {
+        size_t size;
+        if (!column_meta_data.__isset.bloom_filter_offset) {
+            return Status::NotSupported("Can not use this parquet bloom filter.");
+        }
+
+        if (column_meta_data.__isset.bloom_filter_length &&
+            column_meta_data.bloom_filter_length > 0) {
+            size = column_meta_data.bloom_filter_length;
+        } else {
+            size = BLOOM_FILTER_MAX_HEADER_LENGTH;
+        }
+        size_t bytes_read = 0;
+        std::vector<uint8_t> header_buffer(size);
+        RETURN_IF_ERROR(file_reader->read_at(column_meta_data.bloom_filter_offset,
+                                             Slice(header_buffer.data(), size), &bytes_read,
+                                             io_ctx));
+
+        tparquet::BloomFilterHeader t_bloom_filter_header;
+        uint32_t t_bloom_filter_header_size = static_cast<uint32_t>(bytes_read);
+        RETURN_IF_ERROR(deserialize_thrift_msg(header_buffer.data(), &t_bloom_filter_header_size,
+                                               true, &t_bloom_filter_header));
+
+        // TODO: the bloom filter could be encrypted, too, so double check that this is
+        // NOT the case
+        if (!t_bloom_filter_header.algorithm.__isset.BLOCK ||
+            !t_bloom_filter_header.compression.__isset.UNCOMPRESSED ||
+            !t_bloom_filter_header.hash.__isset.XXHASH) {
+            return Status::NotSupported("Can not use this parquet bloom filter.");
+        }
+
+        ans_stat->bloom_filter = std::make_unique<ParquetBlockSplitBloomFilter>();
+
+        std::vector<uint8_t> data_buffer(t_bloom_filter_header.numBytes);
+        RETURN_IF_ERROR(file_reader->read_at(
+                column_meta_data.bloom_filter_offset + t_bloom_filter_header_size,
+                Slice(data_buffer.data(), t_bloom_filter_header.numBytes), &bytes_read, io_ctx));
+
+        RETURN_IF_ERROR(ans_stat->bloom_filter->init(
+                reinterpret_cast<const char*>(data_buffer.data()), t_bloom_filter_header.numBytes,
+                segment_v2::HashStrategyPB::XX_HASH_64));
+
+        return Status::OK();
+    }
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index d4fee37a33bc1b..879de81ebf78f8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -47,6 +47,7 @@
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
 #include "vec/exec/format/column_type_convert.h"
+#include "vec/exec/format/parquet/parquet_block_split_bloom_filter.h"
 #include "vec/exec/format/parquet/parquet_common.h"
 #include "vec/exec/format/parquet/schema_desc.h"
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
@@ -98,7 +99,10 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
           _enable_lazy_mat(enable_lazy_mat),
           _enable_filter_by_min_max(
                   state == nullptr ? true
-                                   : state->query_options().enable_parquet_filter_by_min_max) {
+                                   : state->query_options().enable_parquet_filter_by_min_max),
+          _enable_filter_by_bloom_filter(
+                  state == nullptr
+                          ? true
+                          : state->query_options().enable_parquet_filter_by_bloom_filter) {
     _meta_cache = meta_cache;
     _init_profile();
     _init_system_properties();
@@ -116,7 +120,10 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRang
           _enable_lazy_mat(enable_lazy_mat),
           _enable_filter_by_min_max(
                   state == nullptr ? true
-                                   : state->query_options().enable_parquet_filter_by_min_max) {
+                                   : state->query_options().enable_parquet_filter_by_min_max),
+          _enable_filter_by_bloom_filter(
+                  state == nullptr ? true
+                                   : state->query_options().enable_parquet_filter_by_bloom_filter) {
     _meta_cache = meta_cache;
     _init_system_properties();
     _init_file_description();
@@ -141,6 +148,10 @@ void ParquetReader::_init_profile() {
         _parquet_profile.filtered_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "FilteredGroups", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.filtered_row_groups_by_min_max = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "FilteredGroupsByMinMax", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.filtered_row_groups_by_bloom_filter = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "FilteredGroupsByBloomFilter", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.to_read_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "ReadGroups", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.filtered_group_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
@@ -201,6 +212,8 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", parquet_profile, 1);
         _parquet_profile.dict_filter_rewrite_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", parquet_profile, 1);
+        _parquet_profile.bloom_filter_read_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "BloomFilterReadTime", parquet_profile, 1);
     }
 }
@@ -1094,12 +1107,20 @@ Status ParquetReader::_process_min_max_bloom_filter(
         }
     } else {
         bool filter_this_row_group = false;
-        RETURN_IF_ERROR(
-                _process_column_stat_filter(row_group, push_down_pred, &filter_this_row_group));
-        _init_chunk_dicts();
-        RETURN_IF_ERROR(_process_dict_filter(&filter_this_row_group));
-        _init_bloom_filter();
-        RETURN_IF_ERROR(_process_bloom_filter(&filter_this_row_group));
+        bool filtered_by_min_max = false;
+        bool filtered_by_bloom_filter = false;
+        RETURN_IF_ERROR(_process_column_stat_filter(row_group, push_down_pred,
+                                                    &filter_this_row_group, &filtered_by_min_max,
+                                                    &filtered_by_bloom_filter));
+        // Update statistics based on filter type
+        if (filter_this_row_group) {
+            if (filtered_by_min_max) {
+                _statistics.filtered_row_groups_by_min_max++;
+            }
+            if (filtered_by_bloom_filter) {
+                _statistics.filtered_row_groups_by_bloom_filter++;
+            }
+        }
 
         if (!filter_this_row_group) {
             RETURN_IF_ERROR(_process_page_index_filter(row_group, row_group_index, push_down_pred,
@@ -1113,14 +1134,28 @@
 
 Status ParquetReader::_process_column_stat_filter(
         const tparquet::RowGroup& row_group,
         const std::vector<std::shared_ptr<ColumnPredicate>>& push_down_pred,
-        bool* filter_group) {
-    if (!_enable_filter_by_min_max) {
+        bool* filter_group, bool* filtered_by_min_max, bool* filtered_by_bloom_filter) {
+    // If both filters are disabled, skip filtering
+    if (!_enable_filter_by_min_max && !_enable_filter_by_bloom_filter) {
         return Status::OK();
     }
 
-    for (const auto& predicate : push_down_pred) {
+    // Cache bloom filters for each column to avoid reading the same bloom filter multiple times
+    // when there are multiple predicates on the same column
+    std::unordered_map<int, std::unique_ptr<ParquetBlockSplitBloomFilter>>
+            bloom_filter_cache;
+
+    // Initialize output parameters
+    *filtered_by_min_max = false;
+    *filtered_by_bloom_filter = false;
+
+    for (const auto& predicate : _push_down_predicates) {
         std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
                 [&](ParquetPredicate::ColumnStat* stat, const int cid) {
+                    // Check if min-max filter is enabled
+                    if (!_enable_filter_by_min_max) {
+                        return false;
+                    }
                    auto* slot = _tuple_descriptor->slots()[cid];
                    if (!_table_info_node_ptr->children_column_exists(slot->col_name())) {
                        return false;
                    }
@@ -1137,28 +1172,97 @@
                                     _t_metadata->created_by, stat)
                             .ok();
                 };
+        std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+                [&](ParquetPredicate::ColumnStat* stat, const int cid) {
+                    auto* slot = _tuple_descriptor->slots()[cid];
+                    if (!_table_info_node_ptr->children_column_exists(slot->col_name())) {
+                        return false;
+                    }
+                    const auto& file_col_name =
+                            _table_info_node_ptr->children_file_column_name(slot->col_name());
+                    const FieldSchema* col_schema =
+                            _file_metadata->schema().get_column(file_col_name);
+                    int parquet_col_id = col_schema->physical_column_index;
+                    auto meta_data = row_group.columns[parquet_col_id].meta_data;
+                    if (!meta_data.__isset.bloom_filter_offset) {
+                        return false;
+                    }
+                    auto primitive_type =
+                            remove_nullable(col_schema->data_type)->get_primitive_type();
+                    if (!ParquetPredicate::bloom_filter_supported(primitive_type)) {
+                        return false;
+                    }
+
+                    // Check if bloom filter is enabled
+                    if (!_enable_filter_by_bloom_filter) {
+                        return false;
+                    }
+
+                    // Check cache first
+                    auto cache_iter = bloom_filter_cache.find(parquet_col_id);
+                    if (cache_iter != bloom_filter_cache.end()) {
+                        // Bloom filter already loaded for this column, reuse it
+                        stat->bloom_filter = std::move(cache_iter->second);
+                        bloom_filter_cache.erase(cache_iter);
+                        return stat->bloom_filter != nullptr;
+                    }
+
+                    if (!stat->bloom_filter) {
+                        SCOPED_RAW_TIMER(&_statistics.bloom_filter_read_time);
+                        auto st = ParquetPredicate::read_bloom_filter(
+                                meta_data, _tracing_file_reader, _io_ctx, stat);
+                        if (!st.ok()) {
+                            LOG(WARNING) << "Failed to read bloom filter for column "
+                                         << col_schema->name << " in file " << _scan_range.path
+                                         << ", status: " << st.to_string();
+                            stat->bloom_filter.reset();
+                            return false;
+                        }
+                    }
+                    return stat->bloom_filter != nullptr;
+                };
         ParquetPredicate::ColumnStat stat;
         stat.ctz = _ctz;
         stat.get_stat_func = &get_stat_func;
+        stat.get_bloom_filter_func = &get_bloom_filter_func;
 
         if (!predicate->evaluate_and(&stat)) {
             *filter_group = true;
-            return Status::OK();
-        }
-    }
-    return Status::OK();
-}
-
-void ParquetReader::_init_chunk_dicts() {}
+            // Track which filter was used for filtering
+            // If bloom filter was loaded, it means bloom filter was used
+            if (stat.bloom_filter) {
+                *filtered_by_bloom_filter = true;
+            }
+            // If col_schema was set but no bloom filter, it means min-max stats were used
+            if (stat.col_schema && !stat.bloom_filter) {
+                *filtered_by_min_max = true;
+            }
 
-Status ParquetReader::_process_dict_filter(bool* filter_group) {
-    return Status::OK();
-}
+            return Status::OK();
+        }
 
-void ParquetReader::_init_bloom_filter() {}
+        // After evaluating, if the bloom filter was used, cache it for subsequent predicates
+        if (stat.bloom_filter) {
+            // Find the column id for caching
+            for (auto* slot : _tuple_descriptor->slots()) {
+                if (_table_info_node_ptr->children_column_exists(slot->col_name())) {
+                    const auto& file_col_name =
+                            _table_info_node_ptr->children_file_column_name(slot->col_name());
+                    const FieldSchema* col_schema =
+                            _file_metadata->schema().get_column(file_col_name);
+                    int parquet_col_id = col_schema->physical_column_index;
+                    if (stat.col_schema == col_schema) {
+                        bloom_filter_cache[parquet_col_id] = std::move(stat.bloom_filter);
+                        break;
+                    }
+                }
+            }
+        }
+    }
 
-Status ParquetReader::_process_bloom_filter(bool* filter_group) {
+    // Update filter statistics if this row group was not filtered
+    // The statistics will be updated in _init_row_groups when filter_group is true
     return Status::OK();
 }
 
@@ -1175,6 +1279,10 @@ void ParquetReader::_collect_profile() {
         _current_group_reader->collect_profile_before_close();
     }
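+    // (Note added in review.) The per-cause counters updated below attribute
+    // each rejected row group to exactly one cause: a ColumnStat holding a
+    // loaded bloom filter means the bloom probe made the final decision,
+    // while a populated col_schema without a bloom filter means the min-max
+    // statistics did, so the two counters do not double count a row group.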
COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups); + COUNTER_UPDATE(_parquet_profile.filtered_row_groups_by_min_max, + _statistics.filtered_row_groups_by_min_max); + COUNTER_UPDATE(_parquet_profile.filtered_row_groups_by_bloom_filter, + _statistics.filtered_row_groups_by_bloom_filter); COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups); COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows); COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows); @@ -1199,6 +1307,7 @@ void ParquetReader::_collect_profile() { _column_statistics.parse_page_header_num); COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time); COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time); + COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time, _statistics.bloom_filter_read_time); COUNTER_UPDATE(_parquet_profile.page_index_read_calls, _column_statistics.page_index_read_calls); COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 2aee8fe2a820c1..b4768ed5578c3b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -76,6 +76,8 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { public: struct Statistics { int32_t filtered_row_groups = 0; + int32_t filtered_row_groups_by_min_max = 0; + int32_t filtered_row_groups_by_bloom_filter = 0; int32_t read_row_groups = 0; int64_t filtered_group_rows = 0; int64_t filtered_page_rows = 0; @@ -96,6 +98,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { int64_t parse_page_index_time = 0; int64_t predicate_filter_time = 0; int64_t dict_filter_rewrite_time = 0; + int64_t bloom_filter_read_time = 0; }; ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, @@ -172,6 +175,8 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { private: struct ParquetProfile { RuntimeProfile::Counter* filtered_row_groups = nullptr; + RuntimeProfile::Counter* filtered_row_groups_by_min_max = nullptr; + RuntimeProfile::Counter* filtered_row_groups_by_bloom_filter = nullptr; RuntimeProfile::Counter* to_read_row_groups = nullptr; RuntimeProfile::Counter* filtered_group_rows = nullptr; RuntimeProfile::Counter* filtered_page_rows = nullptr; @@ -202,6 +207,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { RuntimeProfile::Counter* parse_page_header_num = nullptr; RuntimeProfile::Counter* predicate_filter_time = nullptr; RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr; + RuntimeProfile::Counter* bloom_filter_read_time = nullptr; }; Status _open_file(); @@ -228,11 +234,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { Status _process_column_stat_filter( const tparquet::RowGroup& row_group, const std::vector>& push_down_pred, - bool* filter_group); - void _init_chunk_dicts(); - Status _process_dict_filter(bool* filter_group); - void _init_bloom_filter(); - Status _process_bloom_filter(bool* filter_group); + bool* filter_group, bool* filtered_by_min_max, bool* filtered_by_bloom_filter); /* * 1. 
row group min-max filter @@ -327,6 +329,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { RuntimeState* _state = nullptr; bool _enable_lazy_mat = true; bool _enable_filter_by_min_max = true; + bool _enable_filter_by_bloom_filter = true; const TupleDescriptor* _tuple_descriptor = nullptr; const RowDescriptor* _row_descriptor = nullptr; const std::unordered_map* _colname_to_slot_id = nullptr; diff --git a/be/test/olap/block_column_predicate_test.cpp b/be/test/olap/block_column_predicate_test.cpp index 480606ed39440d..beb5c16d7407ad 100644 --- a/be/test/olap/block_column_predicate_test.cpp +++ b/be/test/olap/block_column_predicate_test.cpp @@ -25,10 +25,12 @@ #include #include #include +#include #include #include #include +#include "common/status.h" #include "exprs/hybrid_set.h" #include "gtest/gtest_pred_impl.h" #include "olap/column_predicate.h" @@ -41,6 +43,7 @@ #include "vec/columns/column.h" #include "vec/columns/predicate_column.h" #include "vec/core/field.h" +#include "vec/exec/format/parquet/parquet_block_split_bloom_filter.h" #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/runtime/timestamptz_value.h" @@ -2070,6 +2073,300 @@ TEST_F(BlockColumnPredicateTest, PARQUET_IN_PREDICATE) { } } +TEST_F(BlockColumnPredicateTest, PARQUET_COMPARISON_PREDICATE_BLOOM_FILTER) { + const int value = 42; + const int col_idx = 0; + std::unique_ptr pred( + new ComparisonPredicateBase(col_idx, value)); + SingleColumnBlockPredicate single_column_block_pred(pred.get()); + + auto parquet_field = std::make_unique(); + parquet_field->name = "col1"; + parquet_field->data_type = + vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, true); + parquet_field->field_id = -1; + parquet_field->parquet_schema.type = tparquet::Type::type::INT32; + + auto encode_value = [](int v) { + return std::string(reinterpret_cast(&v), sizeof(v)); + }; + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + current_stat->encoded_min_value = encode_value(value); + current_stat->encoded_max_value = encode_value(value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + loader_calls++; + if (!current_stat->bloom_filter) { + current_stat->bloom_filter = + std::make_unique(); + auto* bloom = static_cast( + current_stat->bloom_filter.get()); + Status st = bloom->init(256, segment_v2::HashStrategyPB::XX_HASH_64); + EXPECT_TRUE(st.ok()); + bloom->add_bytes(reinterpret_cast(&value), sizeof(value)); + } + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_EQ(1, loader_calls); + } + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + current_stat->encoded_min_value = encode_value(value); + 
current_stat->encoded_max_value = encode_value(value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + loader_calls++; + if (!current_stat->bloom_filter) { + current_stat->bloom_filter = + std::make_unique(); + auto* bloom = static_cast( + current_stat->bloom_filter.get()); + Status st = bloom->init(256, segment_v2::HashStrategyPB::XX_HASH_64); + EXPECT_TRUE(st.ok()); + int other_value = value + 10; + bloom->add_bytes(reinterpret_cast(&other_value), + sizeof(other_value)); + } + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_EQ(1, loader_calls); + } + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + current_stat->encoded_min_value = encode_value(value); + current_stat->encoded_max_value = encode_value(value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + bool loader_invoked = false; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + loader_invoked = true; + return false; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_TRUE(loader_invoked); + } + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + int min_value = value + 5; + int max_value = value + 10; + current_stat->encoded_min_value = encode_value(min_value); + current_stat->encoded_max_value = encode_value(max_value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat*, int) { + loader_calls++; + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_EQ(0, loader_calls); + } +} + +TEST_F(BlockColumnPredicateTest, PARQUET_IN_PREDICATE_BLOOM_FILTER) { + const int col_idx = 0; + auto hybrid_set = std::make_shared>(false); + const int included_value = 7; + hybrid_set->insert(&included_value); + std::unique_ptr pred( + new InListPredicateBase>(col_idx, hybrid_set)); + SingleColumnBlockPredicate single_column_block_pred(pred.get()); + + auto parquet_field = std::make_unique(); + parquet_field->name = "col1"; + parquet_field->data_type = + vectorized::DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, true); + parquet_field->field_id = -1; + parquet_field->parquet_schema.type = tparquet::Type::type::INT32; + + auto encode_value = [](int v) { + return std::string(reinterpret_cast(&v), sizeof(v)); + }; + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + 
[&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + current_stat->encoded_min_value = encode_value(included_value); + current_stat->encoded_max_value = encode_value(included_value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + loader_calls++; + if (!current_stat->bloom_filter) { + current_stat->bloom_filter = + std::make_unique(); + auto* bloom = static_cast( + current_stat->bloom_filter.get()); + Status st = bloom->init(256, segment_v2::HashStrategyPB::XX_HASH_64); + EXPECT_TRUE(st.ok()); + bloom->add_bytes(reinterpret_cast(&included_value), + sizeof(included_value)); + } + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_TRUE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_EQ(1, loader_calls); + } + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + current_stat->encoded_min_value = encode_value(included_value); + current_stat->encoded_max_value = encode_value(included_value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + loader_calls++; + if (!current_stat->bloom_filter) { + current_stat->bloom_filter = + std::make_unique(); + auto* bloom = static_cast( + current_stat->bloom_filter.get()); + Status st = bloom->init(256, segment_v2::HashStrategyPB::XX_HASH_64); + EXPECT_TRUE(st.ok()); + int excluded_value = included_value + 1; + bloom->add_bytes(reinterpret_cast(&excluded_value), + sizeof(excluded_value)); + } + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_EQ(1, loader_calls); + } + + { + vectorized::ParquetPredicate::ColumnStat stat; + cctz::time_zone tmp_ctz; + stat.ctz = &tmp_ctz; + + std::function get_stat_func = + [&](vectorized::ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = parquet_field.get(); + current_stat->is_all_null = false; + current_stat->has_null = false; + int min_value = included_value + 5; + int max_value = included_value + 10; + current_stat->encoded_min_value = encode_value(min_value); + current_stat->encoded_max_value = encode_value(max_value); + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](vectorized::ParquetPredicate::ColumnStat*, int) { + loader_calls++; + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_FALSE(single_column_block_pred.evaluate_and(&stat)); + EXPECT_EQ(0, loader_calls); + } +} + TEST_F(BlockColumnPredicateTest, NULL_PREDICATE) { { int col_idx = 0; diff --git a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp index 
7bf7d73808d68e..b1238b91e6f4a7 100644 --- a/be/test/vec/exec/format/parquet/parquet_expr_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_expr_test.cpp @@ -53,8 +53,12 @@ #include "common/status.h" #include "exec/schema_scanner.h" #include "exprs/create_predicate_function.h" +#include "exprs/hybrid_set.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" +#include "olap/comparison_predicate.h" +#include "olap/in_list_predicate.h" +#include "olap/null_predicate.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" @@ -65,6 +69,7 @@ #include "util/timezone_utils.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" +#include "vec/exec/format/parquet/parquet_block_split_bloom_filter.h" #include "vec/exec/format/parquet/parquet_thrift_util.h" #include "vec/exec/format/parquet/schema_desc.h" #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" @@ -92,10 +97,10 @@ class ParquetExprTest : public testing::Test { st = io::global_local_filesystem()->create_directory(test_dir); ASSERT_TRUE(st.ok()) << st; - // 1. 构造各种类型的 Arrow Array + // 1. Construct various types of Arrow Arrays const int num_rows = 6; - // int32 部分为 null + // int32 partially null arrow::Int32Builder int32_partial_null_builder; for (int i = 0; i < num_rows; ++i) { if (i % 3 == 0) { @@ -107,7 +112,7 @@ class ParquetExprTest : public testing::Test { std::shared_ptr int32_partial_null_array; ASSERT_TRUE(int32_partial_null_builder.Finish(&int32_partial_null_array).ok()); - // int32 全为 null + // int32 all null arrow::Int32Builder int32_all_null_builder; for (int i = 0; i < num_rows; ++i) { ASSERT_TRUE(int32_all_null_builder.AppendNull().ok()); @@ -158,9 +163,9 @@ class ParquetExprTest : public testing::Test { // date32 arrow::Date32Builder date_builder; for (int i = 0; i < num_rows; ++i) { - // 以 2020-01-01 为基准,每行递增一天 - ASSERT_TRUE( - date_builder.Append(18262 + i).ok()); // 18262 是 2020-01-01 的 days since epoch + // Use 2020-01-01 as the base, increment one day per row + ASSERT_TRUE(date_builder.Append(18262 + i) + .ok()); // 18262 represents 2020-01-01 days since epoch } std::shared_ptr date_array; ASSERT_TRUE(date_builder.Finish(&date_array).ok()); @@ -169,7 +174,8 @@ class ParquetExprTest : public testing::Test { arrow::TimestampBuilder ts_builder(arrow::timestamp(arrow::TimeUnit::SECOND), arrow::default_memory_pool()); for (int i = 0; i < num_rows; ++i) { - ASSERT_TRUE(ts_builder.Append(1609459200 + i * 3600).ok()); // 每小时递增 + ASSERT_TRUE( + ts_builder.Append(1609459200 + i * 3600).ok()); // Increase by one hour per row } std::shared_ptr timestamp_array; ASSERT_TRUE(ts_builder.Finish(×tamp_array).ok()); @@ -194,7 +200,7 @@ class ParquetExprTest : public testing::Test { std::shared_ptr decimal_array_18_6; ASSERT_TRUE(decimal_builder_18_6.Finish(&decimal_array_18_6).ok()); - // 2. 构造 Arrow Schema + // 2. Construct Arrow Schema std::vector> fields = { arrow::field("int32_partial_null_col", arrow::int32()), arrow::field("int32_all_null_col", arrow::int32()), @@ -209,7 +215,7 @@ class ParquetExprTest : public testing::Test { arrow::field("decimal_col_18_6", decimal_type_18_6)}; auto arrow_schema = arrow::schema(fields); - // 3. 构造 Arrow Table + // 3. 
Construct Arrow Table
     auto table = arrow::Table::Make(
             arrow_schema, {int32_partial_null_array, int32_all_null_array, int64_array,
                            float_array, double_array, string_array, bool_array, date_array,
@@ -297,7 +303,7 @@
         t_desc_table.tableDescriptors.push_back(t_table_desc);
         t_desc_table.__isset.tableDescriptors = true;
 
-        // init boolean and numeric slot
+        // Initialize boolean and numeric slots
         for (int i = 0; i < table_column_names.size(); i++) {
             TSlotDescriptor tslot_desc;
             {
@@ -515,6 +521,7 @@ TEST_F(ParquetExprTest, test_lt) {
 }
 
 TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 10000000001 [10000000000 , 10000000000+3)
+    // int64_col = 10000000001 [10000000000 , 10000000000+3)
     int loc = 2;
     auto slot_ref = std::make_shared(loc, std::make_shared());
     auto fn_eq = MockFnCall::create("eq");
@@ -566,6 +573,7 @@ TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 10000000001 [10000000000 , 10000000000+3)
 }
 
 TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
+    // string_col < name_1
     int loc = 5;
     auto slot_ref = std::make_shared(loc, std::make_shared());
     auto fn_eq = MockFnCall::create("lt");
@@ -609,6 +617,7 @@ TEST_F(ParquetExprTest, test_lt_2) { // string_col < name_1
 }
 
 TEST_F(ParquetExprTest, test_is_null) { // int32_all_null_col is null
+    // int32_all_null_col is null
     auto slot_ref = std::make_shared(1, std::make_shared());
     auto fn_eq = MockFnCall::create("is_null_pred");
@@ -660,6 +669,7 @@ TEST_F(ParquetExprTest, test_is_null) { // int32_all_null_col is null
 }
 
 TEST_F(ParquetExprTest, test_is_not_null) { // int32_all_null_col is not null
+    // int32_all_null_col is not null
     auto slot_ref = std::make_shared(1, std::make_shared());
     auto fn_eq = MockFnCall::create("is_not_null_pred");
     auto const_val = std::make_shared(
@@ -710,6 +720,7 @@ TEST_F(ParquetExprTest, test_is_not_null) { // int32_all_null_col is not null
 }
 
 TEST_F(ParquetExprTest, test_is_null_2) { // int32_partial_null_col is null
+    // int32_partial_null_col is null
     auto slot_ref = std::make_shared(0, std::make_shared());
     auto fn_eq = MockFnCall::create("is_null_pred");
     auto const_val = std::make_shared(
@@ -1249,15 +1260,19 @@
                         .ok());
 
     bool filter_group = false;
-    ASSERT_TRUE(p_reader->_process_column_stat_filter(doris_metadata.row_groups[0],
-                                                      p_reader->_push_down_predicates,
-                                                      &filter_group)
+    bool filtered_by_min_max = false;
+    bool filtered_by_bloom_filter = false;
+    ASSERT_TRUE(p_reader->_process_column_stat_filter(
+                        doris_metadata.row_groups[0], p_reader->_push_down_predicates,
+                        &filter_group, &filtered_by_min_max, &filtered_by_bloom_filter)
                         .ok());
     ASSERT_FALSE(filter_group);
     filter_group = true;
+    filtered_by_min_max = false;
+    filtered_by_bloom_filter = false;
-    ASSERT_TRUE(p_reader->_process_column_stat_filter(doris_metadata.row_groups[1],
-                                                      p_reader->_push_down_predicates,
-                                                      &filter_group)
+    ASSERT_TRUE(p_reader->_process_column_stat_filter(
                        doris_metadata.row_groups[1], p_reader->_push_down_predicates,
+                        &filter_group, &filtered_by_min_max, &filtered_by_bloom_filter)
                        .ok());
     ASSERT_TRUE(filter_group);
 }
@@ -1319,5 +1334,567 @@
     ASSERT_TRUE(p_reader->check_expr_can_push_down(or_expr));
 }
 
+TEST_F(ParquetExprTest, test_bloom_filter_skipped_when_range_miss) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema
= doris_file_metadata->schema().get_column(col_idx); + + auto encode_value = [](int64_t v) { + return std::string(reinterpret_cast(&v), sizeof(v)); + }; + + const int64_t min_value = predicate_value + 1; + const int64_t max_value = predicate_value + 5; + const std::string encoded_min = encode_value(min_value); + const std::string encoded_max = encode_value(max_value); + + std::function get_stat_func = + [&](ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = col_schema; + current_stat->has_null = false; + current_stat->is_all_null = false; + current_stat->encoded_min_value = encoded_min; + current_stat->encoded_max_value = encoded_max; + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](ParquetPredicate::ColumnStat*, int) { + loader_calls++; + return false; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_FALSE(eq_pred.evaluate_and(&stat)); + EXPECT_EQ(0, loader_calls); +} + +TEST_F(ParquetExprTest, test_bloom_filter_rejects_value) { + const int col_idx = 2; + const int64_t predicate_value = 10000000001; + ComparisonPredicateBase eq_pred(col_idx, predicate_value); + + ParquetPredicate::ColumnStat stat; + stat.ctz = &ctz; + const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx); + + auto encode_value = [](int64_t v) { + return std::string(reinterpret_cast(&v), sizeof(v)); + }; + + const int64_t min_value = predicate_value; + const int64_t max_value = predicate_value + 5; + const std::string encoded_min = encode_value(min_value); + const std::string encoded_max = encode_value(max_value); + + std::function get_stat_func = + [&](ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = col_schema; + current_stat->has_null = false; + current_stat->is_all_null = false; + current_stat->encoded_min_value = encoded_min; + current_stat->encoded_max_value = encoded_max; + return true; + }; + stat.get_stat_func = &get_stat_func; + + int loader_calls = 0; + std::function get_bloom_filter_func = + [&](ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + loader_calls++; + current_stat->bloom_filter = + std::make_unique(); + auto* bf = static_cast( + current_stat->bloom_filter.get()); + Status st = bf->init(256, segment_v2::HashStrategyPB::XX_HASH_64); + EXPECT_TRUE(st.ok()); + const int64_t other_value = predicate_value + 10; + bf->add_bytes(reinterpret_cast(&other_value), sizeof(other_value)); + return true; + }; + stat.get_bloom_filter_func = &get_bloom_filter_func; + + EXPECT_FALSE(eq_pred.evaluate_and(&stat)); + EXPECT_EQ(1, loader_calls); +} + +TEST_F(ParquetExprTest, test_bloom_filter_accepts_value) { + const int col_idx = 2; + const int64_t predicate_value = 10000000001; + ComparisonPredicateBase eq_pred(col_idx, predicate_value); + + ParquetPredicate::ColumnStat stat; + stat.ctz = &ctz; + const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx); + + auto encode_value = [](int64_t v) { + return std::string(reinterpret_cast(&v), sizeof(v)); + }; + + const int64_t min_value = predicate_value - 1; + const int64_t max_value = predicate_value + 5; + const std::string encoded_min = encode_value(min_value); + const std::string encoded_max = encode_value(max_value); + + std::function get_stat_func = + [&](ParquetPredicate::ColumnStat* current_stat, int cid) { + EXPECT_EQ(col_idx, cid); + current_stat->col_schema = 
col_schema;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        loader_calls++;
+        current_stat->bloom_filter =
+                std::make_unique<ParquetBlockSplitBloomFilter>();
+        auto* bf = static_cast<ParquetBlockSplitBloomFilter*>(
+                current_stat->bloom_filter.get());
+        Status st = bf->init(256, segment_v2::HashStrategyPB::XX_HASH_64);
+        EXPECT_TRUE(st.ok());
+        bf->add_bytes(reinterpret_cast<const char*>(&predicate_value),
+                      sizeof(predicate_value));
+        return true;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_skipped_when_min_max_evicts_rowgroup) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    stat.bloom_filter = std::make_unique<ParquetBlockSplitBloomFilter>();
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = predicate_value - 5;
+    const int64_t max_value = predicate_value - 1;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat*, int) {
+        loader_calls++;
+        return true;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_FALSE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(0, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_loader_called_when_min_max_allows) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = predicate_value - 5;
+    const int64_t max_value = predicate_value + 5;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        loader_calls++;
+        current_stat->bloom_filter =
+                std::make_unique<ParquetBlockSplitBloomFilter>();
+        auto* bf = static_cast<ParquetBlockSplitBloomFilter*>(
+                current_stat->bloom_filter.get());
+        Status st = bf->init(256, segment_v2::HashStrategyPB::XX_HASH_64);
+        EXPECT_TRUE(st.ok());
+        bf->add_bytes(reinterpret_cast<const char*>(&predicate_value),
+                      sizeof(predicate_value));
+        return true;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_loader_not_called_when_missing_metadata) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = predicate_value - 5;
+    const int64_t max_value = predicate_value + 5;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat*, int) {
+        loader_calls++;
+        return false;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_loader_resets_on_failure) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = predicate_value - 5;
+    const int64_t max_value = predicate_value + 5;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        loader_calls++;
+        current_stat->bloom_filter =
+                std::make_unique<ParquetBlockSplitBloomFilter>();
+        current_stat->bloom_filter.reset();
+        return false;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+    EXPECT_EQ(nullptr, stat.bloom_filter);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_not_supported_type) {
+    const int col_idx = 6; // bool column
+    const bool predicate_value = true;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BOOLEAN, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    uint8_t min_value = 0;
+    uint8_t max_value = 1;
+    const std::string encoded_min =
+            std::string(reinterpret_cast<const char*>(&min_value), sizeof(min_value));
+    const std::string encoded_max =
+            std::string(reinterpret_cast<const char*>(&max_value), sizeof(max_value));
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat*, int) {
+        loader_calls++;
+        return false;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_min_max_overlap_but_no_loader) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = predicate_value - 1;
+    const int64_t max_value = predicate_value + 1;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    stat.get_bloom_filter_func = nullptr;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+}
+
+TEST_F(ParquetExprTest, test_in_list_predicate_uses_bloom_filter) {
+    const int col_idx = 2;
+    const int64_t min_candidate = 10000000000;
+    const int64_t max_candidate = 10000000005;
+
+    auto set = std::make_shared<HybridSet<PrimitiveType::TYPE_BIGINT>>(false);
+    for (int64_t v : {min_candidate, max_candidate, max_candidate + 10}) {
+        set->insert(&v);
+    }
+
+    InListPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::IN_LIST, HybridSet<PrimitiveType::TYPE_BIGINT>>
+            in_pred(col_idx, set);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const std::string encoded_min = encode_value(min_candidate);
+    const std::string encoded_max = encode_value(max_candidate);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        loader_calls++;
+        current_stat->bloom_filter =
+                std::make_unique<ParquetBlockSplitBloomFilter>();
+        auto* bf = static_cast<ParquetBlockSplitBloomFilter*>(
+                current_stat->bloom_filter.get());
+        Status st = bf->init(256, segment_v2::HashStrategyPB::XX_HASH_64);
+        EXPECT_TRUE(st.ok());
+        bf->add_bytes(reinterpret_cast<const char*>(&min_candidate), sizeof(min_candidate));
+        bf->add_bytes(reinterpret_cast<const char*>(&max_candidate), sizeof(max_candidate));
+        return true;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(in_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_in_list_predicate_no_loader_on_range_miss) {
+    const int col_idx = 2;
+    auto set = std::make_shared<HybridSet<PrimitiveType::TYPE_BIGINT>>(false);
+    for (int64_t v : {20000000000, 20000000001}) {
+        set->insert(&v);
+    }
+
+    InListPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::IN_LIST, HybridSet<PrimitiveType::TYPE_BIGINT>>
+            in_pred(col_idx, set);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = 10000000000;
+    const int64_t max_value = 10000000005;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat*, int) {
+        loader_calls++;
+        return false;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_FALSE(in_pred.evaluate_and(&stat));
+    EXPECT_EQ(0, loader_calls);
+}
+
+TEST_F(ParquetExprTest, test_bloom_filter_reused_after_first_load) {
+    const int col_idx = 2;
+    const int64_t predicate_value = 10000000001;
+    ComparisonPredicateBase<PrimitiveType::TYPE_BIGINT, PredicateType::EQ> eq_pred(col_idx, predicate_value);
+
+    ParquetPredicate::ColumnStat stat;
+    stat.ctz = &ctz;
+    const FieldSchema* col_schema = doris_file_metadata->schema().get_column(col_idx);
+
+    auto encode_value = [](int64_t v) {
+        return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
+    };
+
+    const int64_t min_value = predicate_value - 5;
+    const int64_t max_value = predicate_value + 5;
+    const std::string encoded_min = encode_value(min_value);
+    const std::string encoded_max = encode_value(max_value);
+
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_stat_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        current_stat->col_schema = col_schema;
+        current_stat->encoded_min_value = encoded_min;
+        current_stat->encoded_max_value = encoded_max;
+        current_stat->has_null = false;
+        current_stat->is_all_null = false;
+        return true;
+    };
+    stat.get_stat_func = &get_stat_func;
+
+    int loader_calls = 0;
+    std::function<bool(ParquetPredicate::ColumnStat*, int)> get_bloom_filter_func =
+            [&](ParquetPredicate::ColumnStat* current_stat, int cid) {
+        EXPECT_EQ(col_idx, cid);
+        loader_calls++;
+        if (!current_stat->bloom_filter) {
+            current_stat->bloom_filter =
+                    std::make_unique<ParquetBlockSplitBloomFilter>();
+            auto* bf = static_cast<ParquetBlockSplitBloomFilter*>(
+                    current_stat->bloom_filter.get());
+            Status st = bf->init(256, segment_v2::HashStrategyPB::XX_HASH_64);
+            EXPECT_TRUE(st.ok());
+            bf->add_bytes(reinterpret_cast<const char*>(&predicate_value),
+                          sizeof(predicate_value));
+        }
+        return true;
+    };
+    stat.get_bloom_filter_func = &get_bloom_filter_func;
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(1, loader_calls);
+
+    EXPECT_TRUE(eq_pred.evaluate_and(&stat));
+    EXPECT_EQ(2, loader_calls);
+}
+
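For context on what these loader tests probe: a Parquet split-block Bloom filter (SBBF) is an array of 256-bit blocks, and membership is decided from a single 64-bit hash of the value bytes (here XXH64, matching HashStrategyPB::XX_HASH_64). The sketch below shows the check step as defined in the Parquet specification; it is illustrative only and is not the ParquetBlockSplitBloomFilter implementation from this PR.

    #include <cstdint>
    #include <vector>

    // One split-block Bloom filter block: eight 32-bit words (256 bits).
    struct SbbfBlock {
        uint32_t words[8];
    };

    // Salt constants from the Parquet Bloom filter specification.
    constexpr uint32_t kSalt[8] = {0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
                                   0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};

    // Check whether `hash` (e.g. XXH64 of the value bytes) may be present.
    // Returns false only if the value is definitely absent.
    bool sbbf_check(const std::vector<SbbfBlock>& blocks, uint64_t hash) {
        // The upper 32 bits pick the block without modulo bias.
        uint64_t block_idx = ((hash >> 32) * blocks.size()) >> 32;
        const SbbfBlock& block = blocks[block_idx];
        uint32_t key = static_cast<uint32_t>(hash); // lower 32 bits seed the bit picks
        for (int i = 0; i < 8; ++i) {
            uint32_t bit = (key * kSalt[i]) >> 27;  // one bit index in [0, 32)
            if ((block.words[i] & (1U << bit)) == 0) {
                return false; // a required bit is clear -> value was never inserted
            }
        }
        return true; // all eight bits set -> possibly present (false positive allowed)
    }

Because a miss requires all eight probed bits to be clear in one cache-line-sized block, the check costs a single memory access, which is why row-group pruning with it is cheap enough to run per predicate.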
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/create_table.hql b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/create_table.hql
new file mode 100644
index 00000000000000..6b20e91976ed58
--- /dev/null
+++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/create_table.hql
@@ -0,0 +1,39 @@
+CREATE DATABASE IF NOT EXISTS multi_catalog;
+USE multi_catalog;
+
+CREATE TABLE `multi_catalog.parquet_bloom_filter`(
+  `tinyint_col` tinyint,
+  `smallint_col` smallint,
+  `int_col` int,
+  `bigint_col` bigint,
+  `float_col` float,
+  `double_col` double,
+  `decimal_col` decimal(10,2),
+  `name` string,
+  `is_active` boolean,
+  `created_date` date,
+  `last_login` timestamp,
+  `numeric_array` array<double>,
+  `address_info` struct<city:string,population:int,area:double,is_capital:boolean>,
+  `metrics` map<string,string>)
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+WITH SERDEPROPERTIES (
+  'serialization.format' = '1')
+STORED AS INPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+LOCATION
+  '/user/doris/suites/multi_catalog/parquet_bloom_filter'
+TBLPROPERTIES (
+  'totalSize'='0',
+  'numRows'='0',
+  'rawDataSize'='0',
+  'parquet.dictionary.size.limit'='50',
+  'numFiles'='0',
+  'transient_lastDdlTime'='1763470218',
+  'bucketing_version'='2',
+  'parquet.compression'='ZSTD');
+
+msck repair table parquet_bloom_filter;
diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/data.tar.gz b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/data.tar.gz
new file mode 100644
index 00000000000000..3f439848118533
Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/data.tar.gz differ
diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/generate_data.script b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/generate_data.script
new file mode 100644
index 00000000000000..f64b60b413ce1b
--- /dev/null
+++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/generate_data.script
@@ -0,0 +1,44 @@
+duckdb
+COPY (
+    SELECT
+        (row_number() OVER () - 1)::TINYINT as tinyint_col,
+        (row_number() OVER () * 100)::SMALLINT as smallint_col,
+        (row_number() OVER () - 1)::INTEGER as int_col,
+        (row_number() OVER () * 10000)::BIGINT as bigint_col,
+
+        (random() * 100)::FLOAT as float_col,
+        (random() * 1000)::DOUBLE as double_col,
+        (random() * 500)::DECIMAL(10,2) as decimal_col,
+
+        'User_' || ((row_number() OVER () - 1)::INTEGER)::VARCHAR as name,
+        ((row_number() OVER () - 1)::INTEGER % 2)::BOOLEAN as is_active,
+        date '2024-01-01' + ((row_number() OVER () - 1)::INTEGER) as created_date,
+        now() - (((row_number() OVER () - 1)::INTEGER) * INTERVAL '1 hour') as last_login,
+
+        CASE ((row_number() OVER () - 1)::INTEGER) % 3
+            WHEN 0 THEN [1.1, 2.2, 3.3]::DOUBLE[]
+            WHEN 1 THEN [4.4, 5.5]::DOUBLE[]
+            ELSE [6.6, 7.7, 8.8, 9.9]::DOUBLE[]
+        END as numeric_array,
+
+        {'city': 'city_' || ((row_number() OVER () - 1)::INTEGER)::VARCHAR,
+         'population': (random() * 100000)::INTEGER,
+         'area': (random() * 100)::DOUBLE,
+         'is_capital': ((row_number() OVER () - 
1)::INTEGER % 2)::BOOLEAN}::STRUCT(city VARCHAR, population INTEGER, area DOUBLE, is_capital BOOLEAN) as address_info, + + MAP( + LIST_VALUE('count', 'score', 'rating'), + LIST_VALUE( + ((row_number() OVER () - 1)::INTEGER)::VARCHAR, + (random() * 100)::VARCHAR, + ((row_number() OVER () - 1)::INTEGER % 5 + 1)::VARCHAR + ) + ) AS metrics + FROM range(10) +) TO 'complete_data_types.parquet' ( + FORMAT 'PARQUET', + ROW_GROUP_SIZE 10, + DICTIONARY_SIZE_LIMIT 50, + COMPRESSION 'ZSTD' +); + diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/run.sh b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/run.sh new file mode 100755 index 00000000000000..601daadaee0ca7 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_bloom_filter/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -x + +CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + + +hadoop fs -mkdir -p /user/doris/suites/multi_catalog/ +hadoop fs -put "${CUR_DIR}"/data/* /user/doris/suites/multi_catalog/ + +# create table +hive -f "${CUR_DIR}/create_table.hql" diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ba96a4a739e7af..381bc35d74f575 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -538,6 +538,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PARQUET_FILTER_BY_MIN_MAX = "enable_parquet_filter_by_min_max"; + public static final String ENABLE_PARQUET_FILTER_BY_BLOOM_FILTER = "enable_parquet_filter_by_bloom_filter"; + public static final String ENABLE_ORC_FILTER_BY_MIN_MAX = "enable_orc_filter_by_min_max"; public static final String CHECK_ORC_INIT_SARGS_SUCCESS = "check_orc_init_sargs_success"; @@ -2264,6 +2266,15 @@ public boolean isEnableHboNonStrictMatchingMode() { needForward = true) public boolean enableParquetFilterByMinMax = true; + @VariableMgr.VarAttr( + name = ENABLE_PARQUET_FILTER_BY_BLOOM_FILTER, + fuzzy = true, + description = {"控制 parquet reader 是否启用 bloom filter 过滤。默认为 true。", + "Controls whether to filter by bloom filter in parquet reader. 
" + + "The default value is true."}, + needForward = true) + public boolean enableParquetFilterByBloomFilter = true; + @VariableMgr.VarAttr( name = ENABLE_ORC_FILTER_BY_MIN_MAX, description = {"控制 orc reader 是否启用 min-max 值过滤。默认为 true。", @@ -3292,6 +3303,7 @@ private void setFuzzyForCatalog(Random random) { } // parquet this.enableParquetFilterByMinMax = random.nextBoolean(); + this.enableParquetFilterByBloomFilter = random.nextBoolean(); this.enableParquetLazyMat = random.nextBoolean(); // orc @@ -4815,6 +4827,7 @@ public TQueryOptions toThrift() { tResult.setEnableParquetLazyMat(enableParquetLazyMat); tResult.setEnableOrcLazyMat(enableOrcLazyMat); tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax); + tResult.setEnableParquetFilterByBloomFilter(enableParquetFilterByBloomFilter); tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax); tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess); diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index 779acc83fe2bd6..c4537b16b8065a 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -351,6 +351,7 @@ message BitmapIndexPB { enum HashStrategyPB { HASH_MURMUR3_X64_64 = 0; CITY_HASH_64 = 1; + XX_HASH_64 = 2; } enum BloomFilterAlgorithmPB { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 62bb7b4b435c81..7181431c2b792e 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -420,6 +420,9 @@ struct TQueryOptions { 182: optional i32 ivf_nprobe = 1; + 179: optional bool enable_parquet_filter_by_bloom_filter = true; + + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. diff --git a/gensrc/thrift/parquet.thrift b/gensrc/thrift/parquet.thrift index f60160a6988a26..1c18eeb796a7af 100644 --- a/gensrc/thrift/parquet.thrift +++ b/gensrc/thrift/parquet.thrift @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -// File format description for the parquet file format - namespace cpp tparquet namespace java org.apache.parquet.format @@ -30,7 +28,7 @@ enum Type { BOOLEAN = 0; INT32 = 1; INT64 = 2; - INT96 = 3; // deprecated, only used by legacy implementations. + INT96 = 3; // deprecated, new Parquet writers should not write data in INT96 FLOAT = 4; DOUBLE = 5; BYTE_ARRAY = 6; @@ -57,16 +55,16 @@ enum ConvertedType { * values */ LIST = 3; - /** an enum is converted into a binary field */ + /** an enum is converted into a BYTE_ARRAY field */ ENUM = 4; /** * A decimal value. * - * This may be used to annotate binary or fixed primitive types. The - * underlying byte array stores the unscaled value encoded as two's - * complement using big-endian byte order (the most significant byte is the - * zeroth element). The value of the decimal is the value * 10^{-scale}. + * This may be used to annotate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY primitive + * types. The underlying byte array stores the unscaled value encoded as two's + * complement using bigendian byte order (the most significant byte is the + * zeroth element). The value of the decimal is the value * 10^{scale}. * * This must be accompanied by a (maximum) precision and a scale in the * SchemaElement. 
@@ -155,7 +153,7 @@ enum ConvertedType {
   /**
    * An embedded BSON document
    *
-   * A BSON document embedded within a single BINARY column.
+   * A BSON document embedded within a single BYTE_ARRAY column.
    */
   BSON = 20;

@@ -178,16 +176,85 @@ enum ConvertedType {
  * Representation of Schemas
  */
 enum FieldRepetitionType {
-  /** This field is required (can not be null) and each record has exactly 1 value. */
+  /** This field is required (can not be null) and each row has exactly 1 value. */
   REQUIRED = 0;

-  /** The field is optional (can be null) and each record has 0 or 1 values. */
+  /** The field is optional (can be null) and each row has 0 or 1 values. */
   OPTIONAL = 1;

   /** The field is repeated and can contain 0 or more values */
   REPEATED = 2;
 }

+/**
+ * A structure for capturing metadata for estimating the unencoded,
+ * uncompressed size of data written. This is useful for readers to estimate
+ * how much memory is needed to reconstruct data in their memory model and for
+ * fine grained filter pushdown on nested structures (the histograms contained
+ * in this structure can help determine the number of nulls at a particular
+ * nesting level and maximum length of lists).
+ */
+struct SizeStatistics {
+   /**
+    * The number of physical bytes stored for BYTE_ARRAY data values assuming
+    * no encoding. This is exclusive of the bytes needed to store the length of
+    * each byte array. In other words, this field is equivalent to the `(size
+    * of PLAIN-ENCODING the byte array values) - (4 bytes * number of values
+    * written)`. To determine unencoded sizes of other types readers can use
+    * schema information multiplied by the number of non-null and null values.
+    * The number of null/non-null values can be inferred from the histograms
+    * below.
+    *
+    * For example, if a column chunk is dictionary-encoded with dictionary
+    * ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2],
+    * then this value for that data page should be 7 (1 + 1 + 2 + 3).
+    *
+    * This field should only be set for types that use BYTE_ARRAY as their
+    * physical type.
+    */
+   1: optional i64 unencoded_byte_array_data_bytes;
+   /**
+    * When present, there is expected to be one element corresponding to each
+    * repetition (i.e. size=max repetition_level+1) where each element
+    * represents the number of times the repetition level was observed in the
+    * data.
+    *
+    * This field may be omitted if max_repetition_level is 0 without loss
+    * of information.
+    **/
+   2: optional list<i64> repetition_level_histogram;
+   /**
+    * Same as repetition_level_histogram except for definition levels.
+    *
+    * This field may be omitted if max_definition_level is 0 or 1 without
+    * loss of information.
+    **/
+   3: optional list<i64> definition_level_histogram;
+}
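As a worked instance of the dictionary example in the comment above (a sketch; the helper name is illustrative): the unencoded size counts the referenced byte-array bytes per occurrence, never the 4-byte PLAIN length prefixes.

    #include <cstdint>
    #include <string>
    #include <vector>

    // Sum the raw byte-array sizes referenced by a dictionary-encoded page,
    // excluding PLAIN length prefixes, per the SizeStatistics definition.
    int64_t unencoded_byte_array_bytes(const std::vector<std::string>& dict,
                                       const std::vector<int>& indices) {
        int64_t total = 0;
        for (int idx : indices) {
            total += static_cast<int64_t>(dict[idx].size());
        }
        return total;
    }

    // unencoded_byte_array_bytes({"a", "bc", "cde"}, {0, 0, 1, 2}) == 7  (1+1+2+3)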
+
+/**
+ * Bounding box for GEOMETRY or GEOGRAPHY type in the representation of min/max
+ * value pair of coordinates from each axis.
+ */
+struct BoundingBox {
+  1: required double xmin;
+  2: required double xmax;
+  3: required double ymin;
+  4: required double ymax;
+  5: optional double zmin;
+  6: optional double zmax;
+  7: optional double mmin;
+  8: optional double mmax;
+}
+
+/** Statistics specific to Geometry and Geography logical types */
+struct GeospatialStatistics {
+  /** A bounding box of geospatial instances */
+  1: optional BoundingBox bbox;
+  /** Geospatial type codes of all instances, or an empty list if not known */
+  2: optional list<i32> geospatial_types;
+}
+
 /**
  * Statistics per row group and per page
  * All fields are optional.
@@ -196,7 +263,7 @@ struct Statistics {
    /**
     * DEPRECATED: min and max value of the column. Use min_value and max_value.
     *
-    * Values are encoded using PLAIN encoding, except that variable-length byte
+    * Values are encoded using PLAIN encoding, except that variable-length byte
     * arrays do not include a length prefix.
     *
     * These fields encode min and max values determined by signed comparison
@@ -208,27 +275,45 @@
    */
    1: optional binary max;
    2: optional binary min;
-   /** count of null value in the column */
+   /**
+    * Count of null values in the column.
+    *
+    * Writers SHOULD always write this field even if it is zero (i.e. no null value)
+    * or the column is not nullable.
+    * Readers MUST distinguish between null_count not being present and null_count == 0.
+    * If null_count is not present, readers MUST NOT assume null_count == 0.
+    */
    3: optional i64 null_count;
    /** count of distinct values occurring */
    4: optional i64 distinct_count;
    /**
-    * Min and max values for the column, determined by its ColumnOrder.
+    * Lower and upper bound values for the column, determined by its ColumnOrder.
+    *
+    * These may be the actual minimum and maximum values found on a page or column
+    * chunk, but can also be (more compact) values that do not exist on a page or
+    * column chunk. For example, instead of storing "Blart Versenwald III", a writer
+    * may set min_value="B", max_value="C". Such more compact values must still be
+    * valid values within the column's logical type.
     *
-    * Values are encoded using PLAIN encoding, except that variable-length byte
+    * Values are encoded using PLAIN encoding, except that variable-length byte
     * arrays do not include a length prefix.
    */
    5: optional binary max_value;
    6: optional binary min_value;
+   /** If true, max_value is the actual maximum value for a column */
+   7: optional bool is_max_value_exact;
+   /** If true, min_value is the actual minimum value for a column */
+   8: optional bool is_min_value_exact;
 }

 /** Empty structs to use as logical type annotations */
-struct StringType {}  // allowed for BINARY, must be encoded with UTF-8
-struct UUIDType {}    // allowed for FIXED[16], must encoded raw UUID bytes
+struct StringType {}  // allowed for BYTE_ARRAY, must be encoded with UTF-8
+struct UUIDType {}    // allowed for FIXED[16], must be encoded as raw UUID bytes
 struct MapType {}     // see LogicalTypes.md
 struct ListType {}    // see LogicalTypes.md
-struct EnumType {}    // allowed for BINARY, must be encoded with UTF-8
+struct EnumType {}    // allowed for BYTE_ARRAY, must be encoded with UTF-8
 struct DateType {}    // allowed for INT32
+struct Float16Type {} // allowed for FIXED[2], must be encoded as raw FLOAT16 bytes (see LogicalTypes.md)

 /**
  * Logical type to annotate a column that is always null.
@@ -242,10 +327,13 @@ struct NullType {}    // allowed for any physical type, only null values stored
 /**
  * Decimal logical type annotation
  *
- * To maintain forward-compatibility in v1, implementations using this logical
+ * Scale must be zero or a positive integer less than or equal to the precision.
+ * Precision must be a non-zero positive integer.
+ *
+ * To maintain forward-compatibility in v1, implementations using this logical
  * type must also set scale and precision on the annotated SchemaElement.
  *
- * Allowed for physical types: INT32, INT64, FIXED, and BINARY
+ * Allowed for physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY, and BYTE_ARRAY.
  */
 struct DecimalType {
   1: required i32 scale
@@ -297,7 +385,7 @@ struct IntType {
 /**
  * Embedded JSON logical type annotation
  *
- * Allowed for physical types: BINARY
+ * Allowed for physical types: BYTE_ARRAY
  */
 struct JsonType {
 }
@@ -305,11 +393,69 @@ struct JsonType {
 /**
  * Embedded BSON logical type annotation
  *
- * Allowed for physical types: BINARY
+ * Allowed for physical types: BYTE_ARRAY
  */
 struct BsonType {
 }

+/**
+ * Embedded Variant logical type annotation
+ */
+struct VariantType {
+  // The version of the variant specification that the variant was
+  // written with.
+  1: optional i8 specification_version
+}
+
+/** Edge interpolation algorithm for Geography logical type */
+enum EdgeInterpolationAlgorithm {
+  SPHERICAL = 0;
+  VINCENTY = 1;
+  THOMAS = 2;
+  ANDOYER = 3;
+  KARNEY = 4;
+}
+
+/**
+ * Embedded Geometry logical type annotation
+ *
+ * Geospatial features in the Well-Known Binary (WKB) format and edges interpolation
+ * is always linear/planar.
+ *
+ * A custom CRS can be set by the crs field. If unset, it defaults to "OGC:CRS84",
+ * which means that the geometries must be stored in longitude, latitude based on
+ * the WGS84 datum.
+ *
+ * Allowed for physical type: BYTE_ARRAY.
+ *
+ * See Geospatial.md for details.
+ */
+struct GeometryType {
+  1: optional string crs;
+}
+
+/**
+ * Embedded Geography logical type annotation
+ *
+ * Geospatial features in the WKB format with an explicit (non-linear/non-planar)
+ * edges interpolation algorithm.
+ *
+ * A custom geographic CRS can be set by the crs field, where longitudes are
+ * bound by [-180, 180] and latitudes are bound by [-90, 90]. If unset, the CRS
+ * defaults to "OGC:CRS84".
+ *
+ * An optional algorithm can be set to correctly interpret edges interpolation
+ * of the geometries. If unset, the algorithm defaults to SPHERICAL.
+ *
+ * Allowed for physical type: BYTE_ARRAY.
+ *
+ * See Geospatial.md for details.
+ */
+struct GeographyType {
+  1: optional string crs;
+  2: optional EdgeInterpolationAlgorithm algorithm;
+}
+
 /**
  * LogicalType annotations to replace ConvertedType.
  *
@@ -339,19 +485,23 @@ union LogicalType {
   12: JsonType JSON           // use ConvertedType JSON
   13: BsonType BSON           // use ConvertedType BSON
   14: UUIDType UUID           // no compatible ConvertedType
+  15: Float16Type FLOAT16     // no compatible ConvertedType
+  16: VariantType VARIANT     // no compatible ConvertedType
+  17: GeometryType GEOMETRY   // no compatible ConvertedType
+  18: GeographyType GEOGRAPHY // no compatible ConvertedType
 }
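The "use ConvertedType ..." annotations in the union above encode the forward-compatibility rule: new writers set logicalType, while readers of legacy v1 files may only see converted_type. A hedged sketch of the resolution order a reader might use (types are simplified stand-ins for the generated Thrift classes, not Doris's actual reader code):

    // Prefer the modern LogicalType; fall back to the legacy ConvertedType.
    enum class ConvertedType { NONE, UTF8, DECIMAL /* ... */ };
    enum class LogicalKind { NONE, STRING, DECIMAL /* ... */ };

    LogicalKind resolve_annotation(LogicalKind logical, ConvertedType converted) {
        if (logical != LogicalKind::NONE) {
            return logical;                       // new writers set logicalType
        }
        switch (converted) {                      // legacy v1 files
        case ConvertedType::UTF8:    return LogicalKind::STRING;
        case ConvertedType::DECIMAL: return LogicalKind::DECIMAL;
        default:                     return LogicalKind::NONE;
        }
    }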
 /**
  * Represents an element inside a schema definition.
- *  - if it is a group (inner node) then type is undefined and num_children is defined
- *  - if it is a primitive type (leaf) then type is defined and num_children is undefined
+ *  - if it is a group (inner node) then type is undefined and num_children is defined
+ *  - if it is a primitive type (leaf) then type is defined and num_children is undefined
  * the nodes are listed in depth first traversal order.
  */
 struct SchemaElement {
-  /** Data type for this field. Not set if the current element is a non-leaf node */
+  /** Data type for this field. Not set if the current element is a non-leaf node */
   1: optional Type type;

-  /** If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the vales.
+  /** If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values.
    * Otherwise, if specified, this is the maximum bit length to store any of the values.
    * (e.g. a low cardinality INT col could have this set to 3).  Note that this is
    * in the schema, and therefore fixed for the entire file.
@@ -366,7 +516,7 @@ struct SchemaElement {
   4: required string name;

   /** Nested fields.  Since thrift does not support nested fields,
-   * the nesting is flattened to a single list by a depth-first traversal.
+   * the nesting is flattened to a single list by a depth-first traversal.
    * The children count is used to construct the nested relationship.
    * This field is not set when the element is a primitive type
    */
@@ -398,7 +548,7 @@ struct SchemaElement {
    * The logical type of this SchemaElement
    *
    * LogicalType replaces ConvertedType, but ConvertedType is still required
-   * for some logical types to ensure forward-compatibility in format v1.
+   * for some logical types to ensure forward-compatibility in format v1.
    */
   10: optional LogicalType logicalType
 }
@@ -410,19 +560,20 @@ struct SchemaElement {
  */
 enum Encoding {
   /** Default encoding.
-   * BOOLEAN - 1 bit per value. 0 is false; 1 is true.
-   * INT32 - 4 bytes per value.  Stored as little-endian.
-   * INT64 - 8 bytes per value.  Stored as little-endian.
-   * FLOAT - 4 bytes per value.  IEEE. Stored as little-endian.
-   * DOUBLE - 8 bytes per value.  IEEE. Stored as little-endian.
-   * BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
-   * FIXED_LEN_BYTE_ARRAY - Just the bytes.
+   * BOOLEAN - 1 bit per value. 0 is false; 1 is true.
+   * INT32 - 4 bytes per value.  Stored as little-endian.
+   * INT64 - 8 bytes per value.  Stored as little-endian.
+   * FLOAT - 4 bytes per value.  IEEE. Stored as little-endian.
+   * DOUBLE - 8 bytes per value.  IEEE. Stored as little-endian.
+   * BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
+   * FIXED_LEN_BYTE_ARRAY - Just the bytes.
    */
   PLAIN = 0;

-  // Group VarInt encoding for INT32/INT64.
-  // This encoding is deprecated. It was never used
-  // GROUP_VAR_INT = 1;
+  /** Group VarInt encoding for INT32/INT64.
+   * This encoding is deprecated. It was never used
+   */
+  // GROUP_VAR_INT = 1;

   /**
    * Deprecated: Dictionary encoding. The values in the dictionary are encoded in the
@@ -452,7 +603,7 @@
    */
   DELTA_LENGTH_BYTE_ARRAY = 6;

-  /** Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
+  /** Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
    * Suffixes are stored as delta length byte arrays.
    */
   DELTA_BYTE_ARRAY = 7;
@@ -461,12 +612,15 @@
    */
   RLE_DICTIONARY = 8;

-  /** Encoding for floating-point data.
-      K byte-streams are created where K is the size in bytes of the data type.
-      The individual bytes of an FP value are scattered to the corresponding stream and
+  /** Encoding for fixed-width data (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY).
+      K byte-streams are created where K is the size in bytes of the data type.
+      The individual bytes of a value are scattered to the corresponding stream and
       the streams are concatenated.
       This itself does not reduce the size of the data but can lead to better compression
       afterwards.
+
+      Added in 2.8 for FLOAT and DOUBLE.
+      Support for INT32, INT64 and FIXED_LEN_BYTE_ARRAY added in 2.11.
    */
   BYTE_STREAM_SPLIT = 9;
 }
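A short sketch of the scatter step BYTE_STREAM_SPLIT describes (illustrative, for FLOAT): the K bytes of each value go to K separate streams, so same-position bytes (sign/exponent bytes, low mantissa bytes) end up adjacent, which compresses much better afterwards.

    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Scatter the K bytes of each fixed-width value into K streams, then
    // concatenate the streams. Output size equals input size; only layout changes.
    std::vector<uint8_t> byte_stream_split_encode(const std::vector<float>& values) {
        constexpr size_t K = sizeof(float); // 4 streams for FLOAT
        std::vector<uint8_t> out(values.size() * K);
        for (size_t i = 0; i < values.size(); ++i) {
            uint8_t bytes[K];
            std::memcpy(bytes, &values[i], K);
            for (size_t k = 0; k < K; ++k) {
                out[k * values.size() + i] = bytes[k]; // stream k, element i
            }
        }
        return out;
    }

Decoding is the exact inverse gather, which is why the transform is lossless and cheap in both directions.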
@@ -510,7 +664,13 @@ enum BoundaryOrder {

 /** Data page header */
 struct DataPageHeader {
-  /** Number of values, including NULLs, in this data page. **/
+  /**
+   * Number of values, including NULLs, in this data page.
+   *
+   * If an OffsetIndex is present, a page must begin at a row
+   * boundary (repetition_level = 0). Otherwise, pages may begin
+   * within a row (repetition_level > 0).
+   **/
   1: required i32 num_values

   /** Encoding used for this data page **/
@@ -522,7 +682,7 @@
   /** Encoding used for repetition levels **/
   4: required Encoding repetition_level_encoding;

-  /** Optional statistics for the data in this page**/
+  /** Optional statistics for the data in this page **/
   5: optional Statistics statistics;
 }

@@ -555,45 +715,49 @@ struct DataPageHeaderV2 {
   /** Number of values, including NULLs, in this data page. **/
   1: required i32 num_values
   /** Number of NULL values, in this data page.
-      Number of non-null = num_values - num_nulls which is also the number of values in the data section **/
+      Number of non-null = num_values - num_nulls which is also the number of values in the data section **/
   2: required i32 num_nulls

-  /** Number of rows in this data page. which means pages change on record boundaries (r = 0) **/
+  /**
+   * Number of rows in this data page. Every page must begin at a
+   * row boundary (repetition_level = 0): rows must **not** be
+   * split across page boundaries when using V2 data pages.
+   **/
   3: required i32 num_rows

   /** Encoding used for data in this page **/
   4: required Encoding encoding

   // repetition levels and definition levels are always using RLE (without size in it)

-  /** length of the definition levels */
+  /** Length of the definition levels */
   5: required i32 definition_levels_byte_length;
-  /** length of the repetition levels */
+  /** Length of the repetition levels */
   6: required i32 repetition_levels_byte_length;

-  /** whether the values are compressed.
+  /** Whether the values are compressed.
       Which means the section of the page between
       definition_levels_byte_length + repetition_levels_byte_length + 1 and compressed_page_size (included)
       is compressed with the compression_codec.
       If missing it is considered compressed */
-  7: optional bool is_compressed = 1;
+  7: optional bool is_compressed = true;

-  /** optional statistics for the data in this page **/
+  /** Optional statistics for the data in this page **/
   8: optional Statistics statistics;
 }

-/** Block-based algorithm type annotation. **/
+/** Block-based algorithm type annotation. **/
 struct SplitBlockAlgorithm {}
 /** The algorithm used in Bloom filter. **/
 union BloomFilterAlgorithm {
-  /** Block-based Bloom filter. **/
+  /** Block-based Bloom filter. **/
   1: SplitBlockAlgorithm BLOCK;
 }

-/** Hash strategy type annotation. xxHash is an extremely fast non-cryptographic hash
- * algorithm. It uses 64 bits version of xxHash.
+/** Hash strategy type annotation. 
xxHash is an extremely fast non-cryptographic hash
+ * algorithm. It uses 64 bits version of xxHash.
  **/
 struct XxHash {}

-/**
+/**
  * The hash function used in Bloom filter. This function takes the hash of a column value
  * using plain encoding.
  **/
@@ -635,32 +799,23 @@ struct PageHeader {
   /** Compressed (and potentially encrypted) page size in bytes, not including this header **/
   3: required i32 compressed_page_size

-  /** The 32bit CRC for the page, to be be calculated as follows:
-   * - Using the standard CRC32 algorithm
-   * - On the data only, i.e. this header should not be included. 'Data'
-   *   hereby refers to the concatenation of the repetition levels, the
-   *   definition levels and the column value, in this exact order.
-   * - On the encoded versions of the repetition levels, definition levels and
-   *   column values
-   * - On the compressed versions of the repetition levels, definition levels
-   *   and column values where possible;
-   *   - For v1 data pages, the repetition levels, definition levels and column
-   *     values are always compressed together. If a compression scheme is
-   *     specified, the CRC shall be calculated on the compressed version of
-   *     this concatenation. If no compression scheme is specified, the CRC
-   *     shall be calculated on the uncompressed version of this concatenation.
-   *   - For v2 data pages, the repetition levels and definition levels are
-   *     handled separately from the data and are never compressed (only
-   *     encoded). If a compression scheme is specified, the CRC shall be
-   *     calculated on the concatenation of the uncompressed repetition levels,
-   *     uncompressed definition levels and the compressed column values.
-   *     If no compression scheme is specified, the CRC shall be calculated on
-   *     the uncompressed concatenation.
-   * - In encrypted columns, CRC is calculated after page encryption; the
-   *   encryption itself is performed after page compression (if compressed)
+  /** The 32bit CRC checksum for the page, to be calculated as follows:
+   *
+   * - The standard CRC32 algorithm is used (with polynomial 0x04C11DB7,
+   *   the same as in e.g. GZip).
+   * - All page types can have a CRC (v1 and v2 data pages, dictionary pages,
+   *   etc.).
+   * - The CRC is computed on the serialization binary representation of the page
+   *   (as written to disk), excluding the page header. For example, for v1
+   *   data pages, the CRC is computed on the concatenation of repetition levels,
+   *   definition levels and column values (optionally compressed, optionally
+   *   encrypted).
+   * - The CRC computation therefore takes place after any compression
+   *   and encryption steps, if any.
+   *
    * If enabled, this allows for disabling checksumming in HDFS if only a few
    * pages need to be read.
-   **/
+   */
   4: optional i32 crc

   // Headers for page specific data.  One only will be set.
@@ -679,16 +834,16 @@
 }

 /**
- * Wrapper struct to specify sort order
+ * Sort order within a RowGroup of a leaf column
  */
 struct SortingColumn {
-  /** The column index (in this row group) **/
+  /** The ordinal position of the column (in this row group) **/
   1: required i32 column_idx

   /** If true, indicates this column is sorted in descending order. **/
   2: required bool descending

-  /** If true, nulls will come before non-null values, otherwise,
+  /** If true, nulls will come before non-null values, otherwise,
   * nulls go at the end. */
   3: required bool nulls_first
 }
@@ -758,6 +913,25 @@ struct ColumnMetaData {
   /** Byte offset from beginning of file to Bloom filter data. 
**/
   14: optional i64 bloom_filter_offset;
+
+  /** Size of Bloom filter data including the serialized header, in bytes.
+   * Added in 2.10 so readers may not read this field from old files and
+   * it can be obtained after the BloomFilterHeader has been deserialized.
+   * Writers should write this field so readers can read the bloom filter
+   * in a single I/O.
+   */
+  15: optional i32 bloom_filter_length;
+
+  /**
+   * Optional statistics to help estimate total memory when converted to in-memory
+   * representations. The histograms contained in these statistics can
+   * also be useful in some cases for more fine-grained nullability/list length
+   * filter pushdown.
+   */
+  16: optional SizeStatistics size_statistics;
+
+  /** Optional statistics specific for Geometry and Geography logical types */
+  17: optional GeospatialStatistics geospatial_statistics;
 }

 struct EncryptionWithFooterKey {
@@ -766,7 +940,7 @@ struct EncryptionWithColumnKey {
   /** Column path in schema **/
   1: required list<string> path_in_schema
-  
+
   /** Retrieval metadata of column encryption key **/
   2: optional binary key_metadata
 }
@@ -782,12 +956,21 @@ struct ColumnChunk {
   **/
   1: optional string file_path

-  /** Byte offset in file_path to the ColumnMetaData **/
-  2: required i64 file_offset
+  /** Deprecated: Byte offset in file_path to the ColumnMetaData
+   *
+   * Past use of this field has been inconsistent, with some implementations
+   * using it to point to the ColumnMetaData and some using it to point to
+   * the first page in the column chunk. In many cases, the ColumnMetaData at this
+   * location is wrong. This field is now deprecated and should not be used.
+   * Writers should set this field to 0 if no ColumnMetaData has been written outside
+   * the footer.
+   */
+  2: required i64 file_offset = 0

-  /** Column metadata for this chunk. This is the same content as what is at
-   * file_path/file_offset.  Having it here has it replicated in the file
-   * metadata.
+  /** Column metadata for this chunk. Some writers may also replicate this at the
+   * location pointed to by file_path/file_offset.
+   * Note: while marked as optional, this field is in fact required by most major
+   * Parquet implementations. As such, writers MUST populate this field.
   **/
   3: optional ColumnMetaData meta_data
@@ -805,7 +988,7 @@
   /** Crypto metadata of encrypted columns **/
   8: optional ColumnCryptoMetaData crypto_metadata
-  
+
   /** Encrypted column metadata for this chunk **/
   9: optional binary encrypted_column_metadata
 }
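A note on why bloom_filter_length (added to ColumnMetaData above) matters for readers like the one this PR adds: with only bloom_filter_offset, a reader must first fetch and deserialize the BloomFilterHeader to learn the bitset size, then issue a second read for the bitset itself. With the length it can fetch both in one I/O. A hedged sketch under an assumed file interface (RandomAccessFile and read_at are hypothetical names, not Doris APIs):

    #include <cstdint>
    #include <string>

    // Hypothetical positional-read handle for illustration only.
    struct RandomAccessFile {
        // A real implementation would pread() from storage; stubbed here.
        bool read_at(int64_t /*offset*/, int32_t /*len*/, std::string* /*out*/) {
            return false;
        }
    };

    // Single-I/O path enabled by bloom_filter_length: one read returns the
    // serialized BloomFilterHeader immediately followed by the bitset.
    bool read_bloom_filter(RandomAccessFile& file, int64_t bloom_filter_offset,
                           int32_t bloom_filter_length, std::string* raw) {
        return file.read_at(bloom_filter_offset, bloom_filter_length, raw);
    }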
@@ -845,10 +1028,10 @@ struct TypeDefinedOrder {}
 /**
  * Union to specify the order used for the min_value and max_value fields for a
  * column. This union takes the role of an enhanced enum that allows rich
- * elements (which will be needed for a collation-based ordering in the future).
+ * elements (which will be needed for a collation-based ordering in the future).
  *
  * Possible values are:
- * * TypeDefinedOrder - the column uses the order defined by its logical or
+ * * TypeDefinedOrder - the column uses the order defined by its logical or
  *   physical type (if there is no logical type).
  *
  * If the reader does not support the value of this union, min and max stats
@@ -858,46 +1041,66 @@ union ColumnOrder {

   /**
    * The sort orders for logical types are:
-   *   UTF8 - unsigned byte-wise comparison
-   *   INT8 - signed comparison
-   *   INT16 - signed comparison
-   *   INT32 - signed comparison
-   *   INT64 - signed comparison
-   *   UINT8 - unsigned comparison
-   *   UINT16 - unsigned comparison
-   *   UINT32 - unsigned comparison
-   *   UINT64 - unsigned comparison
-   *   DECIMAL - signed comparison of the represented value
-   *   DATE - signed comparison
-   *   TIME_MILLIS - signed comparison
-   *   TIME_MICROS - signed comparison
-   *   TIMESTAMP_MILLIS - signed comparison
-   *   TIMESTAMP_MICROS - signed comparison
-   *   INTERVAL - unsigned comparison
-   *   JSON - unsigned byte-wise comparison
-   *   BSON - unsigned byte-wise comparison
-   *   ENUM - unsigned byte-wise comparison
-   *   LIST - undefined
-   *   MAP - undefined
+   *   UTF8 - unsigned byte-wise comparison
+   *   INT8 - signed comparison
+   *   INT16 - signed comparison
+   *   INT32 - signed comparison
+   *   INT64 - signed comparison
+   *   UINT8 - unsigned comparison
+   *   UINT16 - unsigned comparison
+   *   UINT32 - unsigned comparison
+   *   UINT64 - unsigned comparison
+   *   DECIMAL - signed comparison of the represented value
+   *   DATE - signed comparison
+   *   FLOAT16 - signed comparison of the represented value (*)
+   *   TIME_MILLIS - signed comparison
+   *   TIME_MICROS - signed comparison
+   *   TIMESTAMP_MILLIS - signed comparison
+   *   TIMESTAMP_MICROS - signed comparison
+   *   INTERVAL - undefined
+   *   JSON - unsigned byte-wise comparison
+   *   BSON - unsigned byte-wise comparison
+   *   ENUM - unsigned byte-wise comparison
+   *   LIST - undefined
+   *   MAP - undefined
+   *   VARIANT - undefined
+   *   GEOMETRY - undefined
+   *   GEOGRAPHY - undefined
    *
    * In the absence of logical types, the sort order is determined by the physical type:
-   *   BOOLEAN - false, true
-   *   INT32 - signed comparison
-   *   INT64 - signed comparison
-   *   INT96 (only used for legacy timestamps) - undefined
-   *   FLOAT - signed comparison of the represented value (*)
-   *   DOUBLE - signed comparison of the represented value (*)
-   *   BYTE_ARRAY - unsigned byte-wise comparison
-   *   FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
+   *   BOOLEAN - false, true
+   *   INT32 - signed comparison
+   *   INT64 - signed comparison
+   *   INT96 (only used for legacy timestamps) - undefined(+)
+   *   FLOAT - signed comparison of the represented value (*)
+   *   DOUBLE - signed comparison of the represented value (*)
+   *   BYTE_ARRAY - unsigned byte-wise comparison
+   *   FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
+   *
+   * (+) While the INT96 type has been deprecated, at the time of writing it is
+   *     still used in many legacy systems. If a Parquet implementation chooses
+   *     to write statistics for INT96 columns, it is recommended to order them
+   *     according to the legacy rules:
+   *     - compare the last 4 bytes (days) as a little-endian 32-bit signed integer
+   *     - if equal last 4 bytes, compare the first 8 bytes as a little-endian
+   *       64-bit signed integer (nanos)
+   *     See https://github.com/apache/parquet-format/issues/502 for more details
    *
    * (*) Because the sorting order is not specified properly for floating
    *     point values (relations vs. total ordering) the following
    *     compatibility rules should be applied when reading statistics:
-   *     - If the min is a NaN, it should be ignored.
-   *     - If the max is a NaN, it should be ignored.
-   *     - If the min is +0, the row group may contain -0 values as well.
-   *     - If the max is -0, the row group may contain +0 values as well.
-   *     - When looking for NaN values, min and max should be ignored.
+   *     - If the min is a NaN, it should be ignored.
+   *     - If the max is a NaN, it should be ignored.
+   *     - If the min is +0, the row group may contain -0 values as well.
+   *     - If the max is -0, the row group may contain +0 values as well.
+   *     - When looking for NaN values, min and max should be ignored.
+   *
+   *     When writing statistics the following rules should be followed:
+   *     - NaNs should not be written to min or max statistics fields.
+   *     - If the computed max value is zero (whether negative or positive),
+   *       `+0.0` should be written into the max statistics field.
+   *     - If the computed min value is zero (whether negative or positive),
+   *       `-0.0` should be written into the min statistics field.
    */
   1: TypeDefinedOrder TYPE_ORDER;
 }
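The floating-point rules above matter directly for min/max pruning of the kind this PR performs: applied naively, NaN or signed-zero bounds can prune row groups that actually contain matches. A small sketch of the reading-side rules (function names are illustrative):

    #include <cmath>

    // Ignore statistics entirely when either bound is NaN.
    bool stats_usable(double min_v, double max_v) {
        return !std::isnan(min_v) && !std::isnan(max_v);
    }

    // Widen zero bounds before comparing against a predicate, since a +0.0
    // min may hide -0.0 rows and a -0.0 max may hide +0.0 rows.
    // Note: `x == 0.0` is true for both +0.0 and -0.0.
    void normalize_zero_bounds(double* min_v, double* max_v) {
        if (*min_v == 0.0) *min_v = -0.0;
        if (*max_v == 0.0) *max_v = +0.0;
    }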
@@ -913,23 +1116,44 @@ struct PageLocation {
   2: required i32 compressed_page_size

   /**
-   * Index within the RowGroup of the first row of the page; this means pages
-   * change on record boundaries (r = 0).
+   * Index within the RowGroup of the first row of the page. When an
+   * OffsetIndex is present, pages must begin on row boundaries
+   * (repetition_level = 0).
    */
   3: required i64 first_row_index
 }

+/**
+ * Optional offsets for each data page in a ColumnChunk.
+ *
+ * Forms part of the page index, along with ColumnIndex.
+ *
+ * OffsetIndex may be present even if ColumnIndex is not.
+ */
 struct OffsetIndex {
   /**
    * PageLocations, ordered by increasing PageLocation.offset. It is required
    * that page_locations[i].first_row_index < page_locations[i+1].first_row_index.
    */
   1: required list<PageLocation> page_locations
+  /**
+   * Unencoded/uncompressed size for BYTE_ARRAY types.
+   *
+   * See documentation for unencoded_byte_array_data_bytes in SizeStatistics for
+   * more details on this field.
+   */
+  2: optional list<i64> unencoded_byte_array_data_bytes
 }

 /**
- * Description for ColumnIndex.
- * Each [i] refers to the page at OffsetIndex.page_locations[i]
+ * Optional statistics for each data page in a ColumnChunk.
+ *
+ * Forms part of the page index, along with OffsetIndex.
+ *
+ * If this structure is present, OffsetIndex must also be present.
+ *
+ * For each field in this structure, [i] refers to the page at
+ * OffsetIndex.page_locations[i]
  */
 struct ColumnIndex {
   /**
@@ -955,15 +1179,42 @@
   3: required list<binary> max_values

   /**
-   * Stores whether both min_values and max_values are orderd and if so, in
+   * Stores whether both min_values and max_values are ordered and if so, in
    * which direction. This allows readers to perform binary searches in both
    * lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
    * if the lists are ordered.
    */
   4: required BoundaryOrder boundary_order

-  /** A list containing the number of null values for each page **/
+  /**
+   * A list containing the number of null values for each page
+   *
+   * Writers SHOULD always write this field even if no null values
+   * are present or the column is not nullable.
+   * Readers MUST distinguish between null_counts not being present
+   * and null_count being 0.
+   * If null_counts are not present, readers MUST NOT assume all
+   * null counts are 0.
+   */
   5: optional list<i64> null_counts
+
+  /**
+   * Contains repetition level histograms for each page
+   * concatenated together. The repetition_level_histogram field on
+   * SizeStatistics contains more details.
+   *
+   * When present the length should always be (number of pages *
+   * (max_repetition_level + 1)) elements.
+   *
+   * Element 0 is the first element of the histogram for the first page.
+   * Element (max_repetition_level + 1) is the first element of the histogram
+   * for the second page.
+   **/
+  6: optional list<i64> repetition_level_histograms;
+  /**
+   * Same as repetition_level_histograms except for definition levels.
+   **/
+  7: optional list<i64> definition_level_histograms;
 }

 struct AesGcmV1 {
@@ -972,7 +1223,7 @@
   /** Unique file identifier part of AAD suffix **/
   2: optional binary aad_file_unique
-  
+
   /** In files encrypted with AAD prefix without storing it,
    * readers must supply the prefix **/
   3: optional bool supply_aad_prefix
@@ -984,7 +1235,7 @@ struct AesGcmCtrV1 {
   /** Unique file identifier part of AAD suffix **/
   2: optional binary aad_file_unique
-  
+
   /** In files encrypted with AAD prefix without storing it,
    * readers must supply the prefix **/
   3: optional bool supply_aad_prefix
@@ -1004,7 +1255,7 @@ struct FileMetaData {
   /** Parquet schema for this file.  This schema contains metadata for all the columns.
    * The schema is represented as a tree with a single root.  The nodes of the tree
-   * are flattened to a list by doing a depth-first traversal.
+   * are flattened to a list by doing a depth-first traversal.
    * The column metadata contains the path in the schema for that column which can be
    * used to map columns to nodes in the schema.
    * The first element is the root **/
@@ -1035,7 +1286,7 @@
    *
    * Without column_orders, the meaning of the min_value and max_value fields
    * in the Statistics object and the ColumnIndex object is undefined. To ensure
-   * well-defined behaviour, if these fields are written to a Parquet file,
+   * well-defined behaviour, if these fields are written to a Parquet file,
    * column_orders must be written as well.
    *
    * The obsolete min and max fields in the Statistics object are always sorted
@@ -1069,5 +1320,4 @@ struct FileCryptoMetaData {
   /** Retrieval metadata of key used for encryption of footer,
    *  and (possibly) columns **/
   2: optional binary key_metadata
-}
-
+}
\ No newline at end of file
diff --git a/regression-test/data/external_table_p0/hive/test_parquet_bloom_filter.out b/regression-test/data/external_table_p0/hive/test_parquet_bloom_filter.out
new file mode 100644
index 00000000000000..ceb6863af04e79
--- /dev/null
+++ b/regression-test/data/external_table_p0/hive/test_parquet_bloom_filter.out
@@ -0,0 +1,123 @@
+-- This file is automatically generated. 
You should know what you did if you want to edit this
+-- !sql1 --
+0 100 0 10000 26.52095 111.3678717100446 137.67 User_0 false 2024-01-01 2025-11-18T21:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_0", "population":90953, "area":18.72195184818514, "is_capital":0} {"count":"0", "score":"97.58923516038436", "rating":"1"}
+1 200 1 20000 67.97603 644.3967811606323 244.67 User_1 true 2024-01-02 2025-11-18T20:18:13.927333 [4.4, 5.5] {"city":"city_1", "population":29563, "area":95.02067604330446, "is_capital":1} {"count":"1", "score":"48.84215971084722", "rating":"2"}
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+3 400 3 40000 29.56161 941.7540673462295 317.98 User_3 true 2024-01-04 2025-11-18T18:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_3", "population":25266, "area":97.87510655075266, "is_capital":1} {"count":"3", "score":"79.06293398182984", "rating":"4"}
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+6 700 6 70000 75.29995 154.5181515292208 34.95 User_6 false 2024-01-07 2025-11-18T15:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_6", "population":22368, "area":77.25061436410046, "is_capital":0} {"count":"6", "score":"36.936104000701256", "rating":"2"}
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+8 900 8 90000 75.11468 35.5923493151142 137.59 User_8 false 2024-01-09 2025-11-18T13:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_8", "population":85858, "area":70.7163325383825, "is_capital":0} {"count":"8", "score":"23.962342068314726", "rating":"4"}
+9 1000 9 100000 47.59034 148.4892937566462 400.49 User_9 true 2024-01-10 2025-11-18T12:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_9", "population":78762, "area":87.81244490367496, "is_capital":1} {"count":"9", "score":"78.0299174690041", "rating":"5"}
+
+-- !sql2 --
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+
+-- !sql3 --
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+
+-- !sql4 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql5 --
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+
+-- !sql6 --
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+
+-- !sql7 --
+3 400 3 40000 29.56161 941.7540673462295 317.98 User_3 true 2024-01-04 2025-11-18T18:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_3", "population":25266, "area":97.87510655075266, "is_capital":1} {"count":"3", "score":"79.06293398182984", "rating":"4"}
+
+-- !sql8 --
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+
+-- !sql9 --
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+
+-- !sql10 --
+0 100 0 10000 26.52095 111.3678717100446 137.67 User_0 false 2024-01-01 2025-11-18T21:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_0", "population":90953, "area":18.72195184818514, "is_capital":0} {"count":"0", "score":"97.58923516038436", "rating":"1"}
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+6 700 6 70000 75.29995 154.5181515292208 34.95 User_6 false 2024-01-07 2025-11-18T15:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_6", "population":22368, "area":77.25061436410046, "is_capital":0} {"count":"6", "score":"36.936104000701256", "rating":"2"}
+8 900 8 90000 75.11468 35.5923493151142 137.59 User_8 false 2024-01-09 2025-11-18T13:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_8", "population":85858, "area":70.7163325383825, "is_capital":0} {"count":"8", "score":"23.962342068314726", "rating":"4"}
+
+-- !sql11 --
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+
+-- !sql12 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql13 --
+0 100 0 10000 26.52095 111.3678717100446 137.67 User_0 false 2024-01-01 2025-11-18T21:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_0", "population":90953, "area":18.72195184818514, "is_capital":0} {"count":"0", "score":"97.58923516038436", "rating":"1"}
+3 400 3 40000 29.56161 941.7540673462295 317.98 User_3 true 2024-01-04 2025-11-18T18:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_3", "population":25266, "area":97.87510655075266, "is_capital":1} {"count":"3", "score":"79.06293398182984", "rating":"4"}
+6 700 6 70000 75.29995 154.5181515292208 34.95 User_6 false 2024-01-07 2025-11-18T15:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_6", "population":22368, "area":77.25061436410046, "is_capital":0} {"count":"6", "score":"36.936104000701256", "rating":"2"}
+9 1000 9 100000 47.59034 148.4892937566462 400.49 User_9 true 2024-01-10 2025-11-18T12:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_9", "population":78762, "area":87.81244490367496, "is_capital":1} {"count":"9", "score":"78.0299174690041", "rating":"5"}
+
+-- !sql14 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql15 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql1 --
+0 100 0 10000 26.52095 111.3678717100446 137.67 User_0 false 2024-01-01 2025-11-18T21:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_0", "population":90953, "area":18.72195184818514, "is_capital":0} {"count":"0", "score":"97.58923516038436", "rating":"1"}
+1 200 1 20000 67.97603 644.3967811606323 244.67 User_1 true 2024-01-02 2025-11-18T20:18:13.927333 [4.4, 5.5] {"city":"city_1", "population":29563, "area":95.02067604330446, "is_capital":1} {"count":"1", "score":"48.84215971084722", "rating":"2"}
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+3 400 3 40000 29.56161 941.7540673462295 317.98 User_3 true 2024-01-04 2025-11-18T18:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_3", "population":25266, "area":97.87510655075266, "is_capital":1} {"count":"3", "score":"79.06293398182984", "rating":"4"}
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+6 700 6 70000 75.29995 154.5181515292208 34.95 User_6 false 2024-01-07 2025-11-18T15:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_6", "population":22368, "area":77.25061436410046, "is_capital":0} {"count":"6", "score":"36.936104000701256", "rating":"2"}
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+8 900 8 90000 75.11468 35.5923493151142 137.59 User_8 false 2024-01-09 2025-11-18T13:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_8", "population":85858, "area":70.7163325383825, "is_capital":0} {"count":"8", "score":"23.962342068314726", "rating":"4"}
+9 1000 9 100000 47.59034 148.4892937566462 400.49 User_9 true 2024-01-10 2025-11-18T12:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_9", "population":78762, "area":87.81244490367496, "is_capital":1} {"count":"9", "score":"78.0299174690041", "rating":"5"}
+
+-- !sql2 --
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+
+-- !sql3 --
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+
+-- !sql4 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql5 --
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+
+-- !sql6 --
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+
+-- !sql7 --
+3 400 3 40000 29.56161 941.7540673462295 317.98 User_3 true 2024-01-04 2025-11-18T18:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_3", "population":25266, "area":97.87510655075266, "is_capital":1} {"count":"3", "score":"79.06293398182984", "rating":"4"}
+
+-- !sql8 --
+5 600 5 60000 51.42192 928.5818542200548 389.53 User_5 true 2024-01-06 2025-11-18T16:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_5", "population":94820, "area":79.12481295285784, "is_capital":1} {"count":"5", "score":"70.53070394463444", "rating":"1"}
+
+-- !sql9 --
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+
+-- !sql10 --
+0 100 0 10000 26.52095 111.3678717100446 137.67 User_0 false 2024-01-01 2025-11-18T21:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_0", "population":90953, "area":18.72195184818514, "is_capital":0} {"count":"0", "score":"97.58923516038436", "rating":"1"}
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+4 500 4 50000 69.02388 829.7851267511951 181.01 User_4 false 2024-01-05 2025-11-18T17:18:13.927333 [4.4, 5.5] {"city":"city_4", "population":4785, "area":73.10620008707438, "is_capital":0} {"count":"4", "score":"53.841644968650705", "rating":"5"}
+6 700 6 70000 75.29995 154.5181515292208 34.95 User_6 false 2024-01-07 2025-11-18T15:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_6", "population":22368, "area":77.25061436410046, "is_capital":0} {"count":"6", "score":"36.936104000701256", "rating":"2"}
+8 900 8 90000 75.11468 35.5923493151142 137.59 User_8 false 2024-01-09 2025-11-18T13:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_8", "population":85858, "area":70.7163325383825, "is_capital":0} {"count":"8", "score":"23.962342068314726", "rating":"4"}
+
+-- !sql11 --
+2 300 2 30000 18.61847 651.7552751632945 468.77 User_2 false 2024-01-03 2025-11-18T19:18:13.927333 [6.6, 7.7, 8.800000000000001, 9.9] {"city":"city_2", "population":68105, "area":37.78342876738984, "is_capital":0} {"count":"2", "score":"45.86181120513599", "rating":"3"}
+
+-- !sql12 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql13 --
+0 100 0 10000 26.52095 111.3678717100446 137.67 User_0 false 2024-01-01 2025-11-18T21:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_0", "population":90953, "area":18.72195184818514, "is_capital":0} {"count":"0", "score":"97.58923516038436", "rating":"1"}
+3 400 3 40000 29.56161 941.7540673462295 317.98 User_3 true 2024-01-04 2025-11-18T18:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_3", "population":25266, "area":97.87510655075266, "is_capital":1} {"count":"3", "score":"79.06293398182984", "rating":"4"}
+6 700 6 70000 75.29995 154.5181515292208 34.95 User_6 false 2024-01-07 2025-11-18T15:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_6", "population":22368, "area":77.25061436410046, "is_capital":0} {"count":"6", "score":"36.936104000701256", "rating":"2"}
+9 1000 9 100000 47.59034 148.4892937566462 400.49 User_9 true 2024-01-10 2025-11-18T12:18:13.927333 [1.1, 2.2, 3.3] {"city":"city_9", "population":78762, "area":87.81244490367496, "is_capital":1} {"count":"9", "score":"78.0299174690041", "rating":"5"}
+
+-- !sql14 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
+-- !sql15 --
+7 800 7 80000 0.1260703 9.655190464555544 123.94 User_7 true 2024-01-08 2025-11-18T14:18:13.927333 [4.4, 5.5] {"city":"city_7", "population":45626, "area":87.47438721418183, "is_capital":1} {"count":"7", "score":"33.546909893506374", "rating":"3"}
+
diff --git a/regression-test/suites/external_table_p0/hive/test_parquet_bloom_filter.groovy b/regression-test/suites/external_table_p0/hive/test_parquet_bloom_filter.groovy
new file mode 100644
index 00000000000000..4647d55646e265
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/test_parquet_bloom_filter.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +suite("test_parquet_bloom_filter", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (!"true".equalsIgnoreCase(enabled)) { + return; + } + for (String hivePrefix : ["hive2", "hive3"]) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "test_parquet_bloom_filter" + + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + + sql """ use multi_catalog """ + + order_qt_sql1 """ select * from parquet_bloom_filter; """ + order_qt_sql2 """ select * from parquet_bloom_filter where tinyint_col = 5; """ + order_qt_sql3 """ select * from parquet_bloom_filter where smallint_col = 500; """ + order_qt_sql4 """ select * from parquet_bloom_filter where int_col = 7; """ + order_qt_sql5 """ select * from parquet_bloom_filter where bigint_col = 30000; """ + order_qt_sql6 """ select * from parquet_bloom_filter where float_col = cast(51.421917 as float); """ + order_qt_sql7 """ select * from parquet_bloom_filter where double_col = 941.7540673462295;""" + order_qt_sql8 """ select * from parquet_bloom_filter where decimal_col = 389.53; """ + order_qt_sql9 """ select * from parquet_bloom_filter where name = 'User_4'; """ + order_qt_sql10 """ select * from parquet_bloom_filter where is_active = false; """ + order_qt_sql11 """ select * from parquet_bloom_filter where created_date = '2024-01-03'; """ + order_qt_sql12 """ select * from parquet_bloom_filter where last_login = '2025-11-18 14:18:13.927333'; """ + order_qt_sql13 """ select * from parquet_bloom_filter where numeric_array[2] = 2.2; """ + order_qt_sql14 """ select * from parquet_bloom_filter where struct_element(address_info, "city") = 'city_7';""" + order_qt_sql15 """ select * from parquet_bloom_filter where metrics['score'] = '33.546909893506374'; """ + + sql """drop catalog ${catalog_name};""" + } +}