diff --git a/cpp/examples/hybrid_scan_io/CMakeLists.txt b/cpp/examples/hybrid_scan_io/CMakeLists.txt index 5f996a44538..677bc236613 100644 --- a/cpp/examples/hybrid_scan_io/CMakeLists.txt +++ b/cpp/examples/hybrid_scan_io/CMakeLists.txt @@ -39,6 +39,15 @@ target_link_libraries( target_compile_features(hybrid_scan_io PRIVATE cxx_std_20) install(TARGETS hybrid_scan_io DESTINATION bin/examples/libcudf/hybrid_scan_io) +# Build and install hybrid_scan_io_multithreaded +add_executable(hybrid_scan_io_multithreaded hybrid_scan_io_multithreaded.cpp) +target_link_libraries( + hybrid_scan_io_multithreaded PRIVATE cudf::cudf $ + $ +) +target_compile_features(hybrid_scan_io_multithreaded PRIVATE cxx_std_20) +install(TARGETS hybrid_scan_io_multithreaded DESTINATION bin/examples/libcudf/hybrid_scan_io) + # Install the example.parquet file install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf/hybrid_scan_io diff --git a/cpp/examples/hybrid_scan_io/common_utils.cpp b/cpp/examples/hybrid_scan_io/common_utils.cpp index 35f80dd6454..1b4bf209f53 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.cpp +++ b/cpp/examples/hybrid_scan_io/common_utils.cpp @@ -5,21 +5,29 @@ #include "common_utils.hpp" +#include "timer.hpp" + #include -#include +#include #include +#include #include #include #include #include +#include #include +#include #include #include #include #include +#include +#include #include +#include #include /** @@ -31,11 +39,91 @@ std::shared_ptr create_memory_resource(bool is_ { if (is_pool_used) { return rmm::mr::make_owning_wrapper( - std::make_shared(), rmm::percent_of_free_device_memory(50)); + std::make_shared(), rmm::percent_of_free_device_memory(80)); } return std::make_shared(); } +std::vector extract_input_sources(std::string const& paths, + int32_t input_multiplier, + int32_t thread_count, + io_source_type io_source_type, + rmm::cuda_stream_view stream) +{ + // Get the delimited paths to directory and/or files. 
+ std::vector const delimited_paths = [&]() { + std::vector paths_list; + std::stringstream strstream{paths}; + std::string path; + // Extract the delimited paths. + while (std::getline(strstream, path, char{','})) { + paths_list.push_back(path); + } + return paths_list; + }(); + + // List of parquet files + std::vector parquet_files; + std::for_each(delimited_paths.cbegin(), delimited_paths.cend(), [&](auto const& path_string) { + auto const path = std::filesystem::path{path_string}; + // If this is a parquet file, add it. + if (std::filesystem::is_regular_file(path)) { + parquet_files.push_back(path_string); + } + // If this is a directory, add all files in the directory. + else if (std::filesystem::is_directory(path)) { + for (auto const& file : std::filesystem::directory_iterator(path)) { + if (std::filesystem::is_regular_file(file.path())) { + parquet_files.push_back(file.path().string()); + } else { + std::cout << "Skipping sub-directory: " << file.path().string() << "\n"; + } + } + } else { + throw std::runtime_error("Encountered an invalid input path\n"); + } + }); + + // Current size of list of parquet files + auto const initial_size = parquet_files.size(); + if (initial_size == 0) { throw std::runtime_error("No input files to read. Exiting early.\n"); } + + // Reserve space + parquet_files.reserve(std::max(thread_count, input_multiplier * parquet_files.size())); + + // Append the input files by input_multiplier times + std::for_each(thrust::make_counting_iterator(1), + thrust::make_counting_iterator(input_multiplier), + [&](auto i) { + parquet_files.insert(parquet_files.end(), + parquet_files.begin(), + parquet_files.begin() + initial_size); + }); + + if (parquet_files.size() < thread_count) { + // Cycle append parquet files from the existing ones if less than the thread_count + std::cout << "Warning: Number of input sources < thread count. 
Cycling from\n" + "and appending to current input sources such that the number of\n" + "input source == thread count\n"; + for (size_t idx = 0; thread_count > static_cast(parquet_files.size()); idx++) { + parquet_files.emplace_back(parquet_files[idx % initial_size]); + } + } + + // Vector of io sources + std::vector input_sources; + input_sources.reserve(parquet_files.size()); + // Transform input files to the specified io sources + std::transform(parquet_files.begin(), + parquet_files.end(), + std::back_inserter(input_sources), + [&](auto const& file_name) { + return io_source{file_name, io_source_type, stream}; + }); + stream.synchronize(); + return input_sources; +} + cudf::ast::operation create_filter_expression(std::string const& column_name, std::string const& literal_value) { @@ -45,21 +133,6 @@ cudf::ast::operation create_filter_expression(std::string const& column_name, return cudf::ast::operation(cudf::ast::ast_operator::EQUAL, column_reference, literal); } -std::unique_ptr combine_tables(std::unique_ptr filter_table, - std::unique_ptr payload_table) -{ - auto filter_columns = filter_table->release(); - auto payload_columns = payload_table->release(); - - auto all_columns = std::vector>{}; - all_columns.reserve(filter_columns.size() + payload_columns.size()); - std::move(filter_columns.begin(), filter_columns.end(), std::back_inserter(all_columns)); - std::move(payload_columns.begin(), payload_columns.end(), std::back_inserter(all_columns)); - auto table = std::make_unique(std::move(all_columns)); - - return table; -} - void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table, rmm::cuda_stream_view stream) @@ -91,6 +164,14 @@ void check_tables_equal(cudf::table_view const& lhs_table, } } +namespace { + +/** + * @brief Fetches a host span of Parquet footer bytes from the input buffer span + * + * @param buffer Input buffer span + * @return A host span of the footer bytes + */ cudf::host_span 
fetch_footer_bytes(cudf::host_span buffer) { CUDF_FUNC_RANGE(); @@ -117,6 +198,13 @@ cudf::host_span fetch_footer_bytes(cudf::host_span ender->footer_len); } +/** + * @brief Fetches a host span of Parquet PageIndexbytes from the input buffer span + * + * @param buffer Input buffer span + * @param page_index_bytes Byte range of `PageIndex` to fetch + * @return A host span of the PageIndex bytes + */ cudf::host_span fetch_page_index_bytes( cudf::host_span buffer, cudf::io::text::byte_range_info const page_index_bytes) { @@ -125,6 +213,16 @@ cudf::host_span fetch_page_index_bytes( page_index_bytes.size()); } +/** + * @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers + * + * @param host_buffer Host buffer span + * @param byte_ranges Byte ranges to fetch + * @param stream CUDA stream + * @param mr Device memory resource to create device buffers with + * + * @return Vector of device buffers + */ std::vector fetch_byte_ranges( cudf::host_span host_buffer, cudf::host_span byte_ranges, @@ -149,3 +247,264 @@ std::vector fetch_byte_ranges( stream.synchronize_no_throw(); return buffers; } + +/** + * @brief Combine columns from filter and payload tables into a single table + * + * @param filter_table Filter table + * @param payload_table Payload table + * @return Combined table + */ +std::unique_ptr combine_tables(std::unique_ptr filter_table, + std::unique_ptr payload_table) +{ + auto filter_columns = filter_table->release(); + auto payload_columns = payload_table->release(); + + auto all_columns = std::vector>{}; + all_columns.reserve(filter_columns.size() + payload_columns.size()); + std::move(filter_columns.begin(), filter_columns.end(), std::back_inserter(all_columns)); + std::move(payload_columns.begin(), payload_columns.end(), std::back_inserter(all_columns)); + auto table = std::make_unique(std::move(all_columns)); + + return table; +} + +} // namespace + +/** + * @brief Read parquet file with the next-gen parquet reader + * + * 
@param io_source io source to read + * @param filter_expression Filter expression + * @param filters Set of parquet filters to apply + * @param stream CUDA stream for hybrid scan reader + * @param mr Device memory resource + * + * @return Tuple of filter table, payload table, filter metadata, payload metadata, and the final + * row validity column + */ +template +std::unique_ptr hybrid_scan(io_source const& io_source, + cudf::ast::expression const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + auto options = cudf::io::parquet_reader_options::builder().filter(filter_expression).build(); + + // Input file buffer span + auto const file_buffer_span = io_source.get_host_buffer_span(); + + if constexpr (print_progress) { std::cout << "\nREADER: Setup, metadata and page index...\n"; } + timer timer; + + // Fetch footer bytes and setup reader + auto const footer_buffer = fetch_footer_bytes(file_buffer_span); + auto const reader = + std::make_unique(footer_buffer, options); + + // Get page index byte range from the reader + auto const page_index_byte_range = reader->page_index_byte_range(); + auto const page_index_buffer = fetch_page_index_bytes(file_buffer_span, page_index_byte_range); + reader->setup_page_index(page_index_buffer); + + // Get all row groups from the reader + auto input_row_group_indices = reader->all_row_groups(options); + auto current_row_group_indices = cudf::host_span(input_row_group_indices); + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + + // Filter row groups with stats + auto stats_filtered_row_group_indices = std::vector{}; + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_STATS)) { + if constexpr (print_progress) { + std::cout << "READER: Filter row groups with stats...\n"; + timer.reset(); + } + 
stats_filtered_row_group_indices = + reader->filter_row_groups_with_stats(current_row_group_indices, options, stream); + + // Update current row group indices + current_row_group_indices = stats_filtered_row_group_indices; + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + } + + std::vector bloom_filter_byte_ranges; + std::vector dict_page_byte_ranges; + + // Get bloom filter and dictionary page byte ranges from the reader + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) or + filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS)) { + if constexpr (print_progress) { + std::cout << "READER: Get bloom filter and dictionary page byte ranges...\n"; + timer.reset(); + } + std::tie(bloom_filter_byte_ranges, dict_page_byte_ranges) = + reader->secondary_filters_byte_ranges(current_row_group_indices, options); + if constexpr (print_progress) { timer.print_elapsed_millis(); } + } + + // Filter row groups with dictionary pages + std::vector dictionary_page_filtered_row_group_indices; + dictionary_page_filtered_row_group_indices.reserve(current_row_group_indices.size()); + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) and + dict_page_byte_ranges.size()) { + if constexpr (print_progress) { + std::cout << "READER: Filter row groups with dictionary pages...\n"; + timer.reset(); + } + // Fetch dictionary page buffers from the input file buffer + std::vector dictionary_page_buffers = + fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr); + dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages( + dictionary_page_buffers, current_row_group_indices, options, stream); + + // Update current row group indices + current_row_group_indices = dictionary_page_filtered_row_group_indices; + if constexpr (print_progress) { + std::cout << "Current row group indices 
size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + } else { + if constexpr (print_progress) { + std::cout << "SKIP: Row group filtering with dictionary pages...\n\n"; + } + } + + // Filter row groups with bloom filters + std::vector bloom_filtered_row_group_indices; + bloom_filtered_row_group_indices.reserve(current_row_group_indices.size()); + if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS) and + bloom_filter_byte_ranges.size()) { + // Fetch 32 byte aligned bloom filter data buffers from the input file buffer + auto constexpr bloom_filter_alignment = rmm::CUDA_ALLOCATION_ALIGNMENT; + auto aligned_mr = rmm::mr::aligned_resource_adaptor( + mr, bloom_filter_alignment); + if constexpr (print_progress) { + std::cout << "READER: Filter row groups with bloom filters...\n"; + timer.reset(); + } + std::vector bloom_filter_data = + fetch_byte_ranges(file_buffer_span, bloom_filter_byte_ranges, stream, aligned_mr); + // Filter row groups with bloom filters + bloom_filtered_row_group_indices = reader->filter_row_groups_with_bloom_filters( + bloom_filter_data, current_row_group_indices, options, stream); + + // Update current row group indices + current_row_group_indices = bloom_filtered_row_group_indices; + if constexpr (print_progress) { + std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; + timer.print_elapsed_millis(); + } + } else { + if constexpr (print_progress) { + std::cout << "SKIP: Row group filtering with bloom filters...\n\n"; + } + } + + // Check whether to prune filter column data pages + using cudf::io::parquet::experimental::use_data_page_mask; + auto const prune_filter_data_pages = + filters.contains(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX); + + auto row_mask = std::unique_ptr{}; + if (prune_filter_data_pages) { + if constexpr (print_progress) { + std::cout << "READER: Filter data pages of filter columns with page index stats...\n"; + 
timer.reset(); + } + // Filter data pages with page index stats + row_mask = + reader->build_row_mask_with_page_index_stats(current_row_group_indices, options, stream, mr); + if constexpr (print_progress) { timer.print_elapsed_millis(); } + } else { + if constexpr (print_progress) { + std::cout << "SKIP: Filter column data page filtering with page index stats...\n\n"; + } + auto num_rows = reader->total_rows_in_row_groups(current_row_group_indices); + row_mask = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::BOOL8}, num_rows, rmm::device_buffer{}, 0, stream, mr); + } + + if constexpr (print_progress) { + std::cout << "READER: Materialize filter columns...\n"; + timer.reset(); + } + // Get column chunk byte ranges from the reader + auto const filter_column_chunk_byte_ranges = + reader->filter_column_chunks_byte_ranges(current_row_group_indices, options); + auto filter_column_chunk_buffers = + fetch_byte_ranges(file_buffer_span, filter_column_chunk_byte_ranges, stream, mr); + + // Materialize the table with only the filter columns + auto row_mask_mutable_view = row_mask->mutable_view(); + auto filter_table = + reader + ->materialize_filter_columns( + current_row_group_indices, + std::move(filter_column_chunk_buffers), + row_mask_mutable_view, + prune_filter_data_pages ? 
use_data_page_mask::YES : use_data_page_mask::NO, + options, + stream) + .tbl; + if constexpr (print_progress) { timer.print_elapsed_millis(); } + + // Check whether to prune payload column data pages + auto const prune_payload_data_pages = + filters.contains(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK); + + if constexpr (print_progress) { + if (prune_payload_data_pages) { + std::cout << "READER: Filter data pages of payload columns with row mask...\n"; + } else { + std::cout << "SKIP: Payload column data page filtering with row mask...\n\n"; + } + + std::cout << "READER: Materialize payload columns...\n"; + timer.reset(); + } + // Get column chunk byte ranges from the reader + auto const payload_column_chunk_byte_ranges = + reader->payload_column_chunks_byte_ranges(current_row_group_indices, options); + auto payload_column_chunk_buffers = + fetch_byte_ranges(file_buffer_span, payload_column_chunk_byte_ranges, stream, mr); + + // Materialize the table with only the payload columns + auto payload_table = + reader + ->materialize_payload_columns( + current_row_group_indices, + std::move(payload_column_chunk_buffers), + row_mask->view(), + prune_payload_data_pages ? 
use_data_page_mask::YES : use_data_page_mask::NO, + options, + stream) + .tbl; + if constexpr (print_progress) { timer.print_elapsed_millis(); } + + return combine_tables(std::move(filter_table), std::move(payload_table)); +} + +// Explicit template instantiations +template std::unique_ptr hybrid_scan( + io_source const& io_source, + cudf::ast::expression const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +template std::unique_ptr hybrid_scan( + io_source const& io_source, + cudf::ast::expression const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); diff --git a/cpp/examples/hybrid_scan_io/common_utils.hpp b/cpp/examples/hybrid_scan_io/common_utils.hpp index 2be0f80fdac..5f29c1aa4ae 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.hpp +++ b/cpp/examples/hybrid_scan_io/common_utils.hpp @@ -5,6 +5,8 @@ #pragma once +#include "io_source.hpp" + #include #include #include @@ -13,12 +15,24 @@ #include #include +#include /** * @file common_utils.hpp * @brief Utilities for `hybrid_scan_io` example */ +/** + * @brief Enum to represent the available parquet filters + */ +enum class parquet_filter_type : uint8_t { + ROW_GROUPS_WITH_STATS = 0, + ROW_GROUPS_WITH_DICT_PAGES = 1, + ROW_GROUPS_WITH_BLOOM_FILTERS = 2, + FILTER_COLUMN_PAGES_WITH_PAGE_INDEX = 3, + PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK = 4, +}; + /** * @brief Create memory resource for libcudf functions * @@ -27,6 +41,29 @@ */ std::shared_ptr create_memory_resource(bool is_pool_used); +/** + * @brief Function to process comma delimited input paths string to parquet files and/or dirs + * and convert them to specified io sources. 
+ * + * Process the input path string containing directories (of parquet files) and/or individual + * parquet files into a list of input parquet files, multiple the list by `input_multiplier`, + * make sure to have at least `thread_count` files to satisfy at least file per parallel thread, + * and convert the final list of files to a list of `io_source` and return. + * + * @param paths Comma delimited input paths string + * @param input_multiplier Multiplier for the input files list + * @param thread_count Number of threads being used in the example + * @param io_source_type Specified IO source type to convert input files to + * @param stream CUDA stream to use + * + * @return Vector of input sources for the given paths + */ +[[nodiscard]] std::vector extract_input_sources(std::string const& paths, + int32_t input_multiplier, + int32_t thread_count, + io_source_type io_source_type, + rmm::cuda_stream_view stream); + /** * @brief Create a filter expression of the form `column_name == literal` for string type point * lookups @@ -38,16 +75,6 @@ std::shared_ptr create_memory_resource(bool is_ cudf::ast::operation create_filter_expression(std::string const& column_name, std::string const& literal_value); -/** - * @brief Combine columns from filter and payload tables into a single table - * - * @param filter_table Filter table - * @param payload_table Payload table - * @return Combined table - */ -std::unique_ptr combine_tables(std::unique_ptr filter_table, - std::unique_ptr payload_table); - /** * @brief Check if two tables are identical, throw an error otherwise * @@ -60,34 +87,22 @@ void check_tables_equal(cudf::table_view const& lhs_table, rmm::cuda_stream_view stream = cudf::get_default_stream()); /** - * @brief Fetches a host span of Parquet footer bytes from the input buffer span - * - * @param buffer Input buffer span - * @return A host span of the footer bytes - */ -cudf::host_span fetch_footer_bytes(cudf::host_span buffer); -/** - * @brief Fetches a host span 
of Parquet PageIndexbytes from the input buffer span + * @brief Read parquet file with the next-gen parquet reader * - * @param buffer Input buffer span - * @param page_index_bytes Byte range of `PageIndex` to fetch - * @return A host span of the PageIndex bytes - */ -cudf::host_span fetch_page_index_bytes( - cudf::host_span buffer, cudf::io::text::byte_range_info const page_index_bytes); - -/** - * @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers + * @tparam print_progress Boolean indicating whether to print progress * - * @param host_buffer Host buffer span - * @param byte_ranges Byte ranges to fetch - * @param stream CUDA stream - * @param mr Device memory resource to create device buffers with + * @param io_source io source to read + * @param filter_expression Filter expression + * @param filters Set of parquet filters to apply + * @param stream CUDA stream for hybrid scan reader + * @param mr Device memory resource * - * @return Vector of device buffers + * @return Tuple of filter table, payload table, filter metadata, payload metadata, and the final + * row validity column */ -std::vector fetch_byte_ranges( - cudf::host_span host_buffer, - cudf::host_span byte_ranges, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +template +std::unique_ptr hybrid_scan(io_source const& io_source, + cudf::ast::expression const& filter_expression, + std::unordered_set const& filters, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp index ce8b545a082..b18fe47d790 100644 --- a/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp +++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp @@ -7,14 +7,11 @@ #include "io_source.hpp" #include "timer.hpp" -#include #include #include #include #include -#include -#include #include #include @@ -51,212 +48,6 @@ cudf::io::table_with_metadata read_parquet(io_source 
const& io_source, return cudf::io::read_parquet(options); } -/** - * @brief Enum to represent the available parquet filters - */ -enum class parquet_filter_type : uint8_t { - ROW_GROUPS_WITH_STATS = 0, - ROW_GROUPS_WITH_DICT_PAGES = 1, - ROW_GROUPS_WITH_BLOOM_FILTERS = 2, - FILTER_COLUMN_PAGES_WITH_PAGE_INDEX = 3, - PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK = 4, -}; - -/** - * @brief Read parquet file with the next-gen parquet reader - * - * @param io_source io source to read - * @param filter_expression Filter expression - * @param filters Set of parquet filters to apply - * @param stream CUDA stream for hybrid scan reader - * @param mr Device memory resource - * - * @return Tuple of filter table, payload table, filter metadata, payload metadata, and the final - * row validity column - */ -auto hybrid_scan(io_source const& io_source, - cudf::ast::operation const& filter_expression, - std::unordered_set const& filters, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - auto options = cudf::io::parquet_reader_options::builder().filter(filter_expression).build(); - - // Input file buffer span - auto const file_buffer_span = io_source.get_host_buffer_span(); - - std::cout << "\nREADER: Setup, metadata and page index...\n"; - timer timer; - - // Fetch footer bytes and setup reader - auto const footer_buffer = fetch_footer_bytes(file_buffer_span); - auto const reader = - std::make_unique(footer_buffer, options); - - // Get page index byte range from the reader - auto const page_index_byte_range = reader->page_index_byte_range(); - auto const page_index_buffer = fetch_page_index_bytes(file_buffer_span, page_index_byte_range); - reader->setup_page_index(page_index_buffer); - - // Get all row groups from the reader - auto input_row_group_indices = reader->all_row_groups(options); - auto current_row_group_indices = cudf::host_span(input_row_group_indices); - std::cout << "Current row group indices size: " << 
current_row_group_indices.size() << "\n"; - - timer.print_elapsed_millis(); - - // Filter row groups with stats - auto stats_filtered_row_group_indices = std::vector{}; - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_STATS)) { - std::cout << "READER: Filter row groups with stats...\n"; - timer.reset(); - stats_filtered_row_group_indices = - reader->filter_row_groups_with_stats(current_row_group_indices, options, stream); - - // Update current row group indices - current_row_group_indices = stats_filtered_row_group_indices; - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - timer.print_elapsed_millis(); - } - - std::vector bloom_filter_byte_ranges; - std::vector dict_page_byte_ranges; - - // Get bloom filter and dictionary page byte ranges from the reader - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) or - filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS)) { - std::cout << "READER: Get bloom filter and dictionary page byte ranges...\n"; - timer.reset(); - std::tie(bloom_filter_byte_ranges, dict_page_byte_ranges) = - reader->secondary_filters_byte_ranges(current_row_group_indices, options); - timer.print_elapsed_millis(); - } - - // Filter row groups with dictionary pages - std::vector dictionary_page_filtered_row_group_indices; - dictionary_page_filtered_row_group_indices.reserve(current_row_group_indices.size()); - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) and - dict_page_byte_ranges.size()) { - std::cout << "READER: Filter row groups with dictionary pages...\n"; - timer.reset(); - // Fetch dictionary page buffers from the input file buffer - std::vector dictionary_page_buffers = - fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr); - dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages( - dictionary_page_buffers, current_row_group_indices, options, stream); - - // Update 
current row group indices - current_row_group_indices = dictionary_page_filtered_row_group_indices; - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - timer.print_elapsed_millis(); - } else { - std::cout << "SKIP: Row group filtering with dictionary pages...\n\n"; - } - - // Filter row groups with bloom filters - std::vector bloom_filtered_row_group_indices; - bloom_filtered_row_group_indices.reserve(current_row_group_indices.size()); - if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS) and - bloom_filter_byte_ranges.size()) { - // Fetch 32 byte aligned bloom filter data buffers from the input file buffer - auto constexpr bloom_filter_alignment = rmm::CUDA_ALLOCATION_ALIGNMENT; - auto aligned_mr = rmm::mr::aligned_resource_adaptor( - mr, bloom_filter_alignment); - std::cout << "READER: Filter row groups with bloom filters...\n"; - timer.reset(); - std::vector bloom_filter_data = - fetch_byte_ranges(file_buffer_span, bloom_filter_byte_ranges, stream, aligned_mr); - // Filter row groups with bloom filters - bloom_filtered_row_group_indices = reader->filter_row_groups_with_bloom_filters( - bloom_filter_data, current_row_group_indices, options, stream); - - // Update current row group indices - current_row_group_indices = bloom_filtered_row_group_indices; - std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n"; - timer.print_elapsed_millis(); - } else { - std::cout << "SKIP: Row group filtering with bloom filters...\n\n"; - } - - // Check whether to prune filter column data pages - using cudf::io::parquet::experimental::use_data_page_mask; - auto const prune_filter_data_pages = - filters.contains(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX); - - auto row_mask = std::unique_ptr{}; - if (prune_filter_data_pages) { - std::cout << "READER: Filter data pages of filter columns with page index stats...\n"; - timer.reset(); - // Filter data pages with page 
index stats - row_mask = - reader->build_row_mask_with_page_index_stats(current_row_group_indices, options, stream, mr); - timer.print_elapsed_millis(); - } else { - std::cout << "SKIP: Filter column data page filtering with page index stats...\n\n"; - auto num_rows = reader->total_rows_in_row_groups(current_row_group_indices); - row_mask = cudf::make_numeric_column( - cudf::data_type{cudf::type_id::BOOL8}, num_rows, rmm::device_buffer{}, 0, stream, mr); - } - - std::cout << "READER: Materialize filter columns...\n"; - timer.reset(); - // Get column chunk byte ranges from the reader - auto const filter_column_chunk_byte_ranges = - reader->filter_column_chunks_byte_ranges(current_row_group_indices, options); - auto filter_column_chunk_buffers = - fetch_byte_ranges(file_buffer_span, filter_column_chunk_byte_ranges, stream, mr); - - // Materialize the table with only the filter columns - auto row_mask_mutable_view = row_mask->mutable_view(); - auto filter_table = - reader - ->materialize_filter_columns( - current_row_group_indices, - std::move(filter_column_chunk_buffers), - row_mask_mutable_view, - prune_filter_data_pages ? 
use_data_page_mask::YES : use_data_page_mask::NO, - options, - stream) - .tbl; - timer.print_elapsed_millis(); - - // Check whether to prune payload column data pages - auto const prune_payload_data_pages = - filters.contains(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK); - - if (prune_payload_data_pages) { - std::cout << "READER: Filter data pages of payload columns with row mask...\n"; - } else { - std::cout << "SKIP: Payload column data page filtering with row mask...\n\n"; - } - - std::cout << "READER: Materialize payload columns...\n"; - timer.reset(); - // Get column chunk byte ranges from the reader - auto const payload_column_chunk_byte_ranges = - reader->payload_column_chunks_byte_ranges(current_row_group_indices, options); - auto payload_column_chunk_buffers = - fetch_byte_ranges(file_buffer_span, payload_column_chunk_byte_ranges, stream, mr); - - // Materialize the table with only the payload columns - auto payload_table = - reader - ->materialize_payload_columns( - current_row_group_indices, - std::move(payload_column_chunk_buffers), - row_mask->view(), - prune_payload_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO, - options, - stream) - .tbl; - timer.print_elapsed_millis(); - - return std::make_tuple(combine_tables(std::move(filter_table), std::move(payload_table)), - std::move(row_mask)); -} - /** * @brief Function to print example usage and argument information. 
*/ @@ -353,13 +144,13 @@ int main(int argc, char const** argv) timer timer; std::cout << "Reading " << input_filepath << " with next-gen parquet reader...\n"; timer.reset(); - auto [table_next_gen_reader, row_mask] = - hybrid_scan(data_source, filter_expression, filters, stream, stats_mr); + auto const table_next_gen_reader = + hybrid_scan(data_source, filter_expression, filters, stream, stats_mr); timer.print_elapsed_millis(); std::cout << "Reading " << input_filepath << " with main parquet reader...\n"; timer.reset(); - auto [table_main_reader, metadata] = read_parquet(data_source, filter_expression, stream); + auto const [table_main_reader, metadata] = read_parquet(data_source, filter_expression, stream); timer.print_elapsed_millis(); // Check for validity diff --git a/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp new file mode 100644 index 00000000000..4d7a10358b1 --- /dev/null +++ b/cpp/examples/hybrid_scan_io/hybrid_scan_io_multithreaded.cpp @@ -0,0 +1,247 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "common_utils.hpp" +#include "io_source.hpp" +#include "timer.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +/** + * @file hybrid_scan_io_multithreaded.cpp + * + * @brief Demonstrates reading parquet data from the specified io source with libcudf's next-gen + * parquet reader (hybrid scan reader) subject to a highly selective point lookup (col_name == + * literal) filter using multiple threads. + * + * The input parquet data is provided via files which are converted to the specified io source type + * to be read using multiple threads. Optionally, the parquet data read by each thread can be + * written to corresponding files and checked for validatity of the output files against the input + * data. 
+ * + * Run: ``hybrid_scan_io_multithreaded -h`` to see help with input args and more information. + * + * The following io source types are supported: + * IO source types: HOST_BUFFER, PINNED_BUFFER + */ + +/** + * @brief Functor for multithreaded parquet reading based on the provided read_mode + */ +struct hybrid_scan_fn { + std::vector const& input_sources; + cudf::ast::expression const& filter_expression; + std::unordered_set const& filters; + int const thread_id; + int const thread_count; + rmm::cuda_stream_view stream; + rmm::device_async_resource_ref mr; + void operator()() + { + // Sweep the available input files + for (auto curr_file_idx = thread_id; curr_file_idx < input_sources.size(); + curr_file_idx += thread_count) { + timer timer; + std::ignore = + hybrid_scan(input_sources[curr_file_idx], filter_expression, filters, stream, mr); + std::cout << "Thread " << thread_id << " "; + timer.print_elapsed_millis(); + } + + // Just synchronize this stream and exit + stream.synchronize_no_throw(); + } +}; + +/** + * @brief Function to setup and launch multithreaded hybrid scan reading. 
+ * + * @param input_sources List of input sources to read + * @param filter_expression Filter expression + * @param filters Filters to apply + * @param thread_count Number of threads + * @param stream_pool CUDA stream pool to use for threads + */ +void hybrid_scan_multithreaded( + std::vector const& input_sources, + std::vector const& filter_expressions, + std::unordered_set const& filters, + int32_t thread_count, + rmm::cuda_stream_pool& stream_pool, + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) +{ + // Table reading tasks + std::vector read_tasks; + read_tasks.reserve(thread_count); + auto const num_filter_expressions = filter_expressions.size(); + // Create the read tasks + std::for_each( + thrust::make_counting_iterator(0), thrust::make_counting_iterator(thread_count), [&](auto tid) { + read_tasks.emplace_back( + hybrid_scan_fn{.input_sources = input_sources, + .filter_expression = filter_expressions[tid % num_filter_expressions], + .filters = filters, + .thread_id = tid, + .thread_count = thread_count, + .stream = stream_pool.get_stream(), + .mr = mr}); + }); + + // Create threads with tasks + std::vector threads; + threads.reserve(thread_count); + for (auto& c : read_tasks) { + threads.emplace_back(c); + } + for (auto& t : threads) { + t.join(); + } +} + +/** + * @brief Function to print example usage and argument information + */ +void inline print_usage() +{ + std::cout << "\nUsage: hybrid_scan_io_multithreaded " + "\n" + " " + "\n" + " \n\n" + + "Available IO source types: HOST_BUFFER, PINNED_BUFFER (Default)\n\n" + "Note: Provide as many arguments as you like in the above order. Default values\n" + " for the unprovided arguments will be used. 
All input parquet files will\n" + " be converted to the specified IO source type before reading\n\n"; +} + +/** + * @brief The main function + */ +int32_t main(int argc, char const** argv) +{ + constexpr int32_t max_thread_count = 64; + + // Set arguments to defaults + std::string input_paths = "example.parquet"; + int32_t input_multiplier = 1; + int32_t thread_count = 1; + int32_t num_reads = 1; + auto column_name = std::string{"string_col"}; + auto literal_value = std::string{"0000001"}; + io_source_type io_source_type = io_source_type::PINNED_BUFFER; + + // Set to the provided args + switch (argc) { + case 8: io_source_type = get_io_source_type(argv[7]); [[fallthrough]]; + case 7: literal_value = argv[6]; [[fallthrough]]; + case 6: column_name = argv[5]; [[fallthrough]]; + case 5: num_reads = std::max(1, std::stoi(argv[4])); [[fallthrough]]; + case 4: + thread_count = + std::min(max_thread_count, std::max(thread_count, std::stoi(std::string{argv[3]}))); + [[fallthrough]]; + case 3: + input_multiplier = + std::min(max_thread_count, std::max(input_multiplier, std::stoi(std::string{argv[2]}))); + [[fallthrough]]; + case 2: + // Check if instead of input_paths, the first argument is `-h` or `--help` + if (auto arg = std::string{argv[1]}; + arg != "-h" and arg != "--help" and not arg.starts_with("-")) { + input_paths = std::move(arg); + break; + } + [[fallthrough]]; + default: print_usage(); throw std::runtime_error("Exiting..."); + } + + // Initialize mr, default stream and stream pool + bool constexpr is_pool_used = false; + auto resource = create_memory_resource(is_pool_used); + auto default_stream = cudf::get_default_stream(); + auto stats_mr = + rmm::mr::statistics_resource_adaptor(resource.get()); + rmm::mr::set_current_device_resource(&stats_mr); + + // List of input sources from the input_paths string. 
+  auto const input_sources = [&]() {
+    try {
+      auto sources = extract_input_sources(
+        input_paths, input_multiplier, thread_count, io_source_type, default_stream);
+      // Synchronize before returning: the previously-placed synchronize() sat after the
+      // return statement and was unreachable
+      default_stream.synchronize();
+      return sources;
+    } catch (const std::exception& e) {
+      print_usage();
+      throw std::runtime_error(e.what());
+    }
+  }();
+
+  // Check if there is nothing to do
+  if (input_sources.empty()) {
+    print_usage();
+    throw std::runtime_error("No input files to read. Exiting early.\n");
+  }
+
+  // Create filter expression: one string-typed and one integer-typed point-lookup predicate
+  auto const column_reference = cudf::ast::column_name_reference(column_name);
+  auto scalar1                = cudf::string_scalar(literal_value);
+  auto literal1               = cudf::ast::literal(scalar1);
+  auto scalar2                = cudf::numeric_scalar(std::stoll(literal_value));
+  auto literal2               = cudf::ast::literal(scalar2);
+
+  std::vector<cudf::ast::operation> filter_expressions;
+  filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal2);
+  filter_expressions.emplace_back(cudf::ast::ast_operator::EQUAL, column_reference, literal1);
+
+  // Insert which filters to apply
+  std::unordered_set<parquet_filter_type> filters;
+  {
+    filters.insert(parquet_filter_type::ROW_GROUPS_WITH_STATS);
+    filters.insert(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES);
+    filters.insert(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS);
+    // Deliberately disabled as it has a high cost to benefit ratio
+    // filters.insert(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX);
+    filters.insert(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK);
+  }
+
+  // Read the same parquet files specified times with multiple threads and discard the read tables
+  {
+    std::cout << "\nReading " << input_sources.size() << " input sources " << num_reads
+              << " time(s) using " << thread_count
+              << " threads and discarding output "
+                 "tables..\n";
+
+    std::cout << "Note that the first read may include times for nvcomp, cufile loading and RMM "
+                 "growth.\n\n";
+
+    auto stream_pool = rmm::cuda_stream_pool(thread_count);
+
+    timer timer;
+
std::for_each(thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_reads), + [&](auto i) { // Read parquet files and discard the tables + hybrid_scan_multithreaded( + input_sources, filter_expressions, filters, thread_count, stream_pool); + }); + std::cout << "Total "; + timer.print_elapsed_millis(); + } + + // Print peak memory + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1'048'576.0) << " MB\n\n"; + + return 0; +} diff --git a/cpp/examples/hybrid_scan_io/io_source.hpp b/cpp/examples/hybrid_scan_io/io_source.hpp index 76d4262ce7f..6fbf57340f4 100644 --- a/cpp/examples/hybrid_scan_io/io_source.hpp +++ b/cpp/examples/hybrid_scan_io/io_source.hpp @@ -24,7 +24,7 @@ /** * @brief Available IO source types */ -enum class io_source_type { HOST_BUFFER, PINNED_BUFFER }; +enum class io_source_type : uint8_t { HOST_BUFFER, PINNED_BUFFER }; /** * @brief Get io source type from the string keyword argument diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp index 3818e5ed36f..f8408ca2fe5 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp @@ -76,7 +76,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(FileMetaData const& parquet : aggregate_reader_metadata_base({}, false, false) { // Just copy over the FileMetaData struct to the internal metadata struct - per_file_metadata.emplace_back(metadata{parquet_metadata}.get_file_metadata()); + per_file_metadata.emplace_back(metadata{parquet_metadata}); initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs); } @@ -86,7 +86,7 @@ aggregate_reader_metadata::aggregate_reader_metadata(cudf::host_span return; } - auto& row_groups = per_file_metadata.front().row_groups; - - CUDF_EXPECTS(not row_groups.empty() and not row_groups.front().columns.empty(), - "No column chunks in Parquet schema to read page index for"); - - 
CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size()); - + // Get the file metadata and setup the page index + auto& file_metadata = per_file_metadata.front(); // Set the first ColumnChunk's offset of ColumnIndex as the adjusted zero offset - int64_t const min_offset = row_groups.front().columns.front().column_index_offset; - // now loop over row groups - for (auto& rg : row_groups) { - for (auto& col : rg.columns) { - // Read the ColumnIndex for this ColumnChunk - if (col.column_index_length > 0 && col.column_index_offset > 0) { - int64_t const offset = col.column_index_offset - min_offset; - cp.init(page_index_bytes.data() + offset, col.column_index_length); - ColumnIndex ci; - cp.read(&ci); - col.column_index = std::move(ci); - } - // Read the OffsetIndex for this ColumnChunk - if (col.offset_index_length > 0 && col.offset_index_offset > 0) { - int64_t const offset = col.offset_index_offset - min_offset; - cp.init(page_index_bytes.data() + offset, col.offset_index_length); - OffsetIndex oi; - cp.read(&oi); - col.offset_index = std::move(oi); - } - } - } + int64_t const min_offset = file_metadata.row_groups.front().columns.front().column_index_offset; + + file_metadata.setup_page_index(page_index_bytes, min_offset); } size_type aggregate_reader_metadata::total_rows_in_row_groups( diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp index bce18919003..3adfd93c1af 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp @@ -33,10 +33,15 @@ using parquet::detail::row_group_info; /** * @brief Class for parsing dataset metadata */ -struct metadata : private metadata_base { +struct metadata : public metadata_base { explicit metadata(cudf::host_span footer_bytes); explicit metadata(FileMetaData const& other) { static_cast(*this) = other; } - metadata_base get_file_metadata() && { return std::move(*this); 
} + metadata(metadata const& other) = delete; + metadata(metadata&& other) = default; + metadata& operator=(metadata const& other) = delete; + metadata& operator=(metadata&& other) = default; + + ~metadata() = default; }; class aggregate_reader_metadata : public aggregate_reader_metadata_base { diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 9c73f5c22d8..e3dfacd2156 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -340,85 +340,93 @@ metadata::metadata(datasource* source, bool read_page_indexes) auto const& last_col = row_groups.back().columns.back(); int64_t const max_offset = last_col.offset_index_offset + last_col.offset_index_length; - if (max_offset > 0) { - int64_t const length = max_offset - min_offset; - auto const idx_buf = source->host_read(min_offset, length); - // Flatten all columns into a single vector for easier task distribution - std::vector> all_column_chunks; - all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size()); - for (auto& rg : row_groups) { - for (auto& col : rg.columns) { - all_column_chunks.emplace_back(std::ref(col)); - } - } + if (max_offset > min_offset) { + size_t const length = max_offset - min_offset; + auto const page_idx_buf = source->host_read(min_offset, length); + setup_page_index({page_idx_buf->data(), length}, min_offset); + } + } - auto read_column_indexes = [&idx_buf, min_offset](CompactProtocolReader& reader, - ColumnChunk& col) { - if (col.column_index_length > 0 && col.column_index_offset > 0) { - int64_t const offset = col.column_index_offset - min_offset; - reader.init(idx_buf->data() + offset, col.column_index_length); - col.column_index = ColumnIndex(); - reader.read(&col.column_index.value()); - } - if (col.offset_index_length > 0 && col.offset_index_offset > 0) { - int64_t const offset = col.offset_index_offset - min_offset; - reader.init(idx_buf->data() + offset, 
col.offset_index_length); - col.offset_index = OffsetIndex(); - reader.read(&col.offset_index.value()); - } - }; - - // Use parallel processing only if we have enough columns to justify the overhead - constexpr std::size_t parallel_threshold = 512; - auto const total_column_chunks = all_column_chunks.size(); - if (total_column_chunks >= parallel_threshold) { - // Dynamically calculate number of tasks based the number or row groups and columns - constexpr std::size_t min_tasks = 4; - constexpr std::size_t max_tasks = 32; - auto const ratio = static_cast(total_column_chunks) / parallel_threshold; - // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements - // doubles both the number of tasks and the task size) - auto const multiplier = std::size_t(1) << (static_cast(std::log2(ratio)) / 2); - - auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks); - auto const column_chunks_per_task = total_column_chunks / num_tasks; - auto const remainder = total_column_chunks % num_tasks; - - std::vector> tasks; - tasks.reserve(num_tasks); - - std::size_t start_idx = 0; - for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) { - auto const task_size = column_chunks_per_task + (task_id < remainder ? 
1 : 0); - auto const end_idx = start_idx + task_size; - - if (start_idx >= total_column_chunks) break; - - tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( - [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() { - CompactProtocolReader local_cp; - - for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) { - read_column_indexes(local_cp, all_column_chunks[i].get()); - } - })); - - start_idx = end_idx; - } + sanitize_schema(); +} - for (auto& task : tasks) { - task.get(); - } - } else { - // For small numbers of columns, use sequential processing to avoid overhead - for (auto& col_ref : all_column_chunks) { - read_column_indexes(cp, col_ref.get()); - } - } +void metadata::setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset) +{ + CUDF_FUNC_RANGE(); + + // Flatten all columns into a single vector for easier task distribution + std::vector> all_column_chunks; + all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size()); + for (auto& rg : row_groups) { + for (auto& col : rg.columns) { + all_column_chunks.emplace_back(std::ref(col)); } } - sanitize_schema(); + auto read_column_indexes = [page_index_bytes, min_offset](CompactProtocolReader& reader, + ColumnChunk& col) { + if (col.column_index_length > 0 && col.column_index_offset > 0) { + int64_t const offset = col.column_index_offset - min_offset; + reader.init(page_index_bytes.data() + offset, col.column_index_length); + col.column_index = ColumnIndex(); + reader.read(&col.column_index.value()); + } + if (col.offset_index_length > 0 && col.offset_index_offset > 0) { + int64_t const offset = col.offset_index_offset - min_offset; + reader.init(page_index_bytes.data() + offset, col.offset_index_length); + col.offset_index = OffsetIndex(); + reader.read(&col.offset_index.value()); + } + }; + + // Use parallel processing only if we have enough columns to justify the overhead + constexpr std::size_t parallel_threshold = 256; + auto const 
total_column_chunks = all_column_chunks.size(); + if (total_column_chunks >= parallel_threshold) { + // Dynamically calculate number of tasks based the number or row groups and columns + constexpr std::size_t min_tasks = 4; + constexpr std::size_t max_tasks = 32; + auto const ratio = static_cast(total_column_chunks) / parallel_threshold; + // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements + // doubles both the number of tasks and the task size) + auto const multiplier = std::size_t(1) << (static_cast(std::log2(ratio)) / 2); + + auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks); + auto const column_chunks_per_task = total_column_chunks / num_tasks; + auto const remainder = total_column_chunks % num_tasks; + + std::vector> tasks; + tasks.reserve(num_tasks); + + std::size_t start_idx = 0; + for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) { + auto const task_size = column_chunks_per_task + (task_id < remainder ? 
1 : 0); + auto const end_idx = start_idx + task_size; + + if (start_idx >= total_column_chunks) break; + + tasks.emplace_back(cudf::detail::host_worker_pool().submit_task( + [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() { + CompactProtocolReader local_cp; + + for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) { + read_column_indexes(local_cp, all_column_chunks[i].get()); + } + })); + + start_idx = end_idx; + } + + for (auto& task : tasks) { + task.get(); + } + } else { + CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size()); + // For small numbers of columns, use sequential processing to avoid overhead + for (auto& col_ref : all_column_chunks) { + read_column_indexes(cp, col_ref.get()); + } + } } metadata::~metadata() diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 4b431ce45bd..515688aa215 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -110,6 +110,9 @@ struct metadata : public FileMetaData { metadata& operator=(metadata&& other) = default; ~metadata(); + void setup_page_index(cudf::host_span page_index_bytes, int64_t min_offset); + + protected: void sanitize_schema(); };