Skip to content
9 changes: 9 additions & 0 deletions cpp/examples/hybrid_scan_io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ target_link_libraries(
target_compile_features(hybrid_scan_io PRIVATE cxx_std_20)
install(TARGETS hybrid_scan_io DESTINATION bin/examples/libcudf/hybrid_scan_io)

# Build and install hybrid_scan_io_multithreaded
add_executable(hybrid_scan_io_multithreaded hybrid_scan_io_multithreaded.cpp)
# Link against libcudf; nvtx3 is wrapped in $<BUILD_LOCAL_INTERFACE:...> so it is only a
# link dependency when building inside the cudf build tree. The shared example helpers are
# consumed as an object library ($<TARGET_OBJECTS:...>), so they are compiled once and
# reused by every example executable.
target_link_libraries(
  hybrid_scan_io_multithreaded PRIVATE cudf::cudf $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp>
                                       $<TARGET_OBJECTS:hybrid_scan_io_utils>
)
# Require C++20 for this example (matches the sibling hybrid_scan_io target above)
target_compile_features(hybrid_scan_io_multithreaded PRIVATE cxx_std_20)
install(TARGETS hybrid_scan_io_multithreaded DESTINATION bin/examples/libcudf/hybrid_scan_io)

# Install the example.parquet file
install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet
DESTINATION bin/examples/libcudf/hybrid_scan_io
Expand Down
393 changes: 376 additions & 17 deletions cpp/examples/hybrid_scan_io/common_utils.cpp

Large diffs are not rendered by default.

89 changes: 52 additions & 37 deletions cpp/examples/hybrid_scan_io/common_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#pragma once

#include "io_source.hpp"

#include <cudf/ast/expressions.hpp>
#include <cudf/io/text/byte_range_info.hpp>
#include <cudf/io/types.hpp>
Expand All @@ -13,12 +15,24 @@
#include <rmm/cuda_stream_view.hpp>

#include <string>
#include <unordered_set>

/**
* @file common_utils.hpp
* @brief Utilities for `hybrid_scan_io` example
*/

/**
 * @brief Enum to represent the available parquet filters
 *
 * Each value enables one pruning stage of the hybrid scan pipeline: row-group level
 * pruning (statistics, dictionary pages, bloom filters) or data-page level pruning
 * (page index stats for filter columns, row mask for payload columns).
 */
enum class parquet_filter_type : uint8_t {
  ROW_GROUPS_WITH_STATS = 0,  ///< Prune row groups using column chunk statistics
  ROW_GROUPS_WITH_DICT_PAGES = 1,  ///< Prune row groups using dictionary pages
  ROW_GROUPS_WITH_BLOOM_FILTERS = 2,  ///< Prune row groups using bloom filters
  FILTER_COLUMN_PAGES_WITH_PAGE_INDEX = 3,  ///< Prune filter-column data pages via page index stats
  PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK = 4,  ///< Prune payload-column data pages via the row mask
};

/**
* @brief Create memory resource for libcudf functions
*
Expand All @@ -27,6 +41,29 @@
*/
std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used);

/**
* @brief Function to process comma delimited input paths string to parquet files and/or dirs
* and convert them to specified io sources.
*
 * Process the input path string containing directories (of parquet files) and/or individual
 * parquet files into a list of input parquet files, multiply the list by `input_multiplier`,
 * make sure to have at least `thread_count` files to satisfy at least one file per parallel
 * thread, and convert the final list of files to a list of `io_source` and return.
*
* @param paths Comma delimited input paths string
* @param input_multiplier Multiplier for the input files list
* @param thread_count Number of threads being used in the example
* @param io_source_type Specified IO source type to convert input files to
* @param stream CUDA stream to use
*
* @return Vector of input sources for the given paths
*/
[[nodiscard]] std::vector<io_source> extract_input_sources(std::string const& paths,
int32_t input_multiplier,
int32_t thread_count,
io_source_type io_source_type,
rmm::cuda_stream_view stream);

/**
* @brief Create a filter expression of the form `column_name == literal` for string type point
* lookups
Expand All @@ -38,16 +75,6 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_
cudf::ast::operation create_filter_expression(std::string const& column_name,
std::string const& literal_value);

/**
* @brief Combine columns from filter and payload tables into a single table
*
* @param filter_table Filter table
* @param payload_table Payload table
* @return Combined table
*/
std::unique_ptr<cudf::table> combine_tables(std::unique_ptr<cudf::table> filter_table,
std::unique_ptr<cudf::table> payload_table);

