Merged
98 commits
f1c6dc0
Parquet: Implement skeleton for BloomFilter
mapleFU Aug 26, 2023
6ebd6da
tiny fixing
mapleFU Aug 26, 2023
70c9267
tiny update test
mapleFU Aug 26, 2023
48350d8
trying to fix ci
mapleFU Aug 26, 2023
d2a659e
fix lint
mapleFU Aug 26, 2023
41236d8
fix some style problem
mapleFU Aug 26, 2023
8afba81
add file roundtrip test
mapleFU Aug 26, 2023
96c6691
add file roundtrip test
mapleFU Aug 26, 2023
c131341
fix document and ci
mapleFU Aug 26, 2023
220b58e
Update: tiny style fix
mapleFU Aug 26, 2023
ad96c48
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Sep 2, 2023
b756241
Bloom Filter Resolve comments:
mapleFU Sep 2, 2023
f43505b
make space writing a batched writing
mapleFU Sep 2, 2023
3497f4a
update bloom_filter builder interface
mapleFU Sep 2, 2023
fecd0f0
update BloomFilterBuilder arguments
mapleFU Sep 2, 2023
29cc1c1
fix compile
mapleFU Sep 2, 2023
ffbb491
try to satisfy win compiler
mapleFU Sep 2, 2023
4d63428
change all to vector
mapleFU Sep 2, 2023
f689716
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Sep 11, 2023
8e9cb16
resolve comment
mapleFU Sep 11, 2023
7fd47be
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Oct 2, 2023
7c4ff4e
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Oct 10, 2023
feccee9
fix some comment
mapleFU Oct 10, 2023
90245e7
add cached version test
mapleFU Oct 10, 2023
d924e36
cleaning the code for column-props
mapleFU Oct 10, 2023
0340193
optimize get bf
mapleFU Oct 10, 2023
b78eed0
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 16, 2024
23828e1
comment minor fix
mapleFU Mar 16, 2024
6fd57dc
fix comment and add bloom-filter-length
mapleFU Mar 16, 2024
86a8760
Fix a bf bug
mapleFU Mar 16, 2024
f8e724c
trying to use std::map for RowGroup filter
mapleFU Mar 17, 2024
447badf
trying to fix msvc compile
mapleFU Mar 17, 2024
0c1065c
fix comment
mapleFU Mar 17, 2024
5225e08
add test case for 2 row-groups
mapleFU Mar 17, 2024
a779982
add test case for dictionary
mapleFU Mar 17, 2024
4195406
minor update style for file_writer.cc
mapleFU Mar 17, 2024
ed267bd
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 26, 2024
478889d
resolve comment
mapleFU Mar 26, 2024
2992072
fix comment for boolean col, and add test
mapleFU Mar 26, 2024
4852261
trying to add bloom boolean test
mapleFU Mar 26, 2024
add1afd
fix test
mapleFU Mar 26, 2024
f627e30
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 8, 2024
bb8d4a5
fix some comments
mapleFU Apr 8, 2024
ad0f1af
Merge branch 'parquet/support-write-bloom-filter' of github.com:maple…
mapleFU Apr 8, 2024
e1de5bc
fix lint
mapleFU Apr 8, 2024
430742a
switch to anonymous namespace
mapleFU Apr 9, 2024
00f176e
fix comment for column_writer.cc
mapleFU Apr 26, 2024
17f4951
fix comment in other parts
mapleFU Apr 26, 2024
de27ce4
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 26, 2024
259f15b
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 26, 2024
057b542
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jun 10, 2024
34a4c28
trying to fix the ci build
mapleFU Jun 10, 2024
70e3508
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jul 3, 2024
c587568
resolve comments
mapleFU Jul 3, 2024
2223423
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Nov 11, 2024
22030db
change the bloom filter from vector to map
mapleFU Nov 11, 2024
e9c550a
fix lint
mapleFU Nov 11, 2024
23fb3fa
fix lint
mapleFU Nov 14, 2024
d892819
fix comment
mapleFU Nov 15, 2024
ef3291d
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Dec 20, 2024
7aee7dd
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jan 13, 2025
c5b1fb1
Resolve comments
mapleFU Jan 13, 2025
0898466
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Feb 5, 2025
71f5906
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Feb 7, 2025
d57ceea
minor fix
mapleFU Feb 7, 2025
26c2d07
address some comments
mapleFU Feb 7, 2025
d422ffa
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 10, 2025
e6bc6e1
Minor fix
mapleFU Mar 10, 2025
dfaf0e8
try to fix lint
mapleFU Mar 10, 2025
0bafe78
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 26, 2025
ce30ebc
Resolve comment part1
mapleFU Apr 24, 2025
8286783
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 27, 2025
b079acb
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU May 28, 2025
3a5a491
Extract a BloomFilterWriterImpl, and supports binary-view type
mapleFU May 28, 2025
cccb9a8
test for string_view type
mapleFU May 28, 2025
3cf9425
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jun 3, 2025
aac454e
Fix comment part1
mapleFU Jun 3, 2025
83999cd
add writer_internal for bf
mapleFU Jun 3, 2025
fa0c9b1
try to fix ci
mapleFU Jun 3, 2025
351da07
Trying to fix lint
mapleFU Jun 4, 2025
12364d0
Remove duplicate code
mapleFU Jun 6, 2025
8dec902
Merge branch 'parquet/support-write-bloom-filter' of https://github.c…
mapleFU Jun 6, 2025
40c9079
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jun 19, 2025
d32c40b
Apply suggestions
mapleFU Jun 20, 2025
a662563
apply suggestions
mapleFU Jun 20, 2025
61b6dff
fix lint
mapleFU Jun 20, 2025
18f1a47
Update: remove some , and fix include
mapleFU Jun 20, 2025
2bfa278
Merge remote-tracking branch 'origin' into parquet/support-write-bloo…
wgtmac Nov 20, 2025
f03a327
address comments
wgtmac Nov 20, 2025
4aeff8b
Merge remote-tracking branch 'origin' into parquet/support-write-bloo…
wgtmac Nov 26, 2025
0f50418
address comments
wgtmac Nov 26, 2025
789d130
address comments
wgtmac Dec 5, 2025
6dc8d88
add template definition back
wgtmac Dec 7, 2025
0940cd8
Merge branch 'main' into parquet/support-write-bloom-filter
wgtmac Jan 14, 2026
a126e03
polish test and address feedback
wgtmac Jan 14, 2026
e560a28
address more comments
wgtmac Jan 14, 2026
bf5e859
update table
wgtmac Jan 14, 2026
0638b11
update table
wgtmac Jan 14, 2026
3 changes: 2 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
@@ -156,6 +156,7 @@ set(PARQUET_SRCS
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
bloom_filter_builder.cc
column_reader.cc
column_scanner.cc
column_writer.cc
@@ -335,7 +336,7 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet_version.h"
add_parquet_test(internals-test
SOURCES
bloom_filter_test.cc
bloom_filter_reader_test.cc
bloom_filter_parquet_test.cc
properties_test.cc
statistics_test.cc
encoding_test.cc
113 changes: 110 additions & 3 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -66,6 +66,8 @@
#include "parquet/arrow/schema.h"
#include "parquet/arrow/test_util.h"
#include "parquet/arrow/writer.h"
#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/page_index.h"
@@ -5256,7 +5258,7 @@ auto encode_double = [](double value) {

} // namespace

class ParquetPageIndexRoundTripTest : public ::testing::Test {
class ParquetIndexRoundTripTest {
public:
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
const std::shared_ptr<::arrow::Table>& table) {
@@ -5280,10 +5282,17 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
}

protected:
std::shared_ptr<Buffer> buffer_;
};

class ParquetPageIndexRoundTripTest : public ::testing::Test,
public ParquetIndexRoundTripTest {
public:
void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages,
const std::set<int>& expect_columns_without_index = {}) {
auto read_properties = default_arrow_reader_properties();
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(this->buffer_));

auto metadata = reader->metadata();
ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
@@ -5348,7 +5357,6 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
}

protected:
std::shared_ptr<Buffer> buffer_;
std::vector<ColumnIndexObject> column_indexes_;
};

