Skip to content

Commit b839919

Browse files
authored
Accelerate data page mask computation on device (rapidsai#20280)
Closes rapidsai#19748 This PR implements a GPU [Fenwick tree](https://cp-algorithms.com/data_structures/fenwick.html) + search algorithm along with several host side optimizations to significantly accelerate data page computation mask in the next-gen parquet reader. See before and after performance results [here](rapidsai#20280 (comment)). Credits to @vuule for the Fenwick tree solution for this problem (Thanks again!) Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) URL: rapidsai#20280
1 parent f5161e7 commit b839919

File tree

12 files changed

+824
-395
lines changed

12 files changed

+824
-395
lines changed

cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ add_library(
538538
src/io/parquet/experimental/hybrid_scan_impl.cpp
539539
src/io/parquet/experimental/hybrid_scan_preprocess.cu
540540
src/io/parquet/experimental/page_index_filter.cu
541+
src/io/parquet/experimental/page_index_filter_utils.cu
541542
src/io/parquet/page_data.cu
542543
src/io/parquet/chunk_dict.cu
543544
src/io/parquet/page_enc.cu

cpp/examples/hybrid_scan_io/common_utils.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,24 @@ void check_tables_equal(cudf::table_view const& lhs_table,
7070
cudf::filtered_join join_obj(
7171
lhs_table, cudf::null_equality::EQUAL, cudf::set_as_build_table::RIGHT, stream);
7272
auto const indices = join_obj.anti_join(rhs_table, stream);
73-
7473
// No exception thrown, check indices
75-
auto const valid = indices->size() == 0;
76-
std::cout << "Tables identical: " << std::boolalpha << valid << "\n\n";
74+
auto const tables_equal = indices->size() == 0;
75+
if (tables_equal) {
76+
std::cout << "Tables identical: " << std::boolalpha << tables_equal << "\n\n";
77+
} else {
78+
// Helper to write parquet data for inspection
79+
auto const write_parquet =
80+
[](cudf::table_view table, std::string filepath, rmm::cuda_stream_view stream) {
81+
auto sink_info = cudf::io::sink_info(filepath);
82+
auto opts = cudf::io::parquet_writer_options::builder(sink_info, table).build();
83+
cudf::io::write_parquet(opts, stream);
84+
};
85+
write_parquet(lhs_table, "lhs_table.parquet", stream);
86+
write_parquet(rhs_table, "rhs_table.parquet", stream);
87+
throw std::logic_error("Tables identical: false\n\n");
88+
}
7789
} catch (std::exception& e) {
78-
std::cerr << e.what() << std::endl << std::endl;
79-
throw std::runtime_error("Tables identical: false\n\n");
90+
std::cout << e.what() << std::endl;
8091
}
8192
}
8293

cpp/src/io/parquet/experimental/hybrid_scan_chunking.cu

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ using parquet::detail::pass_intermediate_data;
3232

3333
void hybrid_scan_reader_impl::handle_chunking(
3434
read_mode mode,
35-
std::vector<rmm::device_buffer> column_chunk_buffers,
36-
cudf::host_span<std::vector<bool> const> data_page_mask)
35+
std::vector<rmm::device_buffer>&& column_chunk_buffers,
36+
cudf::host_span<bool const> data_page_mask)
3737
{
3838
// if this is our first time in here, setup the first pass.
3939
if (!_pass_itm_data) {
@@ -77,7 +77,8 @@ void hybrid_scan_reader_impl::handle_chunking(
7777
setup_next_subpass(mode);
7878
}
7979

80-
void hybrid_scan_reader_impl::setup_next_pass(std::vector<rmm::device_buffer> column_chunk_buffers)
80+
void hybrid_scan_reader_impl::setup_next_pass(
81+
std::vector<rmm::device_buffer>&& column_chunk_buffers)
8182
{
8283
auto const num_passes = _file_itm_data.num_passes();
8384
CUDF_EXPECTS(num_passes == 1,

cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,17 +270,21 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
270270
* Compute a vector of boolean vectors indicating which data pages need to be decoded to
271271
* construct each input column based on the row mask, one vector per column
272272
*
273+
* @tparam ColumnView Type of the row mask column view - cudf::mutable_column_view for filter
274+
* columns and cudf::column_view for payload columns
275+
*
273276
* @param row_mask Boolean column indicating which rows need to be read after page-pruning
274277
* @param row_group_indices Input row groups indices
275278
* @param input_columns Input column information
276279
* @param row_mask_offset Offset into the row mask column for the current pass
277280
* @param stream CUDA stream used for device memory operations and kernel launches
278281
*
279-
* @return A vector of boolean vectors indicating which data pages need to be decoded to produce
280-
* the output table based on the input row mask, one per input column
282+
* @return Boolean vector indicating which data pages need to be decoded to produce
283+
* the output table based on the input row mask across all input columns
281284
*/
282-
[[nodiscard]] std::vector<std::vector<bool>> compute_data_page_mask(
283-
cudf::column_view row_mask,
285+
template <typename ColumnView>
286+
[[nodiscard]] thrust::host_vector<bool> compute_data_page_mask(
287+
ColumnView const& row_mask,
284288
cudf::host_span<std::vector<size_type> const> row_group_indices,
285289
cudf::host_span<input_column_info const> input_columns,
286290
cudf::size_type row_mask_offset,

cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ table_with_metadata hybrid_scan_reader_impl::materialize_filter_columns(
444444
(mask_data_pages == use_data_page_mask::YES)
445445
? _extended_metadata->compute_data_page_mask(
446446
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, stream)
447-
: std::vector<std::vector<bool>>{};
447+
: thrust::host_vector<bool>{};
448448

449449
prepare_data(
450450
read_mode::READ_ALL, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
@@ -474,7 +474,7 @@ table_with_metadata hybrid_scan_reader_impl::materialize_payload_columns(
474474
(mask_data_pages == use_data_page_mask::YES)
475475
? _extended_metadata->compute_data_page_mask(
476476
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, stream)
477-
: std::vector<std::vector<bool>>{};
477+
: thrust::host_vector<bool>{};
478478

479479
prepare_data(
480480
read_mode::READ_ALL, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
@@ -513,7 +513,7 @@ void hybrid_scan_reader_impl::setup_chunking_for_filter_columns(
513513
(mask_data_pages == use_data_page_mask::YES)
514514
? _extended_metadata->compute_data_page_mask(
515515
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, _stream)
516-
: std::vector<std::vector<bool>>{};
516+
: thrust::host_vector<bool>{};
517517

518518
prepare_data(
519519
read_mode::CHUNKED_READ, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
@@ -564,7 +564,7 @@ void hybrid_scan_reader_impl::setup_chunking_for_payload_columns(
564564
(mask_data_pages == use_data_page_mask::YES)
565565
? _extended_metadata->compute_data_page_mask(
566566
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, _stream)
567-
: std::vector<std::vector<bool>>{};
567+
: thrust::host_vector<bool>{};
568568

569569
prepare_data(
570570
read_mode::CHUNKED_READ, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
@@ -645,7 +645,7 @@ void hybrid_scan_reader_impl::prepare_data(
645645
read_mode mode,
646646
cudf::host_span<std::vector<size_type> const> row_group_indices,
647647
std::vector<rmm::device_buffer>&& column_chunk_buffers,
648-
cudf::host_span<std::vector<bool> const> data_page_mask)
648+
cudf::host_span<bool const> data_page_mask)
649649
{
650650
// if we have not preprocessed at the whole-file level, do that now
651651
if (not _file_preprocessed) {
@@ -857,8 +857,7 @@ table_with_metadata hybrid_scan_reader_impl::finalize_output(
857857
}
858858
}
859859

860-
void hybrid_scan_reader_impl::set_pass_page_mask(
861-
cudf::host_span<std::vector<bool> const> data_page_mask)
860+
void hybrid_scan_reader_impl::set_pass_page_mask(cudf::host_span<bool const> data_page_mask)
862861
{
863862
auto const& pass = _pass_itm_data;
864863
auto const& chunks = pass->chunks;
@@ -872,13 +871,11 @@ void hybrid_scan_reader_impl::set_pass_page_mask(
872871
return;
873872
}
874873

874+
size_t num_inserted_data_pages = 0;
875875
std::for_each(
876876
thrust::counting_iterator<size_t>(0),
877877
thrust::counting_iterator(_input_columns.size()),
878878
[&](auto col_idx) {
879-
auto const& col_page_mask = data_page_mask[col_idx];
880-
size_t num_inserted_data_pages = 0;
881-
882879
for (size_t chunk_idx = col_idx; chunk_idx < chunks.size(); chunk_idx += num_columns) {
883880
// Insert a true value for each dictionary page
884881
if (chunks[chunk_idx].num_dict_pages > 0) { _pass_page_mask.push_back(true); }
@@ -888,21 +885,17 @@ void hybrid_scan_reader_impl::set_pass_page_mask(
888885

889886
// Make sure we have enough page mask for this column chunk
890887
CUDF_EXPECTS(
891-
col_page_mask.size() >= num_inserted_data_pages + num_data_pages_this_col_chunk,
888+
data_page_mask.size() >= num_inserted_data_pages + num_data_pages_this_col_chunk,
892889
"Encountered invalid data page mask size");
893890

894891
// Insert page mask for this column chunk
895892
_pass_page_mask.insert(
896893
_pass_page_mask.end(),
897-
col_page_mask.begin() + num_inserted_data_pages,
898-
col_page_mask.begin() + num_inserted_data_pages + num_data_pages_this_col_chunk);
899-
894+
data_page_mask.begin() + num_inserted_data_pages,
895+
data_page_mask.begin() + num_inserted_data_pages + num_data_pages_this_col_chunk);
900896
// Update the number of inserted data pages
901897
num_inserted_data_pages += num_data_pages_this_col_chunk;
902898
}
903-
// Make sure we inserted exactly the number of data pages for this column
904-
CUDF_EXPECTS(num_inserted_data_pages == col_page_mask.size(),
905-
"Encountered mismatch in number of data pages and page mask size");
906899
});
907900

908901
// Make sure we inserted exactly the number of pages for this pass

cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
255255
*
256256
* @param data_page_mask Input data page mask from page-pruning step
257257
*/
258-
void set_pass_page_mask(cudf::host_span<std::vector<bool> const> data_page_mask);
258+
void set_pass_page_mask(cudf::host_span<bool const> data_page_mask);
259259

260260
/**
261261
* @brief Select the columns to be read based on the read mode
@@ -285,11 +285,12 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
285285
* @param mode Value indicating if the data sources are read all at once or chunk by chunk
286286
* @param row_group_indices Row group indices to read
287287
* @param column_chunk_buffers Device buffers containing column chunk data
288+
* @param data_page_mask Input data page mask from page-pruning step
288289
*/
289290
void prepare_data(read_mode mode,
290291
cudf::host_span<std::vector<size_type> const> row_group_indices,
291292
std::vector<rmm::device_buffer>&& column_chunk_buffers,
292-
cudf::host_span<std::vector<bool> const> data_page_mask);
293+
cudf::host_span<bool const> data_page_mask);
293294

294295
/**
295296
* @brief Create descriptors for filter column chunks and decode dictionary page headers
@@ -330,8 +331,8 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
330331
* @param data_page_mask Input data page mask from page-pruning step for the current pass
331332
*/
332333
void handle_chunking(read_mode mode,
333-
std::vector<rmm::device_buffer> column_chunk_buffers,
334-
cudf::host_span<std::vector<bool> const> data_page_mask);
334+
std::vector<rmm::device_buffer>&& column_chunk_buffers,
335+
cudf::host_span<bool const> data_page_mask);
335336

336337
/**
337338
* @brief Setup step for the next input read pass.
@@ -341,7 +342,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
341342
*
342343
* @param column_chunk_buffers Device buffers containing column chunk data
343344
*/
344-
void setup_next_pass(std::vector<rmm::device_buffer> column_chunk_buffers);
345+
void setup_next_pass(std::vector<rmm::device_buffer>&& column_chunk_buffers);
345346

346347
/**
347348
* @brief Setup pointers to columns chunks to be processed for this pass.
@@ -357,7 +358,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
357358
*
358359
* @param column_chunk_buffers Device buffers containing column chunk data
359360
*/
360-
void setup_compressed_data(std::vector<rmm::device_buffer> column_chunk_buffers);
361+
void setup_compressed_data(std::vector<rmm::device_buffer>&& column_chunk_buffers);
361362

362363
/**
363364
* @brief Reset the internal state of the reader.

cpp/src/io/parquet/experimental/hybrid_scan_preprocess.cu

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ bool hybrid_scan_reader_impl::setup_column_chunks()
172172
}
173173

174174
void hybrid_scan_reader_impl::setup_compressed_data(
175-
std::vector<rmm::device_buffer> column_chunk_buffers)
175+
std::vector<rmm::device_buffer>&& column_chunk_buffers)
176176
{
177177
auto& pass = *_pass_itm_data;
178178

0 commit comments

Comments
 (0)