diff --git a/cpp/include/cudf/io/experimental/hybrid_scan.hpp b/cpp/include/cudf/io/experimental/hybrid_scan.hpp index db65a0dd06f..96d913bfaea 100644 --- a/cpp/include/cudf/io/experimental/hybrid_scan.hpp +++ b/cpp/include/cudf/io/experimental/hybrid_scan.hpp @@ -113,11 +113,11 @@ enum class use_data_page_mask : bool { * @endcode * * Row group pruning (OPTIONAL): Start with either a list of custom or all row group indices in the - * parquet file and optionally filter it subject to filter expression using column chunk statistics, - * dictionaries and bloom filters. Byte ranges for column chunk dictionary pages and bloom filters - * within parquet file may be obtained via `secondary_filters_byte_ranges()` function. The byte - * ranges may be read into a corresponding vector of device buffers and passed to the corresponding - * row group filtration function. + * parquet file and optionally filter it using a byte range and/or the filter expression using + * column chunk statistics, dictionaries and bloom filters. Byte ranges for column chunk dictionary + * pages and bloom filters within parquet file may be obtained via `secondary_filters_byte_ranges()` + * function. The byte ranges may be read into a corresponding vector of device buffers and passed to + * the corresponding row group filtration function. * @code{.cpp} * // Start with a list of all parquet row group indices from the file footer * auto all_row_group_indices = reader->all_row_groups(options); @@ -125,6 +125,13 @@ enum class use_data_page_mask : bool { * // Span to track the indices of row groups currently at hand * auto current_row_group_indices = cudf::host_span(all_row_group_indices); * + * // Optional: Prune row group indices to the ones that start within the byte range + * auto byte_range_filtered_row_group_indices = reader->filter_row_groups_with_byte_range( + * current_row_group_indices, options); + * + * // Update current row group indices to byte range filtered row group indices + * current_row_group_indices = byte_range_filtered_row_group_indices; + * * // Optional: Prune row group indices subject to filter expression using row group statistics * auto stats_filtered_row_group_indices = * reader->filter_row_groups_with_stats(current_row_group_indices, options, stream); @@ -335,6 +342,21 @@ class hybrid_scan_reader { [[nodiscard]] size_type total_rows_in_row_groups( cudf::host_span row_group_indices) const; + /** + * @brief Filter the row groups using the specified byte range specified by [`bytes_to_skip`, + * `bytes_to_skip + bytes_to_read`) + * + * Filters the row groups such that only the row groups that start within the byte range are + * selected. Note that the last selected row group may end beyond the byte range. + * + * @param row_group_indices Input row groups indices + * @param options Parquet reader options + * @return Filtered row group indices + */ + [[nodiscard]] std::vector filter_row_groups_with_byte_range( + cudf::host_span row_group_indices, + parquet_reader_options const& options) const; + /** * @brief Filter the input row groups using column chunk statistics * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 55aa7a9ba24..a8e7d1a2494 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -821,7 +821,8 @@ void parquet_reader_options::set_num_rows(int64_t val) void parquet_reader_options::set_skip_bytes(size_t val) { - CUDF_EXPECTS(val == 0 or std::cmp_equal(_source.num_sources(), 1), + // Hybrid scan reader does not contain a source so relaxing this check to zero or one source + CUDF_EXPECTS(val == 0 or _source.num_sources() == 1 or _source.num_sources() == 0, "skip_bytes can only be set for single parquet source case"); CUDF_EXPECTS(val == 0 or (not _num_rows.has_value() and _skip_rows == 0), "skip_bytes cannot be set along with skip_rows and num_rows"); @@ -833,7 +834,9 @@ void parquet_reader_options::set_skip_bytes(size_t val) void parquet_reader_options::set_num_bytes(size_t val) { - CUDF_EXPECTS(std::cmp_equal(_source.num_sources(), 1), + // Hybrid scan reader does not contain a source so relaxing this check to zero or one source + CUDF_EXPECTS(val == std::numeric_limits::max() or _source.num_sources() == 1 or + _source.num_sources() == 0, "num_bytes can only be set for single parquet source case"); CUDF_EXPECTS(not _num_rows.has_value() and _skip_rows == 0, "num_bytes cannot be set along with skip_rows and num_rows"); diff --git a/cpp/src/io/parquet/experimental/hybrid_scan.cpp b/cpp/src/io/parquet/experimental/hybrid_scan.cpp index 49a1e6d1f30..4676ed1f43d 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan.cpp @@ -74,6 +74,18 @@ size_type hybrid_scan_reader::total_rows_in_row_groups( return _impl->total_rows_in_row_groups(input_row_group_indices); } +std::vector hybrid_scan_reader::filter_row_groups_with_byte_range( + cudf::host_span row_group_indices, parquet_reader_options const& options) const +{ + CUDF_FUNC_RANGE(); + + // Temporary vector with row group indices from the first source + auto const input_row_group_indices = + std::vector>{{row_group_indices.begin(), row_group_indices.end()}}; + + return _impl->filter_row_groups_with_byte_range(input_row_group_indices, options).front(); +} + std::vector hybrid_scan_reader::filter_row_groups_with_stats( cudf::host_span row_group_indices, parquet_reader_options const& options, diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp index 3818e5ed36f..5642852267b 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp @@ -279,6 +279,15 @@ aggregate_reader_metadata::select_payload_columns( timestamp_type_id); } +std::vector> +aggregate_reader_metadata::filter_row_groups_with_byte_range( + cudf::host_span const> row_group_indices, + std::size_t bytes_to_skip, + std::optional const& bytes_to_read) const +{ + return apply_byte_bounds_filter(row_group_indices, bytes_to_skip, bytes_to_read); +} + std::vector> aggregate_reader_metadata::filter_row_groups_with_stats( host_span const> row_group_indices, host_span output_dtypes, diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp index bce18919003..df4b1175594 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp @@ -146,6 +146,24 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base { bool ignore_missing_columns, type_id timestamp_type_id); + /** + * @brief Filters row groups such that only the row groups that start within the byte range + * specified by [`bytes_to_skip`, `bytes_to_skip + bytes_to_read`) are selected + * + * @note The last selected row group may end beyond the byte range. + * + * @param row_group_indices Input row groups indices + * @param bytes_to_skip Bytes to skip before selecting row groups + * @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups + * until the end of the file are selected if not provided + * + * @return Filtered row group indices + */ + [[nodiscard]] std::vector> filter_row_groups_with_byte_range( + cudf::host_span const> row_group_indices, + std::size_t bytes_to_skip, + std::optional const& bytes_to_read) const; + /** * @brief Filter the row groups with statistics based on predicate filter * diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp index 82f7d619a2b..93e60cdb9e7 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp @@ -168,6 +168,22 @@ size_type hybrid_scan_reader_impl::total_rows_in_row_groups( return _extended_metadata->total_rows_in_row_groups(row_group_indices); } +std::vector> +hybrid_scan_reader_impl::filter_row_groups_with_byte_range( + cudf::host_span const> row_group_indices, + parquet_reader_options const& options) const +{ + CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered"); + + if (options.get_skip_bytes() == 0 and not options.get_num_bytes().has_value()) { + return std::vector>{row_group_indices.begin(), + row_group_indices.end()}; + } + + return _extended_metadata->filter_row_groups_with_byte_range( + row_group_indices, options.get_skip_bytes(), options.get_num_bytes()); +} + std::vector> hybrid_scan_reader_impl::filter_row_groups_with_stats( cudf::host_span const> row_group_indices, parquet_reader_options const& options, diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp b/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp index 219428df37a..85ebb5c89fb 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp @@ -85,6 +85,13 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl { [[nodiscard]] size_type total_rows_in_row_groups( cudf::host_span const> row_group_indices) const; + /** + * @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_byte_range + */ + [[nodiscard]] std::vector> filter_row_groups_with_byte_range( + cudf::host_span const> row_group_indices, + parquet_reader_options const& options) const; + /** * @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_stats */ diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 9c73f5c22d8..6391b31cdec 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1434,9 +1434,8 @@ aggregate_reader_metadata::select_row_groups( } // Flag to check if the row groups will be filtered using byte bounds - bool const is_byte_bounded_row_groups = row_group_indices.empty() and - - (skip_bytes_opt > 0 or byte_count_opt.has_value()); + bool const is_byte_bounded_row_groups = + row_group_indices.empty() and (skip_bytes_opt > 0 or byte_count_opt.has_value()); // We can't filter with both row bounds and byte bounds CUDF_EXPECTS(not(is_row_bounded_row_groups and is_byte_bounded_row_groups), diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 4b431ce45bd..2357836a6db 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -267,7 +267,8 @@ class aggregate_reader_metadata { * * @param input_row_group_indices Lists of input row groups, one per source * @param bytes_to_skip Bytes to skip before selecting row groups - * @param bytes_to_read Bytes to select row groups after skipping + * @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups + * until the end of the file are selected if not provided. * * @return A vector of surviving row group indices */ diff --git a/cpp/tests/io/experimental/hybrid_scan_filters_test.cpp b/cpp/tests/io/experimental/hybrid_scan_filters_test.cpp index f2235e96d32..cde685fde32 100644 --- a/cpp/tests/io/experimental/hybrid_scan_filters_test.cpp +++ b/cpp/tests/io/experimental/hybrid_scan_filters_test.cpp @@ -18,6 +18,9 @@ #include +#include +#include + namespace { /** @@ -225,6 +228,80 @@ TEST_F(HybridScanFiltersTest, TestExternalMetadata) EXPECT_EQ(reader->total_rows_in_row_groups(input_row_group_indices), 2 * rows_per_row_group); } +TEST_F(HybridScanFiltersTest, FilterRowGroupsWithByteRanges) +{ + using T = cudf::string_view; + auto const [table, filepath] = create_parquet_typed_with_stats("ByteBounds.parquet"); + + auto const file_size = std::filesystem::file_size(filepath); + std::vector file_buffer(file_size); + std::ifstream file{filepath, std::ifstream::binary}; + file.read(file_buffer.data(), file_size); + file.close(); + + // Input file buffer span + auto const file_buffer_span = cudf::host_span( + reinterpret_cast(file_buffer.data()), file_buffer.size()); + + // Fetch footer and page index bytes from the buffer. + auto const footer_buffer = fetch_footer_bytes(file_buffer_span); + + // Create hybrid scan reader with footer bytes + auto options = cudf::io::parquet_reader_options::builder().build(); + auto const reader = + std::make_unique(footer_buffer, options); + + auto const input_row_group_indices = reader->all_row_groups(options); + + // @note: In the above parquet file, the row groups start at the following byte offsets: 4, 75224, + // 150332, 225561. The `skip_bytes` and `num_bytes` have been chosen to have enough cushion but + // may need to be adjusted in the future if this test suddenly starts failing. + + { + // Start with all row groups and only read row group 0 as only it will start in [0, 1000) byte + // range + auto constexpr num_bytes = 1000; + options.set_num_bytes(num_bytes); + auto const filtered_row_group_indices = + reader->filter_row_groups_with_byte_range(input_row_group_indices, options); + auto const expected_row_group_indices = std::vector{0}; + EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices); + } + + { + // Start with all row groups and skip row group 0 as it won't start in [1000, inf) byte range + auto skip_bytes = 1000; + options.set_skip_bytes(skip_bytes); + options.set_num_bytes(std::numeric_limits::max()); + auto filtered_row_group_indices = + reader->filter_row_groups_with_byte_range(input_row_group_indices, options); + auto expected_row_group_indices = std::vector{1, 2, 3}; + EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices); + + // Now start with filtered row groups and only read row group 1 as only it starts in [50000, + // 100000) byte range + skip_bytes = 50000; + auto constexpr num_bytes = 50000; + options.set_skip_bytes(skip_bytes); + options.set_num_bytes(num_bytes); + filtered_row_group_indices = + reader->filter_row_groups_with_byte_range(filtered_row_group_indices, options); + expected_row_group_indices = std::vector{1}; + EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices); + } + + { + // Start with all row groups and skip all row groups as [500000, inf) byte range is beyond the + // file size + auto constexpr skip_bytes = 500'000; + options.set_skip_bytes(skip_bytes); + auto const filtered_row_group_indices = + reader->filter_row_groups_with_byte_range(input_row_group_indices, options); + auto const expected_row_group_indices = std::vector{}; + EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices); + } +} + TEST_F(HybridScanFiltersTest, FilterRowGroupsWithStats) { srand(0xc001);