Skip to content

Commit ce70316

Browse files
authored
Filter row groups using byte range in the new experimental parquet reader (#20733)
This PR implements filtering row groups using byte ranges in the new experimental parquet reader using the existing filtering APIs from the main parquet reader Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - David Wendt (https://github.com/davidwendt) - Vukasin Milovanovic (https://github.com/vuule) URL: #20733
1 parent 4d01c25 commit ce70316

File tree

10 files changed

+175
-11
lines changed

10 files changed

+175
-11
lines changed

cpp/include/cudf/io/experimental/hybrid_scan.hpp

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,25 @@ enum class use_data_page_mask : bool {
113113
* @endcode
114114
*
115115
* Row group pruning (OPTIONAL): Start with either a list of custom or all row group indices in the
116-
* parquet file and optionally filter it subject to filter expression using column chunk statistics,
117-
* dictionaries and bloom filters. Byte ranges for column chunk dictionary pages and bloom filters
118-
* within parquet file may be obtained via `secondary_filters_byte_ranges()` function. The byte
119-
* ranges may be read into a corresponding vector of device buffers and passed to the corresponding
120-
* row group filtration function.
116+
* parquet file and optionally filter it using a byte range and/or the filter expression using
117+
* column chunk statistics, dictionaries and bloom filters. Byte ranges for column chunk dictionary
118+
* pages and bloom filters within parquet file may be obtained via `secondary_filters_byte_ranges()`
119+
* function. The byte ranges may be read into a corresponding vector of device buffers and passed to
120+
* the corresponding row group filtration function.
121121
* @code{.cpp}
122122
* // Start with a list of all parquet row group indices from the file footer
123123
* auto all_row_group_indices = reader->all_row_groups(options);
124124
*
125125
* // Span to track the indices of row groups currently at hand
126126
* auto current_row_group_indices = cudf::host_span<size_type>(all_row_group_indices);
127127
*
128+
* // Optional: Prune row group indices to the ones that start within the byte range
129+
* auto byte_range_filtered_row_group_indices = reader->filter_row_groups_with_byte_range(
130+
* current_row_group_indices, options);
131+
*
132+
* // Update current row group indices to byte range filtered row group indices
133+
* current_row_group_indices = byte_range_filtered_row_group_indices;
134+
*
128135
* // Optional: Prune row group indices subject to filter expression using row group statistics
129136
* auto stats_filtered_row_group_indices =
130137
* reader->filter_row_groups_with_stats(current_row_group_indices, options, stream);
@@ -335,6 +342,21 @@ class hybrid_scan_reader {
335342
[[nodiscard]] size_type total_rows_in_row_groups(
336343
cudf::host_span<size_type const> row_group_indices) const;
337344

345+
/**
346+
* @brief Filter the row groups using the specified byte range specified by [`bytes_to_skip`,
347+
* `bytes_to_skip + bytes_to_read`)
348+
*
349+
* Filters the row groups such that only the row groups that start within the byte range are
350+
* selected. Note that the last selected row group may end beyond the byte range.
351+
*
352+
* @param row_group_indices Input row groups indices
353+
* @param options Parquet reader options
354+
* @return Filtered row group indices
355+
*/
356+
[[nodiscard]] std::vector<size_type> filter_row_groups_with_byte_range(
357+
cudf::host_span<size_type const> row_group_indices,
358+
parquet_reader_options const& options) const;
359+
338360
/**
339361
* @brief Filter the input row groups using column chunk statistics
340362
*

cpp/src/io/functions.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,8 @@ void parquet_reader_options::set_num_rows(int64_t val)
821821

822822
void parquet_reader_options::set_skip_bytes(size_t val)
823823
{
824-
CUDF_EXPECTS(val == 0 or std::cmp_equal(_source.num_sources(), 1),
824+
// Hybrid scan reader does not contain a source so relaxing this check to zero or one source
825+
CUDF_EXPECTS(val == 0 or _source.num_sources() == 1 or _source.num_sources() == 0,
825826
"skip_bytes can only be set for single parquet source case");
826827
CUDF_EXPECTS(val == 0 or (not _num_rows.has_value() and _skip_rows == 0),
827828
"skip_bytes cannot be set along with skip_rows and num_rows");
@@ -833,7 +834,9 @@ void parquet_reader_options::set_skip_bytes(size_t val)
833834

834835
void parquet_reader_options::set_num_bytes(size_t val)
835836
{
836-
CUDF_EXPECTS(std::cmp_equal(_source.num_sources(), 1),
837+
// Hybrid scan reader does not contain a source so relaxing this check to zero or one source
838+
CUDF_EXPECTS(val == std::numeric_limits<size_t>::max() or _source.num_sources() == 1 or
839+
_source.num_sources() == 0,
837840
"num_bytes can only be set for single parquet source case");
838841
CUDF_EXPECTS(not _num_rows.has_value() and _skip_rows == 0,
839842
"num_bytes cannot be set along with skip_rows and num_rows");

cpp/src/io/parquet/experimental/hybrid_scan.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,18 @@ size_type hybrid_scan_reader::total_rows_in_row_groups(
7474
return _impl->total_rows_in_row_groups(input_row_group_indices);
7575
}
7676

77+
std::vector<size_type> hybrid_scan_reader::filter_row_groups_with_byte_range(
78+
cudf::host_span<size_type const> row_group_indices, parquet_reader_options const& options) const
79+
{
80+
CUDF_FUNC_RANGE();
81+
82+
// Temporary vector with row group indices from the first source
83+
auto const input_row_group_indices =
84+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
85+
86+
return _impl->filter_row_groups_with_byte_range(input_row_group_indices, options).front();
87+
}
88+
7789
std::vector<cudf::size_type> hybrid_scan_reader::filter_row_groups_with_stats(
7890
cudf::host_span<size_type const> row_group_indices,
7991
parquet_reader_options const& options,

cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,15 @@ aggregate_reader_metadata::select_payload_columns(
279279
timestamp_type_id);
280280
}
281281

282+
std::vector<std::vector<cudf::size_type>>
283+
aggregate_reader_metadata::filter_row_groups_with_byte_range(
284+
cudf::host_span<std::vector<size_type> const> row_group_indices,
285+
std::size_t bytes_to_skip,
286+
std::optional<std::size_t> const& bytes_to_read) const
287+
{
288+
return apply_byte_bounds_filter(row_group_indices, bytes_to_skip, bytes_to_read);
289+
}
290+
282291
std::vector<std::vector<cudf::size_type>> aggregate_reader_metadata::filter_row_groups_with_stats(
283292
host_span<std::vector<cudf::size_type> const> row_group_indices,
284293
host_span<data_type const> output_dtypes,

cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
146146
bool ignore_missing_columns,
147147
type_id timestamp_type_id);
148148

149+
/**
150+
* @brief Filters row groups such that only the row groups that start within the byte range
151+
* specified by [`bytes_to_skip`, `bytes_to_skip + bytes_to_read`) are selected
152+
*
153+
* @note The last selected row group may end beyond the byte range.
154+
*
155+
* @param row_group_indices Input row groups indices
156+
* @param bytes_to_skip Bytes to skip before selecting row groups
157+
* @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups
158+
* until the end of the file are selected if not provided
159+
*
160+
* @return Filtered row group indices
161+
*/
162+
[[nodiscard]] std::vector<std::vector<cudf::size_type>> filter_row_groups_with_byte_range(
163+
cudf::host_span<std::vector<size_type> const> row_group_indices,
164+
std::size_t bytes_to_skip,
165+
std::optional<std::size_t> const& bytes_to_read) const;
166+
149167
/**
150168
* @brief Filter the row groups with statistics based on predicate filter
151169
*

cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,22 @@ size_type hybrid_scan_reader_impl::total_rows_in_row_groups(
168168
return _extended_metadata->total_rows_in_row_groups(row_group_indices);
169169
}
170170

171+
std::vector<std::vector<cudf::size_type>>
172+
hybrid_scan_reader_impl::filter_row_groups_with_byte_range(
173+
cudf::host_span<std::vector<size_type> const> row_group_indices,
174+
parquet_reader_options const& options) const
175+
{
176+
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
177+
178+
if (options.get_skip_bytes() == 0 and not options.get_num_bytes().has_value()) {
179+
return std::vector<std::vector<cudf::size_type>>{row_group_indices.begin(),
180+
row_group_indices.end()};
181+
}
182+
183+
return _extended_metadata->filter_row_groups_with_byte_range(
184+
row_group_indices, options.get_skip_bytes(), options.get_num_bytes());
185+
}
186+
171187
std::vector<std::vector<size_type>> hybrid_scan_reader_impl::filter_row_groups_with_stats(
172188
cudf::host_span<std::vector<size_type> const> row_group_indices,
173189
parquet_reader_options const& options,

cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
8585
[[nodiscard]] size_type total_rows_in_row_groups(
8686
cudf::host_span<std::vector<size_type> const> row_group_indices) const;
8787

88+
/**
89+
* @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_byte_range
90+
*/
91+
[[nodiscard]] std::vector<std::vector<cudf::size_type>> filter_row_groups_with_byte_range(
92+
cudf::host_span<std::vector<size_type> const> row_group_indices,
93+
parquet_reader_options const& options) const;
94+
8895
/**
8996
* @copydoc cudf::io::experimental::hybrid_scan::filter_row_groups_with_stats
9097
*/

cpp/src/io/parquet/reader_impl_helpers.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,9 +1434,8 @@ aggregate_reader_metadata::select_row_groups(
14341434
}
14351435

14361436
// Flag to check if the row groups will be filtered using byte bounds
1437-
bool const is_byte_bounded_row_groups = row_group_indices.empty() and
1438-
1439-
(skip_bytes_opt > 0 or byte_count_opt.has_value());
1437+
bool const is_byte_bounded_row_groups =
1438+
row_group_indices.empty() and (skip_bytes_opt > 0 or byte_count_opt.has_value());
14401439

14411440
// We can't filter with both row bounds and byte bounds
14421441
CUDF_EXPECTS(not(is_row_bounded_row_groups and is_byte_bounded_row_groups),

cpp/src/io/parquet/reader_impl_helpers.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ class aggregate_reader_metadata {
267267
*
268268
* @param input_row_group_indices Lists of input row groups, one per source
269269
* @param bytes_to_skip Bytes to skip before selecting row groups
270-
* @param bytes_to_read Bytes to select row groups after skipping
270+
* @param bytes_to_read Optional bytes to select row groups from after skipping. All row groups
271+
* until the end of the file are selected if not provided.
271272
*
272273
* @return A vector of surviving row group indices
273274
*/

cpp/tests/io/experimental/hybrid_scan_filters_test.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
#include <src/io/parquet/parquet_gpu.hpp>
2020

21+
#include <filesystem>
22+
#include <fstream>
23+
2124
namespace {
2225

2326
/**
@@ -225,6 +228,80 @@ TEST_F(HybridScanFiltersTest, TestExternalMetadata)
225228
EXPECT_EQ(reader->total_rows_in_row_groups(input_row_group_indices), 2 * rows_per_row_group);
226229
}
227230

231+
TEST_F(HybridScanFiltersTest, FilterRowGroupsWithByteRanges)
232+
{
233+
using T = cudf::string_view;
234+
auto const [table, filepath] = create_parquet_typed_with_stats<T>("ByteBounds.parquet");
235+
236+
auto const file_size = std::filesystem::file_size(filepath);
237+
std::vector<char> file_buffer(file_size);
238+
std::ifstream file{filepath, std::ifstream::binary};
239+
file.read(file_buffer.data(), file_size);
240+
file.close();
241+
242+
// Input file buffer span
243+
auto const file_buffer_span = cudf::host_span<uint8_t const>(
244+
reinterpret_cast<uint8_t const*>(file_buffer.data()), file_buffer.size());
245+
246+
// Fetch footer and page index bytes from the buffer.
247+
auto const footer_buffer = fetch_footer_bytes(file_buffer_span);
248+
249+
// Create hybrid scan reader with footer bytes
250+
auto options = cudf::io::parquet_reader_options::builder().build();
251+
auto const reader =
252+
std::make_unique<cudf::io::parquet::experimental::hybrid_scan_reader>(footer_buffer, options);
253+
254+
auto const input_row_group_indices = reader->all_row_groups(options);
255+
256+
// @note: In the above parquet file, the row groups start at the following byte offsets: 4, 75224,
257+
// 150332, 225561. The `skip_bytes` and `num_bytes` have been chosen to have enough cushion but
258+
// may need to be adjusted in the future if this test suddenly starts failing.
259+
260+
{
261+
// Start with all row groups and only read row group 0 as only it will start in [0, 1000) byte
262+
// range
263+
auto constexpr num_bytes = 1000;
264+
options.set_num_bytes(num_bytes);
265+
auto const filtered_row_group_indices =
266+
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
267+
auto const expected_row_group_indices = std::vector<cudf::size_type>{0};
268+
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
269+
}
270+
271+
{
272+
// Start with all row groups and skip row group 0 as it won't start in [1000, inf) byte range
273+
auto skip_bytes = 1000;
274+
options.set_skip_bytes(skip_bytes);
275+
options.set_num_bytes(std::numeric_limits<size_t>::max());
276+
auto filtered_row_group_indices =
277+
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
278+
auto expected_row_group_indices = std::vector<cudf::size_type>{1, 2, 3};
279+
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
280+
281+
// Now start with filtered row groups and only read row group 1 as only it starts in [50000,
282+
// 100000) byte range
283+
skip_bytes = 50000;
284+
auto constexpr num_bytes = 50000;
285+
options.set_skip_bytes(skip_bytes);
286+
options.set_num_bytes(num_bytes);
287+
filtered_row_group_indices =
288+
reader->filter_row_groups_with_byte_range(filtered_row_group_indices, options);
289+
expected_row_group_indices = std::vector<cudf::size_type>{1};
290+
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
291+
}
292+
293+
{
294+
// Start with all row groups and skip all row groups as [500000, inf) byte range is beyond the
295+
// file size
296+
auto constexpr skip_bytes = 500'000;
297+
options.set_skip_bytes(skip_bytes);
298+
auto const filtered_row_group_indices =
299+
reader->filter_row_groups_with_byte_range(input_row_group_indices, options);
300+
auto const expected_row_group_indices = std::vector<cudf::size_type>{};
301+
EXPECT_EQ(filtered_row_group_indices, expected_row_group_indices);
302+
}
303+
}
304+
228305
TEST_F(HybridScanFiltersTest, FilterRowGroupsWithStats)
229306
{
230307
srand(0xc001);

0 commit comments

Comments
 (0)