GH-34785: [C++][Parquet] Add bloom filter write support #37400
New file (diff `@@ -0,0 +1,117 @@`; the capture truncates mid-function):

```cpp
#include "parquet/bloom_filter_builder.h"

#include <utility>

#include "arrow/io/interfaces.h"

#include "parquet/bloom_filter.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"

namespace parquet {

class BloomFilterBuilderImpl : public BloomFilterBuilder {
 public:
  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
                                  WriterProperties properties)
      : schema_(schema), properties_(std::move(properties)) {}

  /// Append a new row group to host all incoming bloom filters.
  void AppendRowGroup() override;

  BloomFilter* GetOrCreateBloomFilter(
      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) override;

  /// Serialize all bloom filters with header and bitset in the order of row group
  /// and column id. Column encryption is not implemented yet. The side effect is
  /// that it deletes all bloom filters after they have been flushed.
  void WriteTo(::arrow::io::OutputStream* sink,
               BloomFilterLocation* location) override;

  void Finish() override { finished_ = true; }

 private:
  /// Make sure the column ordinal is not out of bound and the builder is in a
  /// good state.
  void CheckState(int32_t column_ordinal) const {
    if (finished_) {
```
(The remainder of this file is cut off in the captured page.)

A reviewer left a suggested change on the `if (finished_)` line:

```diff
-    if (finished_) {
+    if (finished_) [[unlikely]] {
```
Comment: `CheckState` might be called multiple times, so adding `[[unlikely]]` seems worth doing?

Reply: It's checked once per row group, so I don't think this would be heavy. And I suspect the compiler can already handle this well under `-O2`: https://godbolt.org/z/6qvevr3G1
There was a recent discussion on the parquet mailing list about bloom filters and what good writers should do. My takeaways were:
- Knowing NDV up front typically requires two passes, which I don't think we are doing. It might be better to take FPP and a byte size, and work out NDV if necessary.
- An extension of this idea (I think someone tried it in Java, maybe it was you?) is to have multiple byte sizes (e.g. at log_2 intervals with an FPP), write to all of them, and then choose the smallest one that is reasonably sparse.
- In either case it would likely be a good idea to evaluate the final bloom filters for sparsity before choosing to write them (this might be another config parameter).
> I think someone tried in Java, maybe it was you?

Personally I think the best way is to buffer the hash values and make the decision later, once there are too many hash values or the buffer grows too large. But I think we can first ship a "static" config and enhance it later.
Reply: I have reviewed that PR and it could be a follow-up change. The writer implementation has the freedom to try smart things.

FYI, parquet-java also discards the bloom filter if dictionary encoding is applied to all data pages, though I don't think we should do the same thing.
Comment: Is it maybe better to make this a map? I expect the number of columns with a bloom filter to be relatively small compared to the overall number of columns.
Reply: 🤔 This reuses some structure from PageIndex. However, I think IndexLocation is just 9 bytes, and even for a Parquet file with 10000 columns the cost is low (about 200 KiB), so I think we can keep the vector here?
New file, the public header (diff `@@ -0,0 +1,69 @@`):

```cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This module defines an abstract interface for building bloom filters
// for a Parquet file.

#pragma once

#include <memory>

#include "arrow/io/interfaces.h"

#include "parquet/types.h"

namespace parquet {

class BloomFilter;
class SchemaDescriptor;
class WriterProperties;
struct BloomFilterOptions;
struct BloomFilterLocation;

namespace schema {
class ColumnPath;
}  // namespace schema

/// \brief Interface for collecting bloom filters of a parquet file.
class PARQUET_EXPORT BloomFilterBuilder {
 public:
  /// \brief API convenience to create a BloomFilterBuilder.
  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema,
                                                  const WriterProperties& properties);

  /// Append a new row group to host all incoming bloom filters.
  virtual void AppendRowGroup() = 0;

  /// \brief Get the BloomFilter from column ordinal.
  ///
  /// \param column_ordinal Column ordinal in schema, which is only for leaf columns.
  ///
  /// \return BloomFilter for the column; its memory ownership belongs to
  /// the BloomFilterBuilder.
  virtual BloomFilter* GetOrCreateBloomFilter(
      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) = 0;

  /// \brief Write the bloom filters to sink.
  ///
  /// \param[out] sink The output stream to write the bloom filters.
  /// \param[out] location The location of all bloom filters relative to the
  /// start of sink.
  virtual void WriteTo(::arrow::io::OutputStream* sink,
                       BloomFilterLocation* location) = 0;

  /// \brief Complete the bloom filter builder; no more writes are allowed.
  virtual void Finish() = 0;

  virtual ~BloomFilterBuilder() = default;
};

}  // namespace parquet
```
New file, the tests (diff `@@ -0,0 +1,141 @@`):

```cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <arrow/testing/gtest_util.h>
#include <gtest/gtest.h>

#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_builder.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/file_reader.h"
#include "parquet/test_util.h"

namespace parquet::test {

TEST(BloomFilterReader, ReadBloomFilter) {
  std::string dir_string(parquet::test::get_data_dir());
  std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
  auto reader = ParquetFileReader::OpenFile(path, false);
  auto file_metadata = reader->metadata();
  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
  auto& bloom_filter_reader = reader->GetBloomFilterReader();
  auto row_group_0 = bloom_filter_reader.RowGroup(0);
  ASSERT_NE(nullptr, row_group_0);
  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
  ASSERT_NE(nullptr, bloom_filter);
  EXPECT_THROW(row_group_0->GetColumnBloomFilter(1), ParquetException);

  // assert exists
  {
    std::string_view sv = "Hello";
    ByteArray ba{sv};
    EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
  }

  // does not exist
  {
    std::string_view sv = "NOT_EXISTS";
    ByteArray ba{sv};
    EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
  }
}

TEST(BloomFilterReader, FileNotHaveBloomFilter) {
  // Can still get a BloomFilterReader and a RowGroupBloomFilter
  // reader, but cannot get a non-null BloomFilter.
  std::string dir_string(parquet::test::get_data_dir());
  std::string path = dir_string + "/alltypes_plain.parquet";
  auto reader = ParquetFileReader::OpenFile(path, false);
  auto file_metadata = reader->metadata();
  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
  auto& bloom_filter_reader = reader->GetBloomFilterReader();
  auto row_group_0 = bloom_filter_reader.RowGroup(0);
  ASSERT_NE(nullptr, row_group_0);
  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
  ASSERT_EQ(nullptr, bloom_filter);
}

// <c1:BYTE_ARRAY, c2:BYTE_ARRAY>, c1 has a bloom filter.
TEST(BloomFilterBuilderTest, BasicRoundTrip) {
  SchemaDescriptor schema;
  schema::NodePtr root = schema::GroupNode::Make(
      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
  schema.Init(root);
  auto writer_properties = default_writer_properties();
  auto builder = BloomFilterBuilder::Make(&schema, *writer_properties);
  builder->AppendRowGroup();
  BloomFilterOptions bloom_filter_options;
  bloom_filter_options.ndv = 100;
  auto bloom_filter = builder->GetOrCreateBloomFilter(0, bloom_filter_options);
  ASSERT_NE(nullptr, bloom_filter);
  ASSERT_EQ(bloom_filter->GetBitsetSize(),
            BlockSplitBloomFilter::OptimalNumOfBytes(bloom_filter_options.ndv,
                                                     bloom_filter_options.fpp));
  std::vector<uint64_t> insert_hashes = {100, 200};
  for (uint64_t hash : insert_hashes) {
    bloom_filter->InsertHash(hash);
  }
  builder->Finish();
  auto sink = CreateOutputStream();
  BloomFilterLocation location;
  builder->WriteTo(sink.get(), &location);
  EXPECT_EQ(1, location.bloom_filter_location.size());
  EXPECT_EQ(2, location.bloom_filter_location[0].size());
  EXPECT_TRUE(location.bloom_filter_location[0][0].has_value());
  EXPECT_FALSE(location.bloom_filter_location[0][1].has_value());

  int32_t bloom_filter_offset = location.bloom_filter_location[0][0]->offset;
  int32_t bloom_filter_length = location.bloom_filter_location[0][0]->length;

  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
  ReaderProperties reader_properties;
  ::arrow::io::BufferReader reader(
      ::arrow::SliceBuffer(buffer, bloom_filter_offset, bloom_filter_length));
  auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader);
  for (uint64_t hash : insert_hashes) {
    // Verify against the deserialized filter; the builder releases its
    // filters after WriteTo().
    EXPECT_TRUE(filter.FindHash(hash));
  }
  EXPECT_FALSE(filter.FindHash(300));
}

TEST(BloomFilterBuilderTest, InvalidOperations) {
  SchemaDescriptor schema;
  schema::NodePtr root =
      schema::GroupNode::Make("schema", Repetition::REPEATED, {schema::ByteArray("c1")});
  schema.Init(root);
  auto properties = WriterProperties::Builder().build();
  auto builder = BloomFilterBuilder::Make(&schema, *properties);
  // AppendRowGroup() is not called, so expect a throw.
  BloomFilterOptions default_options;
  ASSERT_THROW(builder->GetOrCreateBloomFilter(0, default_options), ParquetException);

  builder->AppendRowGroup();
  // GetOrCreateBloomFilter() with a wrong column ordinal is expected to throw.
  ASSERT_THROW(builder->GetOrCreateBloomFilter(1, default_options), ParquetException);
  builder->GetOrCreateBloomFilter(0, default_options);
  auto sink = CreateOutputStream();
  BloomFilterLocation location;
  // WriteTo() before Finish() is expected to throw.
  ASSERT_THROW(builder->WriteTo(sink.get(), &location), ParquetException);
  builder->Finish();
  builder->WriteTo(sink.get(), &location);
  EXPECT_EQ(1, location.bloom_filter_location.size());
}

}  // namespace parquet::test
```