/**
* @brief Check if two tables are identical, throw an error otherwise
*
Expand All @@ -60,34 +87,22 @@ void check_tables_equal(cudf::table_view const& lhs_table,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Fetches a host span of Parquet footer bytes from the input buffer span
*
* @param buffer Input buffer span
* @return A host span of the footer bytes
*/
cudf::host_span<uint8_t const> fetch_footer_bytes(cudf::host_span<uint8_t const> buffer);
/**
 * @brief Fetches a host span of Parquet PageIndex bytes from the input buffer span
* @brief Read parquet file with the next-gen parquet reader
*
* @param buffer Input buffer span
* @param page_index_bytes Byte range of `PageIndex` to fetch
* @return A host span of the PageIndex bytes
*/
cudf::host_span<uint8_t const> fetch_page_index_bytes(
cudf::host_span<uint8_t const> buffer, cudf::io::text::byte_range_info const page_index_bytes);

/**
* @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers
* @tparam print_progress Boolean indicating whether to print progress
*
* @param host_buffer Host buffer span
* @param byte_ranges Byte ranges to fetch
* @param stream CUDA stream
* @param mr Device memory resource to create device buffers with
* @param io_source io source to read
* @param filter_expression Filter expression
* @param filters Set of parquet filters to apply
* @param stream CUDA stream for hybrid scan reader
* @param mr Device memory resource
*
* @return Vector of device buffers
 * @return Unique pointer to the materialized table containing both filter and payload
 * columns
*/
std::vector<rmm::device_buffer> fetch_byte_ranges(
cudf::host_span<uint8_t const> host_buffer,
cudf::host_span<cudf::io::text::byte_range_info const> byte_ranges,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
template <bool print_progress>
std::unique_ptr<cudf::table> hybrid_scan(io_source const& io_source,
cudf::ast::expression const& filter_expression,
std::unordered_set<parquet_filter_type> const& filters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
215 changes: 3 additions & 212 deletions cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@
#include "io_source.hpp"
#include "timer.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/io/experimental/hybrid_scan.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/aligned.hpp>
#include <rmm/mr/aligned_resource_adaptor.hpp>
#include <rmm/mr/device_memory_resource.hpp>
#include <rmm/mr/statistics_resource_adaptor.hpp>

Expand Down Expand Up @@ -51,212 +48,6 @@ cudf::io::table_with_metadata read_parquet(io_source const& io_source,
return cudf::io::read_parquet(options);
}

/**
 * @brief Enum to represent the available parquet filters
 *
 * Each value enables one pruning stage of `hybrid_scan` below: row-group level
 * pruning (statistics, dictionary pages, bloom filters) or data-page level pruning
 * (page index stats for filter columns, row mask for payload columns).
 */
enum class parquet_filter_type : uint8_t {
  ROW_GROUPS_WITH_STATS = 0,  ///< Prune row groups using column chunk statistics
  ROW_GROUPS_WITH_DICT_PAGES = 1,  ///< Prune row groups using dictionary pages
  ROW_GROUPS_WITH_BLOOM_FILTERS = 2,  ///< Prune row groups using bloom filters
  FILTER_COLUMN_PAGES_WITH_PAGE_INDEX = 3,  ///< Prune filter-column data pages via page index stats
  PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK = 4,  ///< Prune payload-column data pages via the row mask
};

/**
 * @brief Read parquet file with the next-gen parquet reader
 *
 * Runs the full hybrid scan pipeline: set up the reader from the file footer and page
 * index, progressively prune row groups (statistics, dictionary pages, bloom filters),
 * optionally build a row mask from page index stats, then materialize the filter
 * columns followed by the payload columns.
 *
 * @param io_source io source to read
 * @param filter_expression Filter expression
 * @param filters Set of parquet filters to apply
 * @param stream CUDA stream for hybrid scan reader
 * @param mr Device memory resource
 *
 * @return Tuple of the combined (filter + payload) table and the final row validity
 * (row mask) column
 */
auto hybrid_scan(io_source const& io_source,
                 cudf::ast::operation const& filter_expression,
                 std::unordered_set<parquet_filter_type> const& filters,
                 rmm::cuda_stream_view stream,
                 rmm::device_async_resource_ref mr)
{
  CUDF_FUNC_RANGE();

  // Reader options carry the filter expression consumed by every pruning stage below
  auto options = cudf::io::parquet_reader_options::builder().filter(filter_expression).build();

  // Input file buffer span (the whole parquet file resident in host memory)
  auto const file_buffer_span = io_source.get_host_buffer_span();

  std::cout << "\nREADER: Setup, metadata and page index...\n";
  timer timer;

  // Fetch footer bytes and setup reader
  auto const footer_buffer = fetch_footer_bytes(file_buffer_span);
  auto const reader =
    std::make_unique<cudf::io::parquet::experimental::hybrid_scan_reader>(footer_buffer, options);

  // Get page index byte range from the reader, then fetch and install the page index
  auto const page_index_byte_range = reader->page_index_byte_range();
  auto const page_index_buffer = fetch_page_index_bytes(file_buffer_span, page_index_byte_range);
  reader->setup_page_index(page_index_buffer);

  // Get all row groups from the reader.
  // NOTE: `current_row_group_indices` is a non-owning span; each filtering stage below
  // re-points it at a stage-local vector, all of which stay alive until this function
  // returns, so the span never dangles.
  auto input_row_group_indices = reader->all_row_groups(options);
  auto current_row_group_indices = cudf::host_span<cudf::size_type>(input_row_group_indices);
  std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n";

  timer.print_elapsed_millis();

  // Stage 1: filter row groups with column chunk statistics
  auto stats_filtered_row_group_indices = std::vector<cudf::size_type>{};
  if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_STATS)) {
    std::cout << "READER: Filter row groups with stats...\n";
    timer.reset();
    stats_filtered_row_group_indices =
      reader->filter_row_groups_with_stats(current_row_group_indices, options, stream);

    // Update current row group indices
    current_row_group_indices = stats_filtered_row_group_indices;
    std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n";
    timer.print_elapsed_millis();
  }

  std::vector<cudf::io::text::byte_range_info> bloom_filter_byte_ranges;
  std::vector<cudf::io::text::byte_range_info> dict_page_byte_ranges;

  // Get bloom filter and dictionary page byte ranges from the reader; only needed when
  // at least one of the secondary (dict page / bloom filter) row group filters is enabled
  if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) or
      filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS)) {
    std::cout << "READER: Get bloom filter and dictionary page byte ranges...\n";
    timer.reset();
    std::tie(bloom_filter_byte_ranges, dict_page_byte_ranges) =
      reader->secondary_filters_byte_ranges(current_row_group_indices, options);
    timer.print_elapsed_millis();
  }

  // Stage 2: filter row groups with dictionary pages
  // NOTE(review): the reserve() is moot — the assignment below replaces the contents
  std::vector<cudf::size_type> dictionary_page_filtered_row_group_indices;
  dictionary_page_filtered_row_group_indices.reserve(current_row_group_indices.size());
  if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_DICT_PAGES) and
      dict_page_byte_ranges.size()) {
    std::cout << "READER: Filter row groups with dictionary pages...\n";
    timer.reset();
    // Fetch dictionary page buffers from the input file buffer
    std::vector<rmm::device_buffer> dictionary_page_buffers =
      fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr);
    dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages(
      dictionary_page_buffers, current_row_group_indices, options, stream);

    // Update current row group indices
    current_row_group_indices = dictionary_page_filtered_row_group_indices;
    std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n";
    timer.print_elapsed_millis();
  } else {
    std::cout << "SKIP: Row group filtering with dictionary pages...\n\n";
  }

  // Stage 3: filter row groups with bloom filters
  std::vector<cudf::size_type> bloom_filtered_row_group_indices;
  bloom_filtered_row_group_indices.reserve(current_row_group_indices.size());
  if (filters.contains(parquet_filter_type::ROW_GROUPS_WITH_BLOOM_FILTERS) and
      bloom_filter_byte_ranges.size()) {
    // Fetch aligned bloom filter data buffers from the input file buffer.
    // NOTE(review): the original comment said "32 byte aligned" but
    // rmm::CUDA_ALLOCATION_ALIGNMENT is used — confirm the alignment the bloom
    // filter kernels actually require.
    auto constexpr bloom_filter_alignment = rmm::CUDA_ALLOCATION_ALIGNMENT;
    auto aligned_mr = rmm::mr::aligned_resource_adaptor<rmm::mr::device_memory_resource>(
      mr, bloom_filter_alignment);
    std::cout << "READER: Filter row groups with bloom filters...\n";
    timer.reset();
    std::vector<rmm::device_buffer> bloom_filter_data =
      fetch_byte_ranges(file_buffer_span, bloom_filter_byte_ranges, stream, aligned_mr);
    // Filter row groups with bloom filters
    bloom_filtered_row_group_indices = reader->filter_row_groups_with_bloom_filters(
      bloom_filter_data, current_row_group_indices, options, stream);

    // Update current row group indices
    current_row_group_indices = bloom_filtered_row_group_indices;
    std::cout << "Current row group indices size: " << current_row_group_indices.size() << "\n";
    timer.print_elapsed_millis();
  } else {
    std::cout << "SKIP: Row group filtering with bloom filters...\n\n";
  }

  // Check whether to prune filter column data pages
  using cudf::io::parquet::experimental::use_data_page_mask;
  auto const prune_filter_data_pages =
    filters.contains(parquet_filter_type::FILTER_COLUMN_PAGES_WITH_PAGE_INDEX);

  auto row_mask = std::unique_ptr<cudf::column>{};
  if (prune_filter_data_pages) {
    std::cout << "READER: Filter data pages of filter columns with page index stats...\n";
    timer.reset();
    // Filter data pages with page index stats
    row_mask =
      reader->build_row_mask_with_page_index_stats(current_row_group_indices, options, stream, mr);
    timer.print_elapsed_millis();
  } else {
    std::cout << "SKIP: Filter column data page filtering with page index stats...\n\n";
    auto num_rows = reader->total_rows_in_row_groups(current_row_group_indices);
    // NOTE(review): this BOOL8 column's data buffer is allocated but uninitialized
    // (empty null mask, zero null count) — presumably materialize_filter_columns
    // below populates it via the mutable view; confirm against the reader API.
    row_mask = cudf::make_numeric_column(
      cudf::data_type{cudf::type_id::BOOL8}, num_rows, rmm::device_buffer{}, 0, stream, mr);
  }

  std::cout << "READER: Materialize filter columns...\n";
  timer.reset();
  // Get column chunk byte ranges from the reader
  auto const filter_column_chunk_byte_ranges =
    reader->filter_column_chunks_byte_ranges(current_row_group_indices, options);
  auto filter_column_chunk_buffers =
    fetch_byte_ranges(file_buffer_span, filter_column_chunk_byte_ranges, stream, mr);

  // Materialize the table with only the filter columns; the row mask is passed as a
  // mutable view so the reader can refine it while evaluating the filter expression
  auto row_mask_mutable_view = row_mask->mutable_view();
  auto filter_table =
    reader
      ->materialize_filter_columns(
        current_row_group_indices,
        std::move(filter_column_chunk_buffers),
        row_mask_mutable_view,
        prune_filter_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO,
        options,
        stream)
      .tbl;
  timer.print_elapsed_millis();

  // Check whether to prune payload column data pages
  auto const prune_payload_data_pages =
    filters.contains(parquet_filter_type::PAYLOAD_COLUMN_PAGES_WITH_ROW_MASK);

  if (prune_payload_data_pages) {
    std::cout << "READER: Filter data pages of payload columns with row mask...\n";
  } else {
    std::cout << "SKIP: Payload column data page filtering with row mask...\n\n";
  }

  std::cout << "READER: Materialize payload columns...\n";
  timer.reset();
  // Get column chunk byte ranges from the reader
  auto const payload_column_chunk_byte_ranges =
    reader->payload_column_chunks_byte_ranges(current_row_group_indices, options);
  auto payload_column_chunk_buffers =
    fetch_byte_ranges(file_buffer_span, payload_column_chunk_byte_ranges, stream, mr);

  // Materialize the table with only the payload columns; the (now final) row mask is
  // passed as an immutable view to select surviving rows
  auto payload_table =
    reader
      ->materialize_payload_columns(
        current_row_group_indices,
        std::move(payload_column_chunk_buffers),
        row_mask->view(),
        prune_payload_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO,
        options,
        stream)
      .tbl;
  timer.print_elapsed_millis();

  // Stitch filter and payload columns back into one table; also hand the row mask
  // back to the caller
  return std::make_tuple(combine_tables(std::move(filter_table), std::move(payload_table)),
                         std::move(row_mask));
}

/**
* @brief Function to print example usage and argument information.
*/
Expand Down Expand Up @@ -353,13 +144,13 @@ int main(int argc, char const** argv)
timer timer;
std::cout << "Reading " << input_filepath << " with next-gen parquet reader...\n";
timer.reset();
auto [table_next_gen_reader, row_mask] =
hybrid_scan(data_source, filter_expression, filters, stream, stats_mr);
auto const table_next_gen_reader =
hybrid_scan<true>(data_source, filter_expression, filters, stream, stats_mr);
timer.print_elapsed_millis();

std::cout << "Reading " << input_filepath << " with main parquet reader...\n";
timer.reset();
auto [table_main_reader, metadata] = read_parquet(data_source, filter_expression, stream);
auto const [table_main_reader, metadata] = read_parquet(data_source, filter_expression, stream);
timer.print_elapsed_millis();

// Check for validity
Expand Down
Loading