From 3a03f3a0b9c1665ce7c2e11908e6c6bae00c454f Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 25 Nov 2025 01:35:07 +0000 Subject: [PATCH 1/7] Add example code --- cpp/examples/hybrid_scan_io/CMakeLists.txt | 9 + cpp/examples/hybrid_scan_io/common_utils.cpp | 391 +++++++++++++++++- cpp/examples/hybrid_scan_io/common_utils.hpp | 89 ++-- .../hybrid_scan_io/hybrid_scan_io.cpp | 215 +--------- .../hybrid_scan_io_multithreaded.cpp | 241 +++++++++++ cpp/examples/hybrid_scan_io/io_source.hpp | 2 +- 6 files changed, 680 insertions(+), 267 deletions(-) create mode 100644 cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp diff --git a/cpp/examples/hybrid_scan_io/CMakeLists.txt b/cpp/examples/hybrid_scan_io/CMakeLists.txt index 5f996a44538..f28ea62ae1a 100644 --- a/cpp/examples/hybrid_scan_io/CMakeLists.txt +++ b/cpp/examples/hybrid_scan_io/CMakeLists.txt @@ -39,6 +39,15 @@ target_link_libraries( target_compile_features(hybrid_scan_io PRIVATE cxx_std_20) install(TARGETS hybrid_scan_io DESTINATION bin/examples/libcudf/hybrid_scan_io) +# Build and install hybrid_scan_io_multithreaded +add_executable(hybrid_scan_io_multithreaded hybrid_scan_io_multithreaded.cpp) +target_link_libraries( + hybrid_scan_io_multithreaded PRIVATE cudf::cudf $ + $ +) +target_compile_features(hybrid_scan_io_multithreaded PRIVATE cxx_std_20) +install(TARGETS hybrid_scan_io_multithreaded DESTINATION bin/examples/libcudf/hybrid_scan_io) + # Install the example.parquet file install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf/hybrid_scan_io diff --git a/cpp/examples/hybrid_scan_io/common_utils.cpp b/cpp/examples/hybrid_scan_io/common_utils.cpp index 35f80dd6454..ffea7030671 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.cpp +++ b/cpp/examples/hybrid_scan_io/common_utils.cpp @@ -5,21 +5,29 @@ #include "common_utils.hpp" +#include "timer.hpp" + #include -#include +#include #include +#include #include #include #include #include +#include #include +#include #include #include #include #include +#include +#include #include +#include #include /** @@ -31,11 +39,89 @@ std::shared_ptr create_memory_resource(bool is_ { if (is_pool_used) { return rmm::mr::make_owning_wrapper( - std::make_shared(), rmm::percent_of_free_device_memory(50)); + std::make_shared(), rmm::percent_of_free_device_memory(80)); } return std::make_shared(); } +std::vector extract_input_sources(std::string const& paths, + int32_t input_multiplier, + int32_t thread_count, + io_source_type io_source_type, + rmm::cuda_stream_view stream) +{ + // Get the delimited paths to directory and/or files. + std::vector const delimited_paths = [&]() { + std::vector paths_list; + std::stringstream strstream{paths}; + std::string path; + // Extract the delimited paths. + while (std::getline(strstream, path, char{','})) { + paths_list.push_back(path); + } + return paths_list; + }(); + + // List of parquet files + std::vector parquet_files; + std::for_each(delimited_paths.cbegin(), delimited_paths.cend(), [&](auto const& path_string) { + auto const path = std::filesystem::path{path_string}; + // If this is a parquet file, add it. + if (std::filesystem::is_regular_file(path)) { + parquet_files.push_back(path_string); + } + // If this is a directory, add all files in the directory. 
+ else if (std::filesystem::is_directory(path)) { + for (auto const& file : std::filesystem::directory_iterator(path)) { + if (std::filesystem::is_regular_file(file.path())) { + parquet_files.push_back(file.path().string()); + } else { + std::cout << "Skipping sub-directory: " << file.path().string() << "\n"; + } + } + } else { + throw std::runtime_error("Encountered an invalid input path\n"); + } + }); + + // Current size of list of parquet files + auto const initial_size = parquet_files.size(); + if (initial_size == 0) { return {}; } + + // Reserve space + parquet_files.reserve(std::max(thread_count, input_multiplier * parquet_files.size())); + + // Append the input files by input_multiplier times + std::for_each(thrust::make_counting_iterator(1), + thrust::make_counting_iterator(input_multiplier), + [&](auto i) { + parquet_files.insert(parquet_files.end(), + parquet_files.begin(), + parquet_files.begin() + initial_size); + }); + + // Cycle append parquet files from the existing ones if less than the thread_count + std::cout << "Warning: Number of input sources < thread count. Cycling from\n" + "and appending to current input sources such that the number of\n" + "input source == thread count\n"; + for (size_t idx = 0; thread_count > static_cast(parquet_files.size()); idx++) { + parquet_files.emplace_back(parquet_files[idx % initial_size]); + } + + // Vector of io sources + std::vector input_sources; + input_sources.reserve(parquet_files.size()); + // Transform input files to the specified io sources + std::transform(parquet_files.begin(), + parquet_files.end(), + std::back_inserter(input_sources), + [&](auto const& file_name) { + return io_source{file_name, io_source_type, stream}; + }); + stream.synchronize(); + return input_sources; +} + cudf::ast::operation create_filter_expression(std::string const& column_name, std::string const& literal_value) { @@ -45,21 +131,6 @@ cudf::ast::operation create_filter_expression(std::string const& column_name, return cudf::ast::operation(cudf::ast::ast_operator::EQUAL, column_reference, literal); } -std::unique_ptr combine_tables(std::unique_ptr filter_table, - std::unique_ptr payload_table) -{ - auto filter_columns = filter_table->release(); - auto payload_columns = payload_table->release(); - - auto all_columns = std::vector>{}; - all_columns.reserve(filter_columns.size() + payload_columns.size()); - std::move(filter_columns.begin(), filter_columns.end(), std::back_inserter(all_columns)); - std::move(payload_columns.begin(), payload_columns.end(), std::back_inserter(all_columns)); - auto table = std::make_unique(std::move(all_columns)); - - return table; -} - void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table, rmm::cuda_stream_view stream) @@ -91,6 +162,14 @@ void check_tables_equal(cudf::table_view const& lhs_table, } } +namespace { + +/** + * @brief Fetches a host span of Parquet footer bytes from the input buffer span + * + * @param buffer Input buffer span + * @return A host span of the footer bytes + */ cudf::host_span fetch_footer_bytes(cudf::host_span buffer) { CUDF_FUNC_RANGE(); @@ -117,6 +196,13 @@ cudf::host_span fetch_footer_bytes(cudf::host_span ender->footer_len); } +/** + * @brief Fetches a host span of Parquet PageIndexbytes from the input buffer span + * + * @param buffer Input buffer span + * @param page_index_bytes Byte range of `PageIndex` to fetch + * @return A host span of the PageIndex bytes + */ cudf::host_span fetch_page_index_bytes( cudf::host_span buffer, 
cudf::io::text::byte_range_info const page_index_bytes) { @@ -125,6 +211,16 @@ cudf::host_span fetch_page_index_bytes( page_index_bytes.size()); } +/** + * @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers + * + * @param host_buffer Host buffer span + * @param byte_ranges Byte ranges to fetch + * @param stream CUDA stream + * @param mr Device memory resource to create device buffers with + * + * @return Vector of device buffers + */ std::vector fetch_byte_ranges( cudf::host_span host_buffer, cudf::host_span byte_ranges, @@ -149,3 +245,264 @@ std::vector fetch_byte_ranges( stream.synchronize_no_throw(); return buffers; } + +/** + * @brief Combine columns from filter and payload tables into a single table + * + * @param filter_table Filter table + * @param payload_table Payload table + * @return Combined table + */ +std::unique_ptr combine_tables(std::unique_ptr filter_table, + std::unique_ptr payload_table) +{ + auto filter_columns = filter_table->release(); + auto payload_columns = payload_table->release(); + + auto all_columns = std::vector>{}; + all_columns.reserve(filter_columns.size() + payload_columns.size()); + std::move(filter_columns.begin(), filter_columns.end(), std::back_inserter(all_columns)); + std::move(payload_columns.begin(), payload_columns.end(), std::back_inserter(all_columns)); + auto table = std::make_unique(std::move(all_columns)); + + return table; +} + +} // namespace + +/** + * @brief Read parquet file with the next-gen parquet reader + * + * @param io_source io source to read + * @param filter_expression Filter expression + * @param filters Set of parquet filters to apply + * @param stream CUDA stream for hybrid scan reader + * @param mr Device memory resource + * + * @return Tuple of filter table, payload table, filter metadata, payload metadata, and the final + * row validity column + */ +template +std::unique_ptr hybrid_scan(io_source const& io_source, + cudf::ast::operation const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto options = cudf::io::parquet_reader_options::builder().filter(filter_expression).build(); + + // Input file buffer span + auto const file_buffer_span = io_source.get_host_buffer_span(); + + if constexpr (print_progress) { std::cout << "\nREADER: Setup, metadata and page index...\n"; } + timer timer; + + // Fetch footer bytes and setup reader + auto const footer_buffer = fetch_footer_bytes(file_buffer_span); + auto const reader = + std::make_unique(footer_buffer, options); + + // Get page index byte range from the reader + auto const page_index_byte_range = reader->page_index_byte_range(); + auto const page_index_buffer = fetch_page_index_bytes(file_buffer_span, page_index_byte_range); + reader->setup_page_index(page_index_buffer); + + // Get all row groups from the reader + auto input_row_group_indices = reader->all_row_groups(options); + auto current_row_group_indices = cudf::host_span(input_row_group_indices); + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + + // Filter row groups with stats + auto stats_filtered_row_group_indices = std::vector{}; + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_STATS)) { + if constexpr (print_progress) { + std::cout << "READER: Filter row groups with stats...\n"; + timer.reset(); + } + stats_filtered_row_group_indices = + 
reader->filter_row_groups_with_stats(current_row_group_indices, options, stream); + + // Update current row group indices + current_row_group_indices = stats_filtered_row_group_indices; + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + } + + std::vector bloom_filter_byte_ranges; + std::vector dict_page_byte_ranges; + + // Get bloom filter and dictionary page byte ranges from the reader + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) or + filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS)) { + if constexpr (print_progress) { + std::cout << "READER: Get bloom filter and dictionary page byte ranges...\n"; + timer.reset(); + } + std::tie(bloom_filter_byte_ranges, dict_page_byte_ranges) = + reader->secondary_filters_byte_ranges(current_row_group_indices, options); + if constexpr (print_progress) { timer.print_elapsed_millis(); } + } + + // Filter row groups with dictionary pages + std::vector dictionary_page_filtered_row_group_indices; + dictionary_page_filtered_row_group_indices.reserve(current_row_group_indices.size()); + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) and + dict_page_byte_ranges.size()) { + if constexpr (print_progress) { + std::cout << "READER: Filter row groups with dictionary pages...\n"; + timer.reset(); + } + // Fetch dictionary page buffers from the input file buffer + std::vector dictionary_page_buffers = + fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr); + dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages( + dictionary_page_buffers, current_row_group_indices, options, stream); + + // Update current row group indices + current_row_group_indices = dictionary_page_filtered_row_group_indices; + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + } else { + if constexpr (print_progress) { + std::cout << "SKIP: Row group filtering with dictionary pages...\n\n"; + } + } + + // Filter row groups with bloom filters + std::vector bloom_filtered_row_group_indices; + bloom_filtered_row_group_indices.reserve(current_row_group_indices.size()); + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS) and + bloom_filter_byte_ranges.size()) { + // Fetch 32 byte aligned bloom filter data buffers from the input file buffer + auto constexpr bloom_filter_alignment = rmm::CUDA_ALLOCATION_ALIGNMENT; + auto aligned_mr = rmm::mr::aligned_resource_adaptor( + mr, bloom_filter_alignment); + if constexpr (print_progress) { + std::cout << "READER: Filter row groups with bloom filters...\n"; + timer.reset(); + } + std::vector bloom_filter_data = + fetch_byte_ranges(file_buffer_span, bloom_filter_byte_ranges, stream, aligned_mr); + // Filter row groups with bloom filters + bloom_filtered_row_group_indices = reader->filter_row_groups_with_bloom_filters( + bloom_filter_data, current_row_group_indices, options, stream); + + // Update current row group indices + current_row_group_indices = bloom_filtered_row_group_indices; + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + } else { + if constexpr (print_progress) { + std::cout << "SKIP: Row group filtering with bloom filters...\n\n"; + } + } + + // Check whether to 
prune filter column data pages + using cudf::io::parquet::experimental::use_data_page_mask; + auto const prune_filter_data_pages = + filters.contains(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX); + + auto row_mask = std::unique_ptr{}; + if (prune_filter_data_pages) { + if constexpr (print_progress) { + std::cout << "READER: Filter data pages of filter columns with page index stats...\n"; + timer.reset(); + } + // Filter data pages with page index stats + row_mask = + reader->build_row_mask_with_page_index_stats(current_row_group_indices, options, stream, mr); + if constexpr (print_progress) { timer.print_elapsed_millis(); } + } else { + if constexpr (print_progress) { + std::cout << "SKIP: Filter column data page filtering with page index stats...\n\n"; + } + auto num_rows = reader->total_rows_in_row_groups(current_row_group_indices); + row_mask = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::BOOL8}, num_rows, rmm::device_buffer{}, 0, stream, mr); + } + + if constexpr (print_progress) { + std::cout << "READER: Materialize filter columns...\n"; + timer.reset(); + } + // Get column chunk byte ranges from the reader + auto const filter_column_chunk_byte_ranges = + reader->filter_column_chunks_byte_ranges(current_row_group_indices, options); + auto filter_column_chunk_buffers = + fetch_byte_ranges(file_buffer_span, filter_column_chunk_byte_ranges, stream, mr); + + // Materialize the table with only the filter columns + auto row_mask_mutable_view = row_mask->mutable_view(); + auto filter_table = + reader + ->materialize_filter_columns( + current_row_group_indices, + std::move(filter_column_chunk_buffers), + row_mask_mutable_view, + prune_filter_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO, + options, + stream) + .tbl; + if constexpr (print_progress) { timer.print_elapsed_millis(); } + + // Check whether to prune payload column data pages + auto const prune_payload_data_pages = + filters.contains(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK); + + if constexpr (print_progress) { + if (prune_payload_data_pages) { + std::cout << "READER: Filter data pages of payload columns with row mask...\n"; + } else { + std::cout << "SKIP: Payload column data page filtering with row mask...\n\n"; + } + + std::cout << "READER: Materialize payload columns...\n"; + timer.reset(); + } + // Get column chunk byte ranges from the reader + auto const payload_column_chunk_byte_ranges = + reader->payload_column_chunks_byte_ranges(current_row_group_indices, options); + auto payload_column_chunk_buffers = + fetch_byte_ranges(file_buffer_span, payload_column_chunk_byte_ranges, stream, mr); + + // Materialize the table with only the payload columns + auto payload_table = + reader + ->materialize_payload_columns( + current_row_group_indices, + std::move(payload_column_chunk_buffers), + row_mask->view(), + prune_payload_data_pages ? 
use_data_page_mask::YES : use_data_page_mask::NO, + options, + stream) + .tbl; + if constexpr (print_progress) { timer.print_elapsed_millis(); } + + return combine_tables(std::move(filter_table), std::move(payload_table)); +} + +// Explicit template instantiations +template std::unique_ptr hybrid_scan( + io_source const& io_source, + cudf::ast::operation const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +template std::unique_ptr hybrid_scan( + io_source const& io_source, + cudf::ast::operation const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); diff --git a/cpp/examples/hybrid_scan_io/common_utils.hpp b/cpp/examples/hybrid_scan_io/common_utils.hpp index 2be0f80fdac..c09fe3b4b37 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.hpp +++ b/cpp/examples/hybrid_scan_io/common_utils.hpp @@ -5,6 +5,8 @@ #pragma once +#include "io_source.hpp" + #include #include #include @@ -13,12 +15,24 @@ #include #include +#include /** * @file common_utils.hpp * @brief Utilities for `hybrid_scan_io` example */ +/** + * @brief Enum to represent the available parquet filters + */ +enum class parquet_filter_type : uint8_t { + ROW_GROUPS_WITH_STATS = 0, + ROW_GROUPS_WITH_DICT_PAGES = 1, + ROW_GROUPS_WITH_BLOOM_FILTERS = 2, + FILTER_COLUMN_PAGES_WITH_PAGE_INDEX = 3, + PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK = 4, +}; + /** * @brief Create memory resource for libcudf functions * @@ -27,6 +41,29 @@ */ std::shared_ptr create_memory_resource(bool is_pool_used); +/** + * @brief Function to process comma delimited input paths string to parquet files and/or dirs + * and convert them to specified io sources. + * + * Process the input path string containing directories (of parquet files) and/or individual + * parquet files into a list of input parquet files, multiple the list by `input_multiplier`, + * make sure to have at least `thread_count` files to satisfy at least file per parallel thread, + * and convert the final list of files to a list of `io_source` and return. 
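+ *
+ * As an illustrative sketch (the paths, multiplier, and thread count below are hypothetical
+ * values, not defaults), given a `rmm::cuda_stream_view stream`, the call
+ * @code{.cpp}
+ * auto sources = extract_input_sources(
+ *   "data_dir/,part_0.parquet", 2, 8, io_source_type::PINNED_BUFFER, stream);
+ * @endcode
+ * lists `part_0.parquet` plus every parquet file under `data_dir/`, doubles the list via the
+ * input multiplier, cycle-appends entries until there are at least 8 (one per thread), and
+ * wraps each resulting file in an `io_source`.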
+ * + * @param paths Comma delimited input paths string + * @param input_multiplier Multiplier for the input files list + * @param thread_count Number of threads being used in the example + * @param io_source_type Specified IO source type to convert input files to + * @param stream CUDA stream to use + * + * @return Vector of input sources for the given paths + */ +[[nodiscard]] std::vector extract_input_sources(std::string const& paths, + int32_t input_multiplier, + int32_t thread_count, + io_source_type io_source_type, + rmm::cuda_stream_view stream); + /** * @brief Create a filter expression of the form `column_name == literal` for string type point * lookups @@ -38,16 +75,6 @@ std::shared_ptr create_memory_resource(bool is_ cudf::ast::operation create_filter_expression(std::string const& column_name, std::string const& literal_value); -/** - * @brief Combine columns from filter and payload tables into a single table - * - * @param filter_table Filter table - * @param payload_table Payload table - * @return Combined table - */ -std::unique_ptr combine_tables(std::unique_ptr filter_table, - std::unique_ptr payload_table); - /** * @brief Check if two tables are identical, throw an error otherwise * @@ -60,34 +87,22 @@ void check_tables_equal(cudf::table_view const& lhs_table, rmm::cuda_stream_view stream = cudf::get_default_stream()); /** - * @brief Fetches a host span of Parquet footer bytes from the input buffer span - * - * @param buffer Input buffer span - * @return A host span of the footer bytes - */ -cudf::host_span fetch_footer_bytes(cudf::host_span buffer); -/** - * @brief Fetches a host span of Parquet PageIndexbytes from the input buffer span + * @brief Read parquet file with the next-gen parquet reader * - * @param buffer Input buffer span - * @param page_index_bytes Byte range of `PageIndex` to fetch - * @return A host span of the PageIndex bytes - */ -cudf::host_span fetch_page_index_bytes( - cudf::host_span buffer, cudf::io::text::byte_range_info const page_index_bytes); - -/** - * @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers + * @tparam print_progress Boolean indicating whether to print progress * - * @param host_buffer Host buffer span - * @param byte_ranges Byte ranges to fetch - * @param stream CUDA stream - * @param mr Device memory resource to create device buffers with + * @param io_source io source to read + * @param filter_expression Filter expression + * @param filters Set of parquet filters to apply + * @param stream CUDA stream for hybrid scan reader + * @param mr Device memory resource * - * @return Vector of device buffers + * @return Tuple of filter table, payload table, filter metadata, payload metadata, and the final + * row validity column */ -std::vector fetch_byte_ranges( - cudf::host_span host_buffer, - cudf::host_span byte_ranges, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +template +std::unique_ptr hybrid_scan(io_source const& io_source, + cudf::ast::operation const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp index ce8b545a082..b18fe47d790 100644 --- a/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp +++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp @@ -7,14 +7,11 @@ #include "io_source.hpp" #include "timer.hpp" -#include #include #include #include #include -#include -#include #include 
#include @@ -51,212 +48,6 @@ cudf::io::table_with_metadata read_parquet(io_source const& io_source, return cudf::io::read_parquet(options); } -/** - * @brief Enum to represent the available parquet filters - */ -enum class parquet_filter_type : uint8_t { - ROW_GROUPS_WITH_STATS = 0, - ROW_GROUPS_WITH_DICT_PAGES = 1, - ROW_GROUPS_WITH_BLOOM_FILTERS = 2, - FILTER_COLUMN_PAGES_WITH_PAGE_INDEX = 3, - PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK = 4, -}; - -/** - * @brief Read parquet file with the next-gen parquet reader - * - * @param io_source io source to read - * @param filter_expression Filter expression - * @param filters Set of parquet filters to apply - * @param stream CUDA stream for hybrid scan reader - * @param mr Device memory resource - * - * @return Tuple of filter table, payload table, filter metadata, payload metadata, and the final - * row validity column - */ -auto hybrid_scan(io_source const& io_source, - cudf::ast::operation const& filter_expression, - std::unordered_set const& filters, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - auto options = cudf::io::parquet_reader_options::builder().filter(filter_expression).build(); - - // Input file buffer span - auto const file_buffer_span = io_source.get_host_buffer_span(); - - std::cout << "\nREADER: Setup, metadata and page index...\n"; - timer timer; - - // Fetch footer bytes and setup reader - auto const footer_buffer = fetch_footer_bytes(file_buffer_span); - auto const reader = - std::make_unique(footer_buffer, options); - - // Get page index byte range from the reader - auto const page_index_byte_range = reader->page_index_byte_range(); - auto const page_index_buffer = fetch_page_index_bytes(file_buffer_span, page_index_byte_range); - reader->setup_page_index(page_index_buffer); - - // Get all row groups from the reader - auto input_row_group_indices = reader->all_row_groups(options); - auto current_row_group_indices = cudf::host_span(input_row_group_indices); - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - - timer.print_elapsed_millis(); - - // Filter row groups with stats - auto stats_filtered_row_group_indices = std::vector{}; - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_STATS)) { - std::cout << "READER: Filter row groups with stats...\n"; - timer.reset(); - stats_filtered_row_group_indices = - reader->filter_row_groups_with_stats(current_row_group_indices, options, stream); - - // Update current row group indices - current_row_group_indices = stats_filtered_row_group_indices; - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - timer.print_elapsed_millis(); - } - - std::vector bloom_filter_byte_ranges; - std::vector dict_page_byte_ranges; - - // Get bloom filter and dictionary page byte ranges from the reader - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) or - filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS)) { - std::cout << "READER: Get bloom filter and dictionary page byte ranges...\n"; - timer.reset(); - std::tie(bloom_filter_byte_ranges, dict_page_byte_ranges) = - reader->secondary_filters_byte_ranges(current_row_group_indices, options); - timer.print_elapsed_millis(); - } - - // Filter row groups with dictionary pages - std::vector dictionary_page_filtered_row_group_indices; - dictionary_page_filtered_row_group_indices.reserve(current_row_group_indices.size()); - if 
(filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) and - dict_page_byte_ranges.size()) { - std::cout << "READER: Filter row groups with dictionary pages...\n"; - timer.reset(); - // Fetch dictionary page buffers from the input file buffer - std::vector dictionary_page_buffers = - fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr); - dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages( - dictionary_page_buffers, current_row_group_indices, options, stream); - - // Update current row group indices - current_row_group_indices = dictionary_page_filtered_row_group_indices; - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - timer.print_elapsed_millis(); - } else { - std::cout << "SKIP: Row group filtering with dictionary pages...\n\n"; - } - - // Filter row groups with bloom filters - std::vector bloom_filtered_row_group_indices; - bloom_filtered_row_group_indices.reserve(current_row_group_indices.size()); - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS) and - bloom_filter_byte_ranges.size()) { - // Fetch 32 byte aligned bloom filter data buffers from the input file buffer - auto constexpr bloom_filter_alignment = rmm::CUDA_ALLOCATION_ALIGNMENT; - auto aligned_mr = rmm::mr::aligned_resource_adaptor( - mr, bloom_filter_alignment); - std::cout << "READER: Filter row groups with bloom filters...\n"; - timer.reset(); - std::vector bloom_filter_data = - fetch_byte_ranges(file_buffer_span, bloom_filter_byte_ranges, stream, aligned_mr); - // Filter row groups with bloom filters - bloom_filtered_row_group_indices = reader->filter_row_groups_with_bloom_filters( - bloom_filter_data, current_row_group_indices, options, stream); - - // Update current row group indices - current_row_group_indices = bloom_filtered_row_group_indices; - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - timer.print_elapsed_millis(); - } else { - std::cout << "SKIP: Row group filtering with bloom filters...\n\n"; - } - - // Check whether to prune filter column data pages - using cudf::io::parquet::experimental::use_data_page_mask; - auto const prune_filter_data_pages = - filters.contains(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX); - - auto row_mask = std::unique_ptr{}; - if (prune_filter_data_pages) { - std::cout << "READER: Filter data pages of filter columns with page index stats...\n"; - timer.reset(); - // Filter data pages with page index stats - row_mask = - reader->build_row_mask_with_page_index_stats(current_row_group_indices, options, stream, mr); - timer.print_elapsed_millis(); - } else { - std::cout << "SKIP: Filter column data page filtering with page index stats...\n\n"; - auto num_rows = reader->total_rows_in_row_groups(current_row_group_indices); - row_mask = cudf::make_numeric_column( - cudf::data_type{cudf::type_id::BOOL8}, num_rows, rmm::device_buffer{}, 0, stream, mr); - } - - std::cout << "READER: Materialize filter columns...\n"; - timer.reset(); - // Get column chunk byte ranges from the reader - auto const filter_column_chunk_byte_ranges = - reader->filter_column_chunks_byte_ranges(current_row_group_indices, options); - auto filter_column_chunk_buffers = - fetch_byte_ranges(file_buffer_span, filter_column_chunk_byte_ranges, stream, mr); - - // Materialize the table with only the filter columns - auto row_mask_mutable_view = row_mask->mutable_view(); - auto filter_table = - reader - 
->materialize_filter_columns(
-        current_row_group_indices,
-        std::move(filter_column_chunk_buffers),
-        row_mask_mutable_view,
-        prune_filter_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO,
-        options,
-        stream)
-      .tbl;
-  timer.print_elapsed_millis();
-
-  // Check whether to prune payload column data pages
-  auto const prune_payload_data_pages =
-    filters.contains(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK);
-
-  if (prune_payload_data_pages) {
-    std::cout << "READER: Filter data pages of payload columns with row mask...\n";
-  } else {
-    std::cout << "SKIP: Payload column data page filtering with row mask...\n\n";
-  }
-
-  std::cout << "READER: Materialize payload columns...\n";
-  timer.reset();
-  // Get column chunk byte ranges from the reader
-  auto const payload_column_chunk_byte_ranges =
-    reader->payload_column_chunks_byte_ranges(current_row_group_indices, options);
-  auto payload_column_chunk_buffers =
-    fetch_byte_ranges(file_buffer_span, payload_column_chunk_byte_ranges, stream, mr);
-
-  // Materialize the table with only the payload columns
-  auto payload_table =
-    reader
-      ->materialize_payload_columns(
-        current_row_group_indices,
-        std::move(payload_column_chunk_buffers),
-        row_mask->view(),
-        prune_payload_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO,
-        options,
-        stream)
-      .tbl;
-  timer.print_elapsed_millis();
-
-  return std::make_tuple(combine_tables(std::move(filter_table), std::move(payload_table)),
-                         std::move(row_mask));
-}
-
 /**
  * @brief Function to print example usage and argument information.
  */
@@ -353,13 +144,13 @@ int main(int argc, char const** argv)
   timer timer;
   std::cout << "Reading " << input_filepath << " with next-gen parquet reader...\n";
   timer.reset();
-  auto [table_next_gen_reader, row_mask] =
-    hybrid_scan(data_source, filter_expression, filters, stream, stats_mr);
+  auto const table_next_gen_reader =
+    hybrid_scan(data_source, filter_expression, filters, stream, stats_mr);
   timer.print_elapsed_millis();

   std::cout << "Reading " << input_filepath << " with main parquet reader...\n";
   timer.reset();
-  auto [table_main_reader, metadata] = read_parquet(data_source, filter_expression, stream);
+  auto const [table_main_reader, metadata] = read_parquet(data_source, filter_expression, stream);
   timer.print_elapsed_millis();

   // Check for validity
diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
new file mode 100644
index 00000000000..2c234b8ec8a
--- /dev/null
+++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
@@ -0,0 +1,241 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "common_utils.hpp"
+#include "io_source.hpp"
+#include "timer.hpp"
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+/**
+ * @file hybrid_scan_io_multithreaded.cpp
+ *
+ * @brief Demonstrates reading parquet data from the specified io source with libcudf's next-gen
+ * parquet reader (hybrid scan reader) subject to a highly selective point lookup (col_name ==
+ * literal) filter using multiple threads.
+ *
+ * The input parquet data is provided via files, which are converted to the specified io source
+ * type and read using multiple threads. Optionally, the parquet data read by each thread can be
+ * written to corresponding output files and checked for validity against the input data.
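+ *
+ * An illustrative invocation (the argument values are hypothetical; the column and literal
+ * match the defaults in `main`) reads ``example.parquet``, expanded to 8 input sources across
+ * 8 threads, 4 times over, with the point lookup ``string_col == "0000001"``:
+ * ``hybrid_scan_io_multithreaded example.parquet 2 8 4 string_col 0000001 PINNED_BUFFER``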
+ *
+ * Run: ``hybrid_scan_io_multithreaded -h`` to see help with input args and more information.
+ *
+ * The following io source types are supported: HOST_BUFFER, PINNED_BUFFER
+ */
+
+/**
+ * @brief Functor for multithreaded parquet reading with the hybrid scan reader
+ */
+struct hybrid_scan_fn {
+  std::vector const& input_sources;
+  cudf::ast::operation const& filter_expression;
+  std::unordered_set const& filters;
+  int const thread_id;
+  int const thread_count;
+  rmm::cuda_stream_view stream;
+  rmm::device_async_resource_ref mr;
+  void operator()()
+  {
+    // Sweep the available input files
+    for (auto curr_file_idx = thread_id; curr_file_idx < input_sources.size();
+         curr_file_idx += thread_count) {
+      timer timer;
+      std::ignore =
+        hybrid_scan(input_sources[curr_file_idx], filter_expression, filters, stream, mr);
+      std::cout << "Thread " << thread_id << " ";
+      timer.print_elapsed_millis();
+    }
+
+    // Just synchronize this stream and exit
+    stream.synchronize_no_throw();
+  }
+};
+
+/**
+ * @brief Function to set up and launch multithreaded hybrid scan reading.
+ *
+ * @param input_sources List of input sources to read
+ * @param filter_expression Filter expression
+ * @param filters Filters to apply
+ * @param thread_count Number of threads
+ * @param stream_pool CUDA stream pool to use for threads
+ * @param mr Device memory resource to use for device allocations
+ */
+void hybrid_scan_multithreaded(
+  std::vector const& input_sources,
+  cudf::ast::operation const& filter_expression,
+  std::unordered_set const& filters,
+  int32_t thread_count,
+  rmm::cuda_stream_pool& stream_pool,
+  rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref())
+{
+  // Table reading tasks
+  std::vector read_tasks;
+  read_tasks.reserve(thread_count);
+
+  // Create the read tasks
+  std::for_each(
+    thrust::make_counting_iterator(0), thrust::make_counting_iterator(thread_count), [&](auto tid) {
+      read_tasks.emplace_back(hybrid_scan_fn{.input_sources = input_sources,
+                                             .filter_expression = filter_expression,
+                                             .filters = filters,
+                                             .thread_id = tid,
+                                             .thread_count = thread_count,
+                                             .stream = stream_pool.get_stream(),
+                                             .mr = mr});
+    });
+
+  // Create threads with tasks
+  std::vector threads;
+  threads.reserve(thread_count);
+  for (auto& c : read_tasks) {
+    threads.emplace_back(c);
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+/**
+ * @brief Function to print example usage and argument information
+ */
+void inline print_usage()
+{
+  std::cout << "\nUsage: hybrid_scan_io_multithreaded "
+               "\n"
+               " "
+               "\n"
+               " \n\n"
+
+               "Available IO source types: HOST_BUFFER, PINNED_BUFFER (Default)\n\n"
+               "Note: Provide as many arguments as you like in the above order. Default values\n"
+               "      for the unprovided arguments will be used. 
All input parquet files will\n"
+               "      be converted to the specified IO source type before reading\n\n";
+}
+
+/**
+ * @brief The main function
+ */
+int32_t main(int argc, char const** argv)
+{
+  constexpr int32_t max_thread_count = 64;
+
+  // Set arguments to defaults
+  std::string input_paths = "example.parquet";
+  int32_t input_multiplier = 1;
+  int32_t thread_count = 1;
+  int32_t num_reads = 1;
+  auto column_name = std::string{"string_col"};
+  auto literal_value = std::string{"0000001"};
+  io_source_type io_source_type = io_source_type::PINNED_BUFFER;
+
+  // Set to the provided args
+  switch (argc) {
+    case 8: io_source_type = get_io_source_type(argv[7]); [[fallthrough]];
+    case 7: literal_value = argv[6]; [[fallthrough]];
+    case 6: column_name = argv[5]; [[fallthrough]];
+    case 5: num_reads = std::max(1, std::stoi(argv[4])); [[fallthrough]];
+    case 4:
+      thread_count =
+        std::min(max_thread_count, std::max(thread_count, std::stoi(std::string{argv[3]})));
+      [[fallthrough]];
+    case 3:
+      input_multiplier =
+        std::min(max_thread_count, std::max(input_multiplier, std::stoi(std::string{argv[2]})));
+      [[fallthrough]];
+    case 2:
+      // Check if instead of input_paths, the first argument is `-h` or `--help`
+      if (auto arg = std::string{argv[1]};
+          arg != "-h" and arg != "--help" and not arg.starts_with("-")) {
+        input_paths = std::move(arg);
+        break;
+      }
+      [[fallthrough]];
+    default: print_usage(); throw std::runtime_error("Exiting...");
+  }
+
+  // Initialize mr, default stream and stream pool
+  bool constexpr is_pool_used = true;
+  auto resource = create_memory_resource(is_pool_used);
+  auto default_stream = cudf::get_default_stream();
+  auto stats_mr =
+    rmm::mr::statistics_resource_adaptor(resource.get());
+  rmm::mr::set_current_device_resource(&stats_mr);
+
+  // List of input sources from the input_paths string. Note: `extract_input_sources`
+  // synchronizes the stream before returning, so no further synchronization is needed here.
+  auto const input_sources = [&]() {
+    try {
+      return extract_input_sources(
+        input_paths, input_multiplier, thread_count, io_source_type, default_stream);
+    } catch (const std::exception& e) {
+      print_usage();
+      throw std::runtime_error(e.what());
+    }
+  }();
+
+  // Check if there is nothing to do
+  if (input_sources.empty()) {
+    print_usage();
+    throw std::runtime_error("No input files to read. 
Exiting early.\n"); + } + + // Create filter expression + auto const column_reference = cudf::ast::column_name_reference(column_name); + auto scalar = cudf::string_scalar(literal_value); + auto literal = cudf::ast::literal(scalar); + auto filter_expression = + cudf::ast::operation(cudf::ast::ast_operator::EQUAL, column_reference, literal); + + // Insert which filters to apply + std::unordered_set filters; + { + filters.insert(parquet_filter_type::ROW_GROUPS_WITH_STATS); + filters.insert(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES); + filters.insert(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS); + // Deliberately disabled as it has a high cost to benefit ratio + // filters.insert(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX); + filters.insert(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK); + } + + // Read the same parquet files specified times with multiple threads and discard the read tables + { + std::cout << "\nReading " << input_sources.size() << " input sources " << num_reads + << " time(s) using " << thread_count + << " threads and discarding output " + "tables..\n"; + + std::cout << "Note that the first read may include times for nvcomp, cufile loading and RMM " + "growth.\n\n"; + + auto stream_pool = rmm::cuda_stream_pool(thread_count); + + timer timer; + std::for_each(thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_reads), + [&](auto i) { // Read parquet files and discard the tables + hybrid_scan_multithreaded( + input_sources, filter_expression, filters, thread_count, stream_pool); + }); + std::cout << "Total "; + timer.print_elapsed_millis(); + } + + // Print peak memory + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1'048'576.0) << " MB\n\n"; + + return 0; +} diff --git a/cpp/examples/hybrid_scan_io/io_source.hpp b/cpp/examples/hybrid_scan_io/io_source.hpp index 76d4262ce7f..6fbf57340f4 100644 --- a/cpp/examples/hybrid_scan_io/io_source.hpp +++ b/cpp/examples/hybrid_scan_io/io_source.hpp @@ -24,7 +24,7 @@ /** * @brief Available IO source types */ -enum class io_source_type { HOST_BUFFER, PINNED_BUFFER }; +enum class io_source_type : uint8_t { HOST_BUFFER, PINNED_BUFFER }; /** * @brief Get io source type from the string keyword argument From f7ce4714e7e428289826b9b4ba0b38df6289484c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 25 Nov 2025 01:36:03 +0000 Subject: [PATCH 2/7] Use multithreaded `setup_page_index` in hybrid scan reader --- .../experimental/hybrid_scan_helpers.cpp | 34 +--- cpp/src/io/parquet/reader_impl_helpers.cpp | 166 ++++++++++-------- cpp/src/io/parquet/reader_impl_helpers.hpp | 2 + 3 files changed, 96 insertions(+), 106 deletions(-) diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp index 3818e5ed36f..d6e55c842d1 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp @@ -148,36 +148,12 @@ void aggregate_reader_metadata::setup_page_index(cudf::host_span return; } - auto& row_groups = per_file_metadata.front().row_groups; - - CUDF_EXPECTS(not row_groups.empty() and not row_groups.front().columns.empty(), - "No column chunks in Parquet schema to read page index for"); - - CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size()); - + // Get the file metadata and setup the page index + auto& file_metadata = per_file_metadata.front(); // Set the first 
ColumnChunk's offset of ColumnIndex as the adjusted zero offset - int64_t const min_offset = row_groups.front().columns.front().column_index_offset; - // now loop over row groups - for (auto& rg : row_groups) { - for (auto& col : rg.columns) { - // Read the ColumnIndex for this ColumnChunk - if (col.column_index_length > 0 && col.column_index_offset > 0) { - int64_t const offset = col.column_index_offset - min_offset; - cp.init(page_index_bytes.data() + offset, col.column_index_length); - ColumnIndex ci; - cp.read(&ci); - col.column_index = std::move(ci); - } - // Read the OffsetIndex for this ColumnChunk - if (col.offset_index_length > 0 && col.offset_index_offset > 0) { - int64_t const offset = col.offset_index_offset - min_offset; - cp.init(page_index_bytes.data() + offset, col.offset_index_length); - OffsetIndex oi; - cp.read(&oi); - col.offset_index = std::move(oi); - } - } - } + int64_t const min_offset = file_metadata.row_groups.front().columns.front().column_index_offset; + + file_metadata.setup_page_index(page_index_bytes, min_offset); } size_type aggregate_reader_metadata::total_rows_in_row_groups( diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index fe9d33678c3..f85caee96fd 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -339,85 +339,93 @@ metadata::metadata(datasource* source, bool read_page_indexes) auto const& last_col = row_groups.back().columns.back(); int64_t const max_offset = last_col.offset_index_offset + last_col.offset_index_length; - if (max_offset > 0) { - int64_t const length = max_offset - min_offset; - auto const idx_buf = source->host_read(min_offset, length); - // Flatten all columns into a single vector for easier task distribution - std::vector> all_column_chunks; - all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size()); - for (auto& rg : row_groups) { - for (auto& col : rg.columns) { - all_column_chunks.emplace_back(std::ref(col)); - } - } + if (max_offset > min_offset) { + size_t const length = max_offset - min_offset; + auto const page_idx_buf = source->host_read(min_offset, length); + setup_page_index({page_idx_buf->data(), length}, min_offset); + } + } - auto read_column_indexes = [&idx_buf, min_offset](CompactProtocolReader& reader, - ColumnChunk& col) { - if (col.column_index_length > 0 && col.column_index_offset > 0) { - int64_t const offset = col.column_index_offset - min_offset; - reader.init(idx_buf->data() + offset, col.column_index_length); - col.column_index = ColumnIndex(); - reader.read(&col.column_index.value()); - } - if (col.offset_index_length > 0 && col.offset_index_offset > 0) { - int64_t const offset = col.offset_index_offset - min_offset; - reader.init(idx_buf->data() + offset, col.offset_index_length); - col.offset_index = OffsetIndex(); - reader.read(&col.offset_index.value()); - } - }; - - // Use parallel processing only if we have enough columns to justify the overhead - constexpr std::size_t parallel_threshold = 512; - auto const total_column_chunks = all_column_chunks.size(); - if (total_column_chunks >= parallel_threshold) { - // Dynamically calculate number of tasks based the number or row groups and columns - constexpr std::size_t min_tasks = 4; - constexpr std::size_t max_tasks = 32; - auto const ratio = static_cast(total_column_chunks) / parallel_threshold; - // Scale the number of tasks and task size evenly (e.g. 
quadrupling the number of elements - // doubles both the number of tasks and the task size) - auto const multiplier = std::size_t(1) << (static_cast(std::log2(ratio)) / 2); - - auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks); - auto const column_chunks_per_task = total_column_chunks / num_tasks; - auto const remainder = total_column_chunks % num_tasks; - - std::vector> tasks; - tasks.reserve(num_tasks); - - std::size_t start_idx = 0; - for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) { - auto const task_size = column_chunks_per_task + (task_id < remainder ? 1 : 0); - auto const end_idx = start_idx + task_size; - - if (start_idx >= total_column_chunks) break; - - tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( - [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() { - CompactProtocolReader local_cp; - - for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) { - read_column_indexes(local_cp, all_column_chunks[i].get()); - } - })); - - start_idx = end_idx; - } + sanitize_schema(); +} - for (auto& task : tasks) { - task.get(); - } - } else { - // For small numbers of columns, use sequential processing to avoid overhead - for (auto& col_ref : all_column_chunks) { - read_column_indexes(cp, col_ref.get()); - } - } +void metadata::setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset) +{ + CUDF_FUNC_RANGE(); + + // Flatten all columns into a single vector for easier task distribution + std::vector> all_column_chunks; + all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size()); + for (auto& rg : row_groups) { + for (auto& col : rg.columns) { + all_column_chunks.emplace_back(std::ref(col)); } } - sanitize_schema(); + auto read_column_indexes = [page_index_bytes, min_offset](CompactProtocolReader& reader, + ColumnChunk& col) { + if (col.column_index_length > 0 && col.column_index_offset > 0) { + int64_t const offset = col.column_index_offset - min_offset; + reader.init(page_index_bytes.data() + offset, col.column_index_length); + col.column_index = ColumnIndex(); + reader.read(&col.column_index.value()); + } + if (col.offset_index_length > 0 && col.offset_index_offset > 0) { + int64_t const offset = col.offset_index_offset - min_offset; + reader.init(page_index_bytes.data() + offset, col.offset_index_length); + col.offset_index = OffsetIndex(); + reader.read(&col.offset_index.value()); + } + }; + + // Use parallel processing only if we have enough columns to justify the overhead + constexpr std::size_t parallel_threshold = 256; + auto const total_column_chunks = all_column_chunks.size(); + if (total_column_chunks >= parallel_threshold) { + // Dynamically calculate number of tasks based the number or row groups and columns + constexpr std::size_t min_tasks = 4; + constexpr std::size_t max_tasks = 32; + auto const ratio = static_cast(total_column_chunks) / parallel_threshold; + // Scale the number of tasks and task size evenly (e.g. 
quadrupling the number of elements + // doubles both the number of tasks and the task size) + auto const multiplier = std::size_t(1) << (static_cast(std::log2(ratio)) / 2); + + auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks); + auto const column_chunks_per_task = total_column_chunks / num_tasks; + auto const remainder = total_column_chunks % num_tasks; + + std::vector> tasks; + tasks.reserve(num_tasks); + + std::size_t start_idx = 0; + for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) { + auto const task_size = column_chunks_per_task + (task_id < remainder ? 1 : 0); + auto const end_idx = start_idx + task_size; + + if (start_idx >= total_column_chunks) break; + + tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( + [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() { + CompactProtocolReader local_cp; + + for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) { + read_column_indexes(local_cp, all_column_chunks[i].get()); + } + })); + + start_idx = end_idx; + } + + for (auto& task : tasks) { + task.get(); + } + } else { + CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size()); + // For small numbers of columns, use sequential processing to avoid overhead + for (auto& col_ref : all_column_chunks) { + read_column_indexes(cp, col_ref.get()); + } + } } metadata::~metadata() @@ -448,8 +456,10 @@ std::vector aggregate_reader_metadata::metadatas_from_sources( std::vector> metadata_ctor_tasks; metadata_ctor_tasks.reserve(sources.size()); for (auto const& source : sources) { - metadata_ctor_tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( - [source = source.get(), read_page_indexes] { return metadata{source, read_page_indexes}; })); + metadata_ctor_tasks.emplace_back( + cudf::detail::host_worker_pool().submit_task([source = source.get(), read_page_indexes] { + return metadata{source, read_page_indexes}; + })); } std::vector metadatas; metadatas.reserve(sources.size()); @@ -472,7 +482,9 @@ aggregate_reader_metadata::collect_keyval_metadata() const std::transform(pfm.key_value_metadata.cbegin(), pfm.key_value_metadata.cend(), std::inserter(kv_map, kv_map.end()), - [](auto const& kv) { return std::pair{kv.key, kv.value}; }); + [](auto const& kv) { + return std::pair{kv.key, kv.value}; + }); return kv_map; }); diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 4b431ce45bd..b49191b31bd 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -110,6 +110,8 @@ struct metadata : public FileMetaData { metadata& operator=(metadata&& other) = default; ~metadata(); + void setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset); + void sanitize_schema(); }; From 8b85949bd8351f9f9abd923239e82df5885e3ca6 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 25 Nov 2025 01:46:30 +0000 Subject: [PATCH 3/7] style fix --- cpp/src/io/parquet/reader_impl_helpers.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index f85caee96fd..3ef002c931e 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -456,10 +456,8 @@ std::vector aggregate_reader_metadata::metadatas_from_sources( std::vector> metadata_ctor_tasks; metadata_ctor_tasks.reserve(sources.size()); 
for (auto const& source : sources) { - metadata_ctor_tasks.emplace_back( - cudf::detail::host_worker_pool().submit_task([source = source.get(), read_page_indexes] { - return metadata{source, read_page_indexes}; - })); + metadata_ctor_tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( + [source = source.get(), read_page_indexes] { return metadata{source, read_page_indexes}; })); } std::vector metadatas; metadatas.reserve(sources.size()); @@ -482,9 +480,7 @@ aggregate_reader_metadata::collect_keyval_metadata() const std::transform(pfm.key_value_metadata.cbegin(), pfm.key_value_metadata.cend(), std::inserter(kv_map, kv_map.end()), - [](auto const& kv) { - return std::pair{kv.key, kv.value}; - }); + [](auto const& kv) { return std::pair{kv.key, kv.value}; }); return kv_map; }); From e9e89767165f42e1d5bcac6ae4f8b81d03979255 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 25 Nov 2025 20:12:42 +0000 Subject: [PATCH 4/7] Minor improvements --- cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp | 4 ++-- cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp | 9 +++++++-- cpp/src/io/parquet/reader_impl_helpers.hpp | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp index d6e55c842d1..f8408ca2fe5 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp @@ -76,7 +76,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(FileMetaData const& parquet : aggregate_reader_metadata_base({}, false, false) { // Just copy over the FileMetaData struct to the internal metadata struct - per_file_metadata.emplace_back(metadata{parquet_metadata}.get_file_metadata()); + per_file_metadata.emplace_back(metadata{parquet_metadata}); initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs); } @@ -86,7 +86,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(cudf::host_span footer_bytes); explicit metadata(FileMetaData const& other) { static_cast(*this) = other; } - metadata_base get_file_metadata() && { return std::move(*this); } + metadata(metadata const& other) = delete; + metadata(metadata&& other) = default; + metadata& operator=(metadata const& other) = delete; + metadata& operator=(metadata&& other) = default; + + ~metadata() = default; }; class aggregate_reader_metadata : public aggregate_reader_metadata_base { diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index b49191b31bd..515688aa215 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -112,6 +112,7 @@ struct metadata : public FileMetaData { void setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset); + protected: void sanitize_schema(); }; From 8b2e81aecf943d698d6948585ce76a84d1f27881 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 2 Dec 2025 02:24:44 +0000 Subject: [PATCH 5/7] Style fix --- cpp/examples/hybrid_scan_io/CMakeLists.txt | 2 +- cpp/examples/hybrid_scan_io/common_utils.cpp | 11 +++++------ .../hybrid_scan_io/hybrid_scan_io_multithreaded.cpp | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/cpp/examples/hybrid_scan_io/CMakeLists.txt b/cpp/examples/hybrid_scan_io/CMakeLists.txt index f28ea62ae1a..677bc236613 100644 --- a/cpp/examples/hybrid_scan_io/CMakeLists.txt +++ b/cpp/examples/hybrid_scan_io/CMakeLists.txt @@ -43,7 +43,7 @@ install(TARGETS 
hybrid_scan_io DESTINATION bin/examples/libcudf/hybrid_scan_io) add_executable(hybrid_scan_io_multithreaded hybrid_scan_io_multithreaded.cpp) target_link_libraries( hybrid_scan_io_multithreaded PRIVATE cudf::cudf $ - $ + $ ) target_compile_features(hybrid_scan_io_multithreaded PRIVATE cxx_std_20) install(TARGETS hybrid_scan_io_multithreaded DESTINATION bin/examples/libcudf/hybrid_scan_io) diff --git a/cpp/examples/hybrid_scan_io/common_utils.cpp b/cpp/examples/hybrid_scan_io/common_utils.cpp index ffea7030671..58442d72141 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.cpp +++ b/cpp/examples/hybrid_scan_io/common_utils.cpp @@ -112,12 +112,11 @@ std::vector extract_input_sources(std::string const& paths, std::vector input_sources; input_sources.reserve(parquet_files.size()); // Transform input files to the specified io sources - std::transform(parquet_files.begin(), - parquet_files.end(), - std::back_inserter(input_sources), - [&](auto const& file_name) { - return io_source{file_name, io_source_type, stream}; - }); + std::transform( + parquet_files.begin(), + parquet_files.end(), + std::back_inserter(input_sources), + [&](auto const& file_name) { return io_source{file_name, io_source_type, stream}; }); stream.synchronize(); return input_sources; } diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp index 2c234b8ec8a..9528fac7a69 100644 --- a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp +++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp @@ -118,7 +118,7 @@ void inline print_usage() "\n" " " "\n" - " \n\n" + " \n\n" "Available IO source types: HOST_BUFFER, PINNED_BUFFER (Default)\n\n" "Note: Provide as many arguments as you like in the above order. Default values\n" From dabadfaac08023b2fef6503d2faa1c327a64eab9 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 2 Dec 2025 21:03:00 +0000 Subject: [PATCH 6/7] Updates --- cpp/examples/hybrid_scan_io/common_utils.cpp | 33 +++++++++-------- cpp/examples/hybrid_scan_io/common_utils.hpp | 2 +- .../hybrid_scan_io_multithreaded.cpp | 36 +++++++++++-------- 3 files changed, 40 insertions(+), 31 deletions(-) diff --git a/cpp/examples/hybrid_scan_io/common_utils.cpp b/cpp/examples/hybrid_scan_io/common_utils.cpp index 58442d72141..1b4bf209f53 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.cpp +++ b/cpp/examples/hybrid_scan_io/common_utils.cpp @@ -86,7 +86,7 @@ std::vector extract_input_sources(std::string const& paths, // Current size of list of parquet files auto const initial_size = parquet_files.size(); - if (initial_size == 0) { return {}; } + if (initial_size == 0) { throw std::runtime_error("No input files to read. Exiting early.\n"); } // Reserve space parquet_files.reserve(std::max(thread_count, input_multiplier * parquet_files.size())); @@ -100,23 +100,26 @@ std::vector extract_input_sources(std::string const& paths, parquet_files.begin() + initial_size); }); - // Cycle append parquet files from the existing ones if less than the thread_count - std::cout << "Warning: Number of input sources < thread count. 
From dabadfaac08023b2fef6503d2faa1c327a64eab9 Mon Sep 17 00:00:00 2001
From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com>
Date: Tue, 2 Dec 2025 21:03:00 +0000
Subject: [PATCH 6/7] Updates

---
 cpp/examples/hybrid_scan_io/common_utils.cpp       | 33 +++++++++--------
 cpp/examples/hybrid_scan_io/common_utils.hpp       |  2 +-
 .../hybrid_scan_io_multithreaded.cpp               | 36 +++++++++++--------
 3 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/cpp/examples/hybrid_scan_io/common_utils.cpp b/cpp/examples/hybrid_scan_io/common_utils.cpp
index 58442d72141..1b4bf209f53 100644
--- a/cpp/examples/hybrid_scan_io/common_utils.cpp
+++ b/cpp/examples/hybrid_scan_io/common_utils.cpp
@@ -86,7 +86,7 @@ std::vector<io_source> extract_input_sources(std::string const& paths,

   // Current size of list of parquet files
   auto const initial_size = parquet_files.size();
-  if (initial_size == 0) { return {}; }
+  if (initial_size == 0) { throw std::runtime_error("No input files to read. Exiting early.\n"); }

   // Reserve space
   parquet_files.reserve(std::max(thread_count, input_multiplier * parquet_files.size()));
@@ -100,23 +100,26 @@ std::vector<io_source> extract_input_sources(std::string const& paths,
                                          parquet_files.begin() + initial_size);
                 });

-  // Cycle append parquet files from the existing ones if less than the thread_count
-  std::cout << "Warning: Number of input sources < thread count. Cycling from\n"
-               "and appending to current input sources such that the number of\n"
-               "input source == thread count\n";
-  for (size_t idx = 0; thread_count > static_cast(parquet_files.size()); idx++) {
-    parquet_files.emplace_back(parquet_files[idx % initial_size]);
+  if (parquet_files.size() < thread_count) {
+    // Cycle append parquet files from the existing ones if less than the thread_count
+    std::cout << "Warning: Number of input sources < thread count. Cycling from\n"
+                 "and appending to current input sources such that the number of\n"
+                 "input sources == thread count\n";
+    for (size_t idx = 0; thread_count > static_cast(parquet_files.size()); idx++) {
+      parquet_files.emplace_back(parquet_files[idx % initial_size]);
+    }
   }

   // Vector of io sources
   std::vector<io_source> input_sources;
   input_sources.reserve(parquet_files.size());
   // Transform input files to the specified io sources
-  std::transform(
-    parquet_files.begin(),
-    parquet_files.end(),
-    std::back_inserter(input_sources),
-    [&](auto const& file_name) { return io_source{file_name, io_source_type, stream}; });
+  std::transform(parquet_files.begin(),
+                 parquet_files.end(),
+                 std::back_inserter(input_sources),
+                 [&](auto const& file_name) {
+                   return io_source{file_name, io_source_type, stream};
+                 });
   stream.synchronize();
   return input_sources;
 }
@@ -283,7 +286,7 @@ std::unique_ptr<cudf::table> combine_tables(std::unique_ptr<cudf::table> filter_
  */
 template
 std::unique_ptr<cudf::table> hybrid_scan(io_source const& io_source,
-                                         cudf::ast::operation const& filter_expression,
+                                         cudf::ast::expression const& filter_expression,
                                          std::unordered_set const& filters,
                                          rmm::cuda_stream_view stream,
                                          rmm::device_async_resource_ref mr)
@@ -494,14 +497,14 @@ std::unique_ptr<cudf::table> hybrid_scan(io_source const& io_source,
 // Explicit template instantiations
 template std::unique_ptr<cudf::table> hybrid_scan(
   io_source const& io_source,
-  cudf::ast::operation const& filter_expression,
+  cudf::ast::expression const& filter_expression,
   std::unordered_set const& filters,
   rmm::cuda_stream_view stream,
   rmm::device_async_resource_ref mr);

 template std::unique_ptr<cudf::table> hybrid_scan(
   io_source const& io_source,
-  cudf::ast::operation const& filter_expression,
+  cudf::ast::expression const& filter_expression,
   std::unordered_set const& filters,
   rmm::cuda_stream_view stream,
   rmm::device_async_resource_ref mr);
diff --git a/cpp/examples/hybrid_scan_io/common_utils.hpp b/cpp/examples/hybrid_scan_io/common_utils.hpp
index c09fe3b4b37..5f29c1aa4ae 100644
--- a/cpp/examples/hybrid_scan_io/common_utils.hpp
+++ b/cpp/examples/hybrid_scan_io/common_utils.hpp
@@ -102,7 +102,7 @@ void check_tables_equal(cudf::table_view const& lhs_table,
  */
 template
 std::unique_ptr<cudf::table> hybrid_scan(io_source const& io_source,
-                                         cudf::ast::operation const& filter_expression,
+                                         cudf::ast::expression const& filter_expression,
                                          std::unordered_set const& filters,
                                          rmm::cuda_stream_view stream,
                                          rmm::device_async_resource_ref mr);
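Aside: widening the parameter from `cudf::ast::operation const&` to the polymorphic base `cudf::ast::expression const&` lets any AST node act as the predicate. A small sketch assuming only the public `cudf::ast` interface; `scan_with` is a stand-in declaration, not the example's `hybrid_scan`:

  #include <cudf/ast/expressions.hpp>
  #include <cudf/scalar/scalar.hpp>

  void scan_with(cudf::ast::expression const& filter);  // stand-in for hybrid_scan

  void demo()
  {
    auto col     = cudf::ast::column_name_reference("col");
    auto scalar  = cudf::string_scalar("value");
    auto literal = cudf::ast::literal(scalar);
    auto equals  = cudf::ast::operation(cudf::ast::ast_operator::EQUAL, col, literal);

    scan_with(equals);   // an operation node...
    scan_with(literal);  // ...or any other expression node binds to the base
  }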
diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
index 9528fac7a69..f2bfff20ced 100644
--- a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
+++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
@@ -7,6 +7,7 @@
 #include "io_source.hpp"
 #include "timer.hpp"

+#include
 #include
 #include
 #include
@@ -42,7 +43,7 @@
  */
 struct hybrid_scan_fn {
   std::vector<io_source> const& input_sources;
-  cudf::ast::operation const& filter_expression;
+  cudf::ast::expression const& filter_expression;
   std::unordered_set const& filters;
   int const thread_id;
   int const thread_count;
@@ -76,7 +77,7 @@ struct hybrid_scan_fn {
  */
 void hybrid_scan_multithreaded(
   std::vector<io_source> const& input_sources,
-  cudf::ast::operation const& filter_expression,
+  std::vector<cudf::ast::operation> const& filter_expressions,
   std::unordered_set const& filters,
   int32_t thread_count,
   rmm::cuda_stream_pool& stream_pool,
@@ -85,17 +86,18 @@ void hybrid_scan_multithreaded(
   // Table reading tasks
   std::vector read_tasks;
   read_tasks.reserve(thread_count);
-
+  auto const num_filter_expressions = filter_expressions.size();
   // Create the read tasks
   std::for_each(
     thrust::make_counting_iterator(0), thrust::make_counting_iterator(thread_count), [&](auto tid) {
-      read_tasks.emplace_back(hybrid_scan_fn{.input_sources = input_sources,
-                                             .filter_expression = filter_expression,
-                                             .filters = filters,
-                                             .thread_id = tid,
-                                             .thread_count = thread_count,
-                                             .stream = stream_pool.get_stream(),
-                                             .mr = mr});
+      read_tasks.emplace_back(
+        hybrid_scan_fn{.input_sources = input_sources,
+                       .filter_expression = filter_expressions[tid % num_filter_expressions],
+                       .filters = filters,
+                       .thread_id = tid,
+                       .thread_count = thread_count,
+                       .stream = stream_pool.get_stream(),
+                       .mr = mr});
     });

   // Create threads with tasks
@@ -195,10 +197,14 @@ int32_t main(int argc, char const** argv)

   // Create filter expression
   auto const column_reference = cudf::ast::column_name_reference(column_name);
-  auto scalar = cudf::string_scalar(literal_value);
-  auto literal = cudf::ast::literal(scalar);
-  auto filter_expression =
-    cudf::ast::operation(cudf::ast::ast_operator::EQUAL, column_reference, literal);
+  auto scalar1 = cudf::string_scalar(literal_value);
+  auto literal1 = cudf::ast::literal(scalar1);
+  auto scalar2 = cudf::numeric_scalar<int64_t>(std::stoll(literal_value));
+  auto literal2 = cudf::ast::literal(scalar2);
+
+  std::vector<cudf::ast::operation> filter_expressions;
+  filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal1);
+  filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal2);

   // Insert which filters to apply
   std::unordered_set filters;
@@ -228,7 +234,7 @@ int32_t main(int argc, char const** argv)
     thrust::make_counting_iterator(num_reads),
     [&](auto i) {  // Read parquet files and discard the tables
       hybrid_scan_multithreaded(
-        input_sources, filter_expression, filters, thread_count, stream_pool);
+        input_sources, filter_expressions, filters, thread_count, stream_pool);
     });
   std::cout << "Total ";
   timer.print_elapsed_millis();
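Aside: the `tid % num_filter_expressions` indexing above distributes the predicates round-robin, so every expression is exercised even when the thread count and expression count differ. A minimal sketch of that policy with simplified types:

  #include <cstddef>
  #include <vector>

  // Returns, for each thread id, the index of the expression it should run.
  std::vector<std::size_t> assign_expressions(std::size_t thread_count,
                                              std::size_t num_expressions)
  {
    std::vector<std::size_t> assignment(thread_count);
    for (std::size_t tid = 0; tid < thread_count; ++tid) {
      assignment[tid] = tid % num_expressions;  // 4 threads, 2 exprs -> 0, 1, 0, 1
    }
    return assignment;
  }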
From a5d51a940883e284a08ca21aaf617e6a3593e38c Mon Sep 17 00:00:00 2001
From: anon
Date: Wed, 3 Dec 2025 01:09:15 +0000
Subject: [PATCH 7/7] Don't use a pool

---
 cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
index f2bfff20ced..4d7a10358b1 100644
--- a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
+++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp
@@ -170,7 +170,7 @@ int32_t main(int argc, char const** argv)
   }

   // Initialize mr, default stream and stream pool
-  bool constexpr is_pool_used = true;
+  bool constexpr is_pool_used = false;
   auto resource = create_memory_resource(is_pool_used);
   auto default_stream = cudf::get_default_stream();
   auto stats_mr =
@@ -203,8 +203,8 @@ int32_t main(int argc, char const** argv)
   auto literal2 = cudf::ast::literal(scalar2);

   std::vector<cudf::ast::operation> filter_expressions;
-  filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal1);
   filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal2);
+  filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal1);

   // Insert which filters to apply
   std::unordered_set filters;
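Aside: with `is_pool_used = false`, every allocation now goes straight to the CUDA memory resource rather than being served from a pre-grown pool; the statistics adaptor still records usage either way. A sketch of that non-pooled setup, assuming rmm's `statistics_resource_adaptor` interface:

  #include <rmm/mr/device/cuda_memory_resource.hpp>
  #include <rmm/mr/device/statistics_resource_adaptor.hpp>

  #include <iostream>

  int main()
  {
    // Bare CUDA resource: no pooling, each allocation hits the driver.
    rmm::mr::cuda_memory_resource cuda_mr;
    // Wrap it so peak/total usage can be reported after the scan.
    rmm::mr::statistics_resource_adaptor<rmm::mr::device_memory_resource> stats_mr{&cuda_mr};

    // ... run the multithreaded scan with stats_mr as the memory resource ...

    std::cout << "Peak device memory: " << stats_mr.get_bytes_counter().peak << " bytes\n";
  }

Skipping the pool trades allocation speed for memory behavior that is easier to attribute, since a pool resource would otherwise reserve a large slab up front.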