@@ -5584,5 +5592,104 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
/*null_counts=*/{0}}));
}

class ParquetBloomFilterRoundTripTest : public ::testing::Test,
public ParquetIndexRoundTripTest {
public:
void ReadBloomFilters(int expect_num_row_groups,
const std::set<int>& expect_columns_without_filter = {}) {
auto read_properties = default_arrow_reader_properties();
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));

auto metadata = reader->metadata();
ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());

auto& bloom_filter_reader = reader->GetBloomFilterReader();

for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
auto row_group_reader = bloom_filter_reader.RowGroup(rg);
ASSERT_NE(row_group_reader, nullptr);

for (int col = 0; col < metadata->num_columns(); ++col) {
bool expect_no_bloom_filter = expect_columns_without_filter.find(col) !=
expect_columns_without_filter.cend();

auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
if (expect_no_bloom_filter) {
ASSERT_EQ(bloom_filter, nullptr);
} else {
bloom_filters_.push_back(std::move(bloom_filter));
Review comment (Member): What about changing bloom_filters_ to be an output parameter to function ReadBloomFilters instead of a class member variable?
}
}
}
}

template <typename ArrowType>
void verifyBloomFilter(const BloomFilter* bloom_filter,
const ::arrow::ChunkedArray& chunked_array) {
auto iter = ::arrow::stl::Begin<ArrowType>(chunked_array);
auto end = ::arrow::stl::End<ArrowType>(chunked_array);
while (iter != end) {
auto value = *iter;
if (value == std::nullopt) {
++iter;
continue;
}
if constexpr (std::is_same_v<ArrowType, ::arrow::StringType>) {
ByteArray ba(value.value());
EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
} else {
EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
}
++iter;
}
}

protected:
std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
};

TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {
Review comment (Member): The three test cases below share a lot of common logic (with exactly same data). Should we refactor them to eliminate the duplicate?
BloomFilterOptions options;
options.ndv = 100;
auto writer_properties = WriterProperties::Builder()
.set_bloom_filter_options(options)
->max_row_group_length(4)
->build();
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
auto table = ::arrow::TableFromJSON(schema, {R"([
[1, "a" ],
[2, "b" ],
[3, "c" ],
[null, "d"],
[5, null],
[6, "f" ]
])"});
WriteFile(writer_properties, table);

ReadBloomFilters(/*expect_num_row_groups=*/2);
ASSERT_EQ(4, bloom_filters_.size());
{
ASSERT_NE(nullptr, bloom_filters_[0]);
auto col = table->column(0)->Slice(0, 4);
verifyBloomFilter<::arrow::Int64Type>(bloom_filters_[0].get(), *col);
}
{
ASSERT_NE(nullptr, bloom_filters_[1]);
auto col = table->column(1)->Slice(0, 4);
verifyBloomFilter<::arrow::StringType>(bloom_filters_[1].get(), *col);
}
{
ASSERT_NE(nullptr, bloom_filters_[2]);
auto col = table->column(0)->Slice(4, 2);
verifyBloomFilter<::arrow::Int64Type>(bloom_filters_[2].get(), *col);
}
{
ASSERT_NE(nullptr, bloom_filters_[3]);
auto col = table->column(1)->Slice(4, 2);
verifyBloomFilter<::arrow::StringType>(bloom_filters_[3].get(), *col);
}
}

} // namespace arrow
} // namespace parquet
6 changes: 6 additions & 0 deletions cpp/src/parquet/bloom_filter.h
@@ -167,6 +167,12 @@ class PARQUET_EXPORT BloomFilter {

virtual ~BloomFilter() = default;

// Variants taking a const pointer argument, to facilitate templated code
uint64_t Hash(const int32_t* value) const { return Hash(*value); }
uint64_t Hash(const int64_t* value) const { return Hash(*value); }
uint64_t Hash(const float* value) const { return Hash(*value); }
uint64_t Hash(const double* value) const { return Hash(*value); }

protected:
// Hash strategy available for Bloom filter.
enum class HashStrategy : uint32_t { XXHASH = 0 };
142 changes: 142 additions & 0 deletions cpp/src/parquet/bloom_filter_builder.cc
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This module defines the BloomFilterBuilder, which collects the bloom
// filters of columns in a Parquet file while the file is being written.

#include "parquet/bloom_filter_builder.h"

#include <map>
#include <utility>
#include <vector>

#include "arrow/io/interfaces.h"

#include "parquet/bloom_filter.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"

namespace parquet {

class BloomFilterBuilderImpl : public BloomFilterBuilder {
public:
explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
WriterProperties properties)
Review comment (Member): What's the point of making a copy here?
Reply (Member Author): Anyway underlying builder doesn't hold a reference here?
Reply (Member): The builder cannot outlive the FileWriter, so why not simply follow other places like this https://github.com/search?q=repo%3Aapache%2Farrow+%22const+WriterProperties*%22&type=code
Reply (Member Author): done
: schema_(schema), properties_(std::move(properties)) {}
/// Append a new row group to host all incoming bloom filters.
void AppendRowGroup() override;
Review comment (Contributor): This isn't actually appending a new row-group, just marking that a row-group is starting so filters should be reset?
Reply (Member Author): Yes. Parquet uses row-group level bloom filter, so this just setup a new row-group for filters

BloomFilter* GetOrCreateBloomFilter(
int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) override;

/// Serialize all bloom filters with header and bitset in the order of row group and
/// column id. Column encryption is not implemented yet. The side effect is that it
/// deletes all bloom filters after they have been flushed.
void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override;

void Finish() override { finished_ = true; }

private:
/// Make sure the column ordinal is not out of bounds and the builder is in a good state.
void CheckState(int32_t column_ordinal) const {
if (finished_) {
Review comment (Member): Suggested change: `if (finished_) {` -> `if (finished_) [[unlikely]] {`
Reply (Member): CheckState might be called for multiple times so adding [[unlikely]] is something worth doing?
Reply (Member Author, mapleFU, Apr 24, 2025): It's checked once per rowgroup, so I don't think this would be heavy. And I suspect that compiler can already well handle this under -O2: https://godbolt.org/z/6qvevr3G1
throw ParquetException("BloomFilterBuilder is already finished.");
Review comment (Contributor): maybe make this message more accurately reflect the user error (WriteTo called multiple times)?
Reply (Member Author): Updated
}
if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
throw ParquetException("Invalid column ordinal: ", column_ordinal);
}
if (row_group_bloom_filters_.empty()) {
throw ParquetException("No row group appended to BloomFilterBuilder.");
}
if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
throw ParquetException("BloomFilterBuilder does not support Boolean.");
}
}

const SchemaDescriptor* schema_;
WriterProperties properties_;
bool finished_ = false;

// vector: row_group_ordinal
// map: column_ordinal -> bloom filter
std::vector<std::map<int32_t, std::unique_ptr<BloomFilter>>> row_group_bloom_filters_;
};

std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
const SchemaDescriptor* schema, const WriterProperties& properties) {
return std::unique_ptr<BloomFilterBuilder>(
new BloomFilterBuilderImpl(schema, properties));
}

void BloomFilterBuilderImpl::AppendRowGroup() { row_group_bloom_filters_.emplace_back(); }

BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(
int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) {
CheckState(column_ordinal);
std::unique_ptr<BloomFilter>& bloom_filter =
row_group_bloom_filters_.back()[column_ordinal];
if (bloom_filter == nullptr) {
auto block_split_bloom_filter =
std::make_unique<BlockSplitBloomFilter>(properties_.memory_pool());
block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
Review comment (Contributor): There was a recent discussion on the parquet mailing list about bloom filters and what good writers should do. My take-away was:
  1. Knowing NDV up-front typically requires two passes which I don't think we are doing. It might be better to take FPP and a byte size, and work out ndv if necessary.
  2. An extension of this idea (I think someone tried in Java, maybe it was you?) is to have multiple byte sizes (e.g. at log_2 intervals with an FPP), write to all of them, and then choose the smallest one that is reasonably sparse.
  3. In either case it would likely be a good idea to evaluate the final bloom filters for sparsity before choosing to write them (this might be another config parameter).
Reply (Member Author): Personally I think the best way is to buffer the hash values and make a decision later when there are too many hash values or the buffer is too large. But I think we can first make a "static" config and enhance it later.
Reply (Member): I have reviewed that PR and it could be a followup change. Writer implementation has the freedom to try smart things. FYI, parquet-java also discards the bloom filter if dictionary encoding is applied to all data pages, though I don't think we should do the same thing.
bloom_filter_options.ndv, bloom_filter_options.fpp));
bloom_filter = std::move(block_split_bloom_filter);
}
return bloom_filter.get();
}

void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
BloomFilterLocation* location) {
if (!finished_) {
throw ParquetException("Cannot call WriteTo() on an unfinished BloomFilterBuilder.");
}
if (row_group_bloom_filters_.empty()) {
// Return quickly if there is no bloom filter
return;
}

for (size_t row_group_ordinal = 0; row_group_ordinal < row_group_bloom_filters_.size();
++row_group_ordinal) {
const auto& row_group_bloom_filters = row_group_bloom_filters_[row_group_ordinal];
// the whole row group has no bloom filter
if (row_group_bloom_filters.empty()) {
continue;
}
bool has_valid_bloom_filter = false;
int num_columns = schema_->num_columns();
std::vector<std::optional<IndexLocation>> locations(num_columns, std::nullopt);
Review comment (Contributor): is it maybe better to make this a map? I expect the number of columns with a bloom filter to be relatively small compared to the number of overall columns.
Reply (Member Author): 🤔 This reuses some structure from PageIndex; however, IndexLocation is just 9 bytes, and even for a parquet file with 10000 columns the cost here is low (about 200 KiB), so I think we can keep vector here?

// serialize bloom filter by ascending order of column id
for (int32_t column_id = 0; column_id < num_columns; ++column_id) {
auto iter = row_group_bloom_filters.find(column_id);
if (iter != row_group_bloom_filters.cend() && iter->second != nullptr) {
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
iter->second->WriteTo(sink);
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
has_valid_bloom_filter = true;
locations[column_id] = IndexLocation{offset, static_cast<int32_t>(pos - offset)};
}
}
if (has_valid_bloom_filter) {
location->bloom_filter_location.emplace(row_group_ordinal, std::move(locations));
}
}
}

} // namespace parquet
72 changes: 72 additions & 0 deletions cpp/src/parquet/bloom_filter_builder.h
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This module defines the BloomFilterBuilder, which collects the bloom
// filters of columns in a Parquet file while the file is being written.

#pragma once

#include "arrow/io/interfaces.h"
#include "parquet/types.h"

namespace parquet {

class BloomFilter;
class SchemaDescriptor;
struct BloomFilterOptions;
struct BloomFilterLocation;

namespace schema {
class ColumnPath;
}

/// \brief Interface for collecting bloom filter of a parquet file.
class PARQUET_EXPORT BloomFilterBuilder {
public:
/// \brief API convenience to create a BloomFilterBuilder.
static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema,
const WriterProperties& properties);

/// Append a new row group to host all incoming bloom filters.
virtual void AppendRowGroup() = 0;

/// \brief Get the BloomFilter from column ordinal.
///
/// \param column_ordinal Column ordinal in schema, which is only for leaf columns.
/// \param bloom_filter_options The options (e.g. number of distinct values and
/// false positive rate) used to create a BloomFilter.
///
/// \return BloomFilter for the column and its memory ownership belongs to the
/// BloomFilterBuilder.
virtual BloomFilter* GetOrCreateBloomFilter(
int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) = 0;

/// \brief Write the bloom filter to sink.
///
/// \param[out] sink The output stream to write the bloom filter.
/// \param[out] location The locations of all bloom filters relative to the start of the sink.
virtual void WriteTo(::arrow::io::OutputStream* sink,
BloomFilterLocation* location) = 0;

/// \brief Complete the bloom filter builder; no more writes are allowed afterwards.
virtual void Finish() = 0;

virtual ~BloomFilterBuilder() = default;
};

} // namespace parquet