Skip to content
32 changes: 27 additions & 5 deletions cpp/include/cudf/io/experimental/hybrid_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,25 @@ enum class use_data_page_mask : bool {
* @endcode
*
* Row group pruning (OPTIONAL): Start with either a list of custom or all row group indices in the
* parquet file and optionally filter it subject to filter expression using column chunk statistics,
* dictionaries and bloom filters. Byte ranges for column chunk dictionary pages and bloom filters
* within parquet file may be obtained via `secondary_filters_byte_ranges()` function. The byte
* ranges may be read into a corresponding vector of device buffers and passed to the corresponding
* row group filtration function.
* parquet file and optionally filter it using a byte range and/or the filter expression using
* column chunk statistics, dictionaries and bloom filters. Byte ranges for column chunk dictionary
* pages and bloom filters within parquet file may be obtained via `secondary_filters_byte_ranges()`
* function. The byte ranges may be read into a corresponding vector of device buffers and passed to
* the corresponding row group filtration function.
* @code{.cpp}
* // Start with a list of all parquet row group indices from the file footer
* auto all_row_group_indices = reader->all_row_groups(options);
*
* // Span to track the indices of row groups currently at hand
* auto current_row_group_indices = cudf::host_span<size_type>(all_row_group_indices);
*
* // Optional: Prune row group indices to the ones that start within the byte range
* auto byte_range_filtered_row_group_indices = reader->filter_row_groups_with_byte_range(
* current_row_group_indices, options);
*
* // Update current row group indices to byte range filtered row group indices
* current_row_group_indices = byte_range_filtered_row_group_indices;
*
* // Optional: Prune row group indices subject to filter expression using row group statistics
* auto stats_filtered_row_group_indices =
* reader->filter_row_groups_with_stats(current_row_group_indices, options, stream);
Expand Down Expand Up @@ -335,6 +342,21 @@ class hybrid_scan_reader {
[[nodiscard]] size_type total_rows_in_row_groups(
cudf::host_span<size_type const> row_group_indices) const;

/**
* @brief Filter the row groups using the specified byte range specified by [`bytes_to_skip`,
* `bytes_to_skip + bytes_to_read`)
*
* Filters the row groups such that only the row groups that start within the byte range are
* selected. Note that the last selected row group may end beyond the byte range.
*
* @param row_group_indices Input row groups indices
* @param options Parquet reader options
* @return Filtered row group indices
*/
[[nodiscard]] std::vector<size_type> filter_row_groups_with_byte_range(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New public API

cudf::host_span<size_type const> row_group_indices,
parquet_reader_options const& options) const;

/**
* @brief Filter the input row groups using column chunk statistics
*
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ void parquet_reader_options::set_num_rows(int64_t val)

void parquet_reader_options::set_skip_bytes(size_t val)
{
CUDF_EXPECTS(val == 0 or std::cmp_equal(_source.num_sources(), 1),
CUDF_EXPECTS(val == 0 or std::cmp_less_equal(_source.num_sources(), 1),
Copy link
Member Author

@mhaseeb123 mhaseeb123 Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no internal source in case of the hybrid scan reader so relaxing the condition

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit weird, but it's fine
maybe a comment above?

"skip_bytes can only be set for single parquet source case");
CUDF_EXPECTS(val == 0 or (not _num_rows.has_value() and _skip_rows == 0),
"skip_bytes cannot be set along with skip_rows and num_rows");
Expand All @@ -833,7 +833,7 @@ void parquet_reader_options::set_skip_bytes(size_t val)

void parquet_reader_options::set_num_bytes(size_t val)
{
CUDF_EXPECTS(std::cmp_equal(_source.num_sources(), 1),
CUDF_EXPECTS(std::cmp_less_equal(_source.num_sources(), 1),
"num_bytes can only be set for single parquet source case");
CUDF_EXPECTS(not _num_rows.has_value() and _skip_rows == 0,
"num_bytes cannot be set along with skip_rows and num_rows");
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ size_type hybrid_scan_reader::total_rows_in_row_groups(
return _impl->total_rows_in_row_groups(input_row_group_indices);
}

std::vector<size_type> hybrid_scan_reader::filter_row_groups_with_byte_range(
cudf::host_span<size_type const> row_group_indices, parquet_reader_options const& options) const
{
CUDF_FUNC_RANGE();

// Temporary vector with row group indices from the first source
auto const input_row_group_indices =
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};

return _impl->filter_row_groups_with_byte_range(input_row_group_indices, options).front();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call the impl API

}

std::vector<cudf::size_type> hybrid_scan_reader::filter_row_groups_with_stats(
cudf::host_span<size_type const> row_group_indices,
parquet_reader_options const& options,
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ aggregate_reader_metadata::select_payload_columns(
timestamp_type_id);
}

std::vector<std::vector<cudf::size_type>>
aggregate_reader_metadata::filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
std::size_t bytes_to_skip,
std::optional<std::size_t> const& bytes_to_read) const
{
return apply_byte_bounds_filter(row_group_indices, bytes_to_skip, bytes_to_read);
}

std::vector<std::vector<cudf::size_type>> aggregate_reader_metadata::filter_row_groups_with_stats(
host_span<std::vector<cudf::size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,24 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
bool ignore_missing_columns,
type_id timestamp_type_id);

/**
* @brief Filters row groups such that only the row groups that start within the byte range
* specified by [`bytes_to_skip`, `bytes_to_skip + bytes_to_read`) are selected
*
* @note The last selected row group may end beyond the byte range.
*
* @param row_group_indices Input row groups indices
* @param bytes_to_skip Bytes to skip before selecting row groups
* @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups
* until the end of the file are selected if not provided
*
* @return Filtered row group indices
*/
[[nodiscard]] std::vector<std::vector<cudf::size_type>> filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
std::size_t bytes_to_skip,
std::optional<std::size_t> const& bytes_to_read) const;

/**
* @brief Filter the row groups with statistics based on predicate filter
*
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ size_type hybrid_scan_reader_impl::total_rows_in_row_groups(
return _extended_metadata->total_rows_in_row_groups(row_group_indices);
}

std::vector<std::vector<cudf::size_type>>
hybrid_scan_reader_impl::filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options) const
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");

if (options.get_skip_bytes() == 0 and not options.get_num_bytes().has_value()) {
return std::vector<std::vector<cudf::size_type>>{row_group_indices.begin(),
row_group_indices.end()};
}

return _extended_metadata->filter_row_groups_with_byte_range(
row_group_indices, options.get_skip_bytes(), options.get_num_bytes());
}

std::vector<std::vector<size_type>> hybrid_scan_reader_impl::filter_row_groups_with_stats(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options,
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
[[nodiscard]] size_type total_rows_in_row_groups(
cudf::host_span<std::vector<size_type> const> row_group_indices) const;

/**
* @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_byte_range
*/
[[nodiscard]] std::vector<std::vector<cudf::size_type>> filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options) const;

/**
* @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_stats
*/
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1434,9 +1434,8 @@ aggregate_reader_metadata::select_row_groups(
}

// Flag to check if the row groups will be filtered using byte bounds
bool const is_byte_bounded_row_groups = row_group_indices.empty() and

(skip_bytes_opt > 0 or byte_count_opt.has_value());
bool const is_byte_bounded_row_groups =
row_group_indices.empty() and (skip_bytes_opt > 0 or byte_count_opt.has_value());

// We can't filter with both row bounds and byte bounds
CUDF_EXPECTS(not(is_row_bounded_row_groups and is_byte_bounded_row_groups),
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ class aggregate_reader_metadata {
*
* @param input_row_group_indices Lists of input row groups, one per source
* @param bytes_to_skip Bytes to skip before selecting row groups
* @param bytes_to_read Bytes to select row groups after skipping
* @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups
* until the end of the file are selected if not provided.
*
* @return A vector of surviving row group indices
*/
Expand Down
66 changes: 66 additions & 0 deletions cpp/tests/io/experimental/hybrid_scan_filters_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <src/io/parquet/parquet_gpu.hpp>

#include <filesystem>
#include <fstream>

namespace {

/**
Expand Down Expand Up @@ -225,6 +228,69 @@ TEST_F(HybridScanFiltersTest, TestExternalMetadata)
EXPECT_EQ(reader->total_rows_in_row_groups(input_row_group_indices), 2 * rows_per_row_group);
}

TEST_F(HybridScanFiltersTest, FilterRowGroupsWithByteRanges)
{
using T = cudf::string_view;
auto const [table, filepath] = create_parquet_typed_with_stats<T>("ByteBounds.parquet");

auto const file_size = std::filesystem::file_size(filepath);
std::vector<char> file_buffer(file_size);
std::ifstream file{filepath, std::ifstream::binary};
file.read(file_buffer.data(), file_size);
file.close();

// Input file buffer span
auto const file_buffer_span = cudf::host_span<uint8_t const>(
reinterpret_cast<uint8_t const*>(file_buffer.data()), file_buffer.size());

// Fetch footer and page index bytes from the buffer.
auto const footer_buffer = fetch_footer_bytes(file_buffer_span);

// Create hybrid scan reader with footer bytes
auto options = cudf::io::parquet_reader_options::builder().build();
auto const reader =
std::make_unique<cudf::io::parquet::experimental::hybrid_scan_reader>(footer_buffer, options);

auto const input_row_group_indices = reader->all_row_groups(options);

// @note: In the above parquet file, the row groups start at the following byte offsets: 4, 75224,
// 150332, 225561. The `skip_bytes` and `num_bytes` have been chosen to have enough cushion but
// may need to be adjusted in the future if this test suddenly starts failing.

{
// Start with all row groups and only read row group 0 as only it will start in [0, 1000) byte
// range
auto constexpr num_bytes = 1000;
options.set_num_bytes(num_bytes);
auto const filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
auto const expected_row_group_indices = std::vector<cudf::size_type>{0};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
}

{
// Start with all row groups and skip row group 0 as it won't start in [1000, inf) byte range
auto skip_bytes = 1000;
options.set_skip_bytes(skip_bytes);
options.set_num_bytes(std::numeric_limits<size_t>::max());
auto filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
auto expected_row_group_indices = std::vector<cudf::size_type>{1, 2, 3};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);

// Now start with filtered row groups and only read row group 1 as only it starts in [50000,
// 100000) byte range
skip_bytes = 50000;
auto constexpr num_bytes = 50000;
options.set_skip_bytes(skip_bytes);
options.set_num_bytes(num_bytes);
filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(filtered_row_group_indices, options);
expected_row_group_indices = std::vector<cudf::size_type>{1};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
}
}

TEST_F(HybridScanFiltersTest, FilterRowGroupsWithStats)
{
srand(0xc001);
Expand Down
Loading