Skip to content
32 changes: 27 additions & 5 deletions cpp/include/cudf/io/experimental/hybrid_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,25 @@ enum class use_data_page_mask : bool {
* @endcode
*
* Row group pruning (OPTIONAL): Start with either a list of custom or all row group indices in the
* parquet file and optionally filter it subject to filter expression using column chunk statistics,
* dictionaries and bloom filters. Byte ranges for column chunk dictionary pages and bloom filters
* within parquet file may be obtained via `secondary_filters_byte_ranges()` function. The byte
* ranges may be read into a corresponding vector of device buffers and passed to the corresponding
* row group filtration function.
* parquet file and optionally filter it using a byte range and/or the filter expression using
* column chunk statistics, dictionaries and bloom filters. Byte ranges for column chunk dictionary
* pages and bloom filters within parquet file may be obtained via `secondary_filters_byte_ranges()`
* function. The byte ranges may be read into a corresponding vector of device buffers and passed to
* the corresponding row group filtration function.
* @code{.cpp}
* // Start with a list of all parquet row group indices from the file footer
* auto all_row_group_indices = reader->all_row_groups(options);
*
* // Span to track the indices of row groups currently at hand
* auto current_row_group_indices = cudf::host_span<size_type>(all_row_group_indices);
*
* // Optional: Prune row group indices to the ones that start within the byte range
* auto byte_range_filtered_row_group_indices = reader->filter_row_groups_with_byte_range(
* current_row_group_indices, options);
*
* // Update current row group indices to byte range filtered row group indices
* current_row_group_indices = byte_range_filtered_row_group_indices;
*
* // Optional: Prune row group indices subject to filter expression using row group statistics
* auto stats_filtered_row_group_indices =
* reader->filter_row_groups_with_stats(current_row_group_indices, options, stream);
Expand Down Expand Up @@ -335,6 +342,21 @@ class hybrid_scan_reader {
[[nodiscard]] size_type total_rows_in_row_groups(
cudf::host_span<size_type const> row_group_indices) const;

/**
* @brief Filter the row groups using the specified byte range specified by [`bytes_to_skip`,
* `bytes_to_skip + bytes_to_read`)
*
* Filters the row groups such that only the row groups that start within the byte range are
* selected. Note that the last selected row group may end beyond the byte range.
*
* @param row_group_indices Input row groups indices
* @param options Parquet reader options
* @return Filtered row group indices
*/
[[nodiscard]] std::vector<size_type> filter_row_groups_with_byte_range(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New public API

cudf::host_span<size_type const> row_group_indices,
parquet_reader_options const& options) const;

/**
* @brief Filter the input row groups using column chunk statistics
*
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,8 @@ void parquet_reader_options::set_num_rows(int64_t val)

void parquet_reader_options::set_skip_bytes(size_t val)
{
CUDF_EXPECTS(val == 0 or std::cmp_equal(_source.num_sources(), 1),
// Hybrid scan reader does not contain a source so relaxing this check to zero or one source
CUDF_EXPECTS(val == 0 or _source.num_sources() == 1 or _source.num_sources() == 0,
"skip_bytes can only be set for single parquet source case");
CUDF_EXPECTS(val == 0 or (not _num_rows.has_value() and _skip_rows == 0),
"skip_bytes cannot be set along with skip_rows and num_rows");
Expand All @@ -833,7 +834,9 @@ void parquet_reader_options::set_skip_bytes(size_t val)

void parquet_reader_options::set_num_bytes(size_t val)
{
CUDF_EXPECTS(std::cmp_equal(_source.num_sources(), 1),
// Hybrid scan reader does not contain a source so relaxing this check to zero or one source
CUDF_EXPECTS(val == std::numeric_limits<size_t>::max() or _source.num_sources() == 1 or
_source.num_sources() == 0,
"num_bytes can only be set for single parquet source case");
CUDF_EXPECTS(not _num_rows.has_value() and _skip_rows == 0,
"num_bytes cannot be set along with skip_rows and num_rows");
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ size_type hybrid_scan_reader::total_rows_in_row_groups(
return _impl->total_rows_in_row_groups(input_row_group_indices);
}

std::vector<size_type> hybrid_scan_reader::filter_row_groups_with_byte_range(
cudf::host_span<size_type const> row_group_indices, parquet_reader_options const& options) const
{
CUDF_FUNC_RANGE();

// Temporary vector with row group indices from the first source
auto const input_row_group_indices =
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};

return _impl->filter_row_groups_with_byte_range(input_row_group_indices, options).front();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call the impl API

}

std::vector<cudf::size_type> hybrid_scan_reader::filter_row_groups_with_stats(
cudf::host_span<size_type const> row_group_indices,
parquet_reader_options const& options,
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ aggregate_reader_metadata::select_payload_columns(
timestamp_type_id);
}

std::vector<std::vector<cudf::size_type>>
aggregate_reader_metadata::filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
std::size_t bytes_to_skip,
std::optional<std::size_t> const& bytes_to_read) const
{
return apply_byte_bounds_filter(row_group_indices, bytes_to_skip, bytes_to_read);
}

std::vector<std::vector<cudf::size_type>> aggregate_reader_metadata::filter_row_groups_with_stats(
host_span<std::vector<cudf::size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,24 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
bool ignore_missing_columns,
type_id timestamp_type_id);

/**
* @brief Filters row groups such that only the row groups that start within the byte range
* specified by [`bytes_to_skip`, `bytes_to_skip + bytes_to_read`) are selected
*
* @note The last selected row group may end beyond the byte range.
*
* @param row_group_indices Input row groups indices
* @param bytes_to_skip Bytes to skip before selecting row groups
* @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups
* until the end of the file are selected if not provided
*
* @return Filtered row group indices
*/
[[nodiscard]] std::vector<std::vector<cudf::size_type>> filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
std::size_t bytes_to_skip,
std::optional<std::size_t> const& bytes_to_read) const;

/**
* @brief Filter the row groups with statistics based on predicate filter
*
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ size_type hybrid_scan_reader_impl::total_rows_in_row_groups(
return _extended_metadata->total_rows_in_row_groups(row_group_indices);
}

std::vector<std::vector<cudf::size_type>>
hybrid_scan_reader_impl::filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options) const
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");

if (options.get_skip_bytes() == 0 and not options.get_num_bytes().has_value()) {
return std::vector<std::vector<cudf::size_type>>{row_group_indices.begin(),
row_group_indices.end()};
}

return _extended_metadata->filter_row_groups_with_byte_range(
row_group_indices, options.get_skip_bytes(), options.get_num_bytes());
}

std::vector<std::vector<size_type>> hybrid_scan_reader_impl::filter_row_groups_with_stats(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options,
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
[[nodiscard]] size_type total_rows_in_row_groups(
cudf::host_span<std::vector<size_type> const> row_group_indices) const;

/**
* @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_byte_range
*/
[[nodiscard]] std::vector<std::vector<cudf::size_type>> filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options) const;

/**
* @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_stats
*/
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1434,9 +1434,8 @@ aggregate_reader_metadata::select_row_groups(
}

// Flag to check if the row groups will be filtered using byte bounds
bool const is_byte_bounded_row_groups = row_group_indices.empty() and

(skip_bytes_opt > 0 or byte_count_opt.has_value());
bool const is_byte_bounded_row_groups =
row_group_indices.empty() and (skip_bytes_opt > 0 or byte_count_opt.has_value());

// We can't filter with both row bounds and byte bounds
CUDF_EXPECTS(not(is_row_bounded_row_groups and is_byte_bounded_row_groups),
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ class aggregate_reader_metadata {
*
* @param input_row_group_indices Lists of input row groups, one per source
* @param bytes_to_skip Bytes to skip before selecting row groups
* @param bytes_to_read Bytes to select row groups after skipping
* @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups
* until the end of the file are selected if not provided.
*
* @return A vector of surviving row group indices
*/
Expand Down
77 changes: 77 additions & 0 deletions cpp/tests/io/experimental/hybrid_scan_filters_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <src/io/parquet/parquet_gpu.hpp>

#include <filesystem>
#include <fstream>

namespace {

/**
Expand Down Expand Up @@ -225,6 +228,80 @@ TEST_F(HybridScanFiltersTest, TestExternalMetadata)
EXPECT_EQ(reader->total_rows_in_row_groups(input_row_group_indices), 2 * rows_per_row_group);
}

TEST_F(HybridScanFiltersTest, FilterRowGroupsWithByteRanges)
{
using T = cudf::string_view;
auto const [table, filepath] = create_parquet_typed_with_stats<T>("ByteBounds.parquet");

auto const file_size = std::filesystem::file_size(filepath);
std::vector<char> file_buffer(file_size);
std::ifstream file{filepath, std::ifstream::binary};
file.read(file_buffer.data(), file_size);
file.close();

// Input file buffer span
auto const file_buffer_span = cudf::host_span<uint8_t const>(
reinterpret_cast<uint8_t const*>(file_buffer.data()), file_buffer.size());

// Fetch footer and page index bytes from the buffer.
auto const footer_buffer = fetch_footer_bytes(file_buffer_span);

// Create hybrid scan reader with footer bytes
auto options = cudf::io::parquet_reader_options::builder().build();
auto const reader =
std::make_unique<cudf::io::parquet::experimental::hybrid_scan_reader>(footer_buffer, options);

auto const input_row_group_indices = reader->all_row_groups(options);

// @note: In the above parquet file, the row groups start at the following byte offsets: 4, 75224,
// 150332, 225561. The `skip_bytes` and `num_bytes` have been chosen to have enough cushion but
// may need to be adjusted in the future if this test suddenly starts failing.

{
// Start with all row groups and only read row group 0 as only it will start in [0, 1000) byte
// range
auto constexpr num_bytes = 1000;
options.set_num_bytes(num_bytes);
auto const filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
auto const expected_row_group_indices = std::vector<cudf::size_type>{0};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
}

{
// Start with all row groups and skip row group 0 as it won't start in [1000, inf) byte range
auto skip_bytes = 1000;
options.set_skip_bytes(skip_bytes);
options.set_num_bytes(std::numeric_limits<size_t>::max());
auto filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
auto expected_row_group_indices = std::vector<cudf::size_type>{1, 2, 3};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);

// Now start with filtered row groups and only read row group 1 as only it starts in [50000,
// 100000) byte range
skip_bytes = 50000;
auto constexpr num_bytes = 50000;
options.set_skip_bytes(skip_bytes);
options.set_num_bytes(num_bytes);
filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(filtered_row_group_indices, options);
expected_row_group_indices = std::vector<cudf::size_type>{1};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
}

{
// Start with all row groups and skip all row groups as [500000, inf) byte range is beyond the
// file size
auto constexpr skip_bytes = 500'000;
options.set_skip_bytes(skip_bytes);
auto const filtered_row_group_indices =
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
auto const expected_row_group_indices = std::vector<cudf::size_type>{};
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
}
}

TEST_F(HybridScanFiltersTest, FilterRowGroupsWithStats)
{
srand(0xc001);
Expand Down