diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp
index fc3cd06190d..783fab90cc7 100644
--- a/cpp/include/cudf/io/datasource.hpp
+++ b/cpp/include/cudf/io/datasource.hpp
@@ -402,6 +402,21 @@ class datasource {
   };
 };
 
+/**
+ * @brief Constructs datasources from dataset source information
+ *
+ * @ingroup io_datasources
+ *
+ * @param info Dataset source information
+ * @param offset Starting byte offset from which data will be read (default zero)
+ * @param max_size_estimate Upper estimate of the data range that will be read (default zero,
+ * which means the entire file after `offset`)
+ * @return Constructed vector of datasource objects
+ */
+std::vector<std::unique_ptr<datasource>> make_datasources(source_info const& info,
+                                                          size_t offset            = 0,
+                                                          size_t max_size_estimate = 0);
+
 /** @} */  // end of group
 }  // namespace io
 }  // namespace CUDF_EXPORT cudf
diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp
index 28af934d279..a5f9ab5d044 100644
--- a/cpp/include/cudf/io/detail/parquet.hpp
+++ b/cpp/include/cudf/io/detail/parquet.hpp
@@ -51,11 +51,13 @@ class reader {
    * @brief Constructor from an array of datasources
    *
    * @param sources Input `datasource` objects to read the dataset from
+   * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
    * @param options Settings for controlling reading behavior
    * @param stream CUDA stream used for device memory operations and kernel launches.
    * @param mr Device memory resource to use for device memory allocation
    */
   explicit reader(std::vector<std::unique_ptr<datasource>>&& sources,
+                  std::vector<FileMetaData>&& parquet_metadatas,
                   parquet_reader_options const& options,
                   rmm::cuda_stream_view stream,
                   rmm::device_async_resource_ref mr);
@@ -134,6 +136,7 @@ class chunked_reader : private reader {
    * @param pass_read_limit Limit on total amount of memory used for temporary computations during
    * loading, or `0` if there is no limit
    * @param sources Input `datasource` objects to read the dataset from
+   * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
    * @param options Settings for controlling reading behavior
    * @param stream CUDA stream used for device memory operations and kernel launches
    * @param mr Device memory resource to use for device memory allocation
@@ -141,6 +144,7 @@ class chunked_reader : private reader {
   explicit chunked_reader(std::size_t chunk_read_limit,
                           std::size_t pass_read_limit,
                           std::vector<std::unique_ptr<datasource>>&& sources,
+                          std::vector<FileMetaData>&& parquet_metadatas,
                           parquet_reader_options const& options,
                           rmm::cuda_stream_view stream,
                           rmm::device_async_resource_ref mr);
@@ -248,6 +252,17 @@ class writer {
  * metadata.
  */
 parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> const> sources);
+
+/**
+ * @brief Constructs FileMetaData objects from parquet dataset
+ *
+ * @param sources Input `datasource` objects to read the dataset from
+ *
+ * @return List of FileMetaData objects, one per parquet source
+ */
+std::vector<FileMetaData> read_parquet_footers(
+  host_span<std::unique_ptr<datasource> const> sources);
+
 }  // namespace parquet::detail
 }  // namespace io
 }  // namespace CUDF_EXPORT cudf
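Note: a minimal usage sketch for the new `make_datasources` entry point (the file name and byte range below are illustrative, not part of this change):

    // Build datasources for an entire file...
    auto whole = cudf::io::make_datasources(cudf::io::source_info{"data.parquet"});

    // ...or hint that only a byte range starting at `offset` will be read;
    // a zero `max_size_estimate` means "everything after offset".
    size_t const offset            = 4096;
    size_t const max_size_estimate = 65536;
    auto ranged = cudf::io::make_datasources(
      cudf::io::source_info{"data.parquet"}, offset, max_size_estimate);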
diff --git a/cpp/include/cudf/io/orc_metadata.hpp b/cpp/include/cudf/io/orc_metadata.hpp
index 9124c69c5c6..8697035978c 100644
--- a/cpp/include/cudf/io/orc_metadata.hpp
+++ b/cpp/include/cudf/io/orc_metadata.hpp
@@ -26,6 +26,9 @@ namespace io {
  * @file
  */
 
+//! ORC data type
+using cudf::io::orc::TypeKind;
+
 /**
  * @brief Holds column names and buffers containing raw file-level and stripe-level statistics.
  *
@@ -224,9 +227,7 @@ struct orc_column_schema {
    * @param type ORC type
    * @param children child columns (empty for non-nested types)
    */
-  orc_column_schema(std::string_view name,
-                    orc::TypeKind type,
-                    std::vector<orc_column_schema> children)
+  orc_column_schema(std::string_view name, TypeKind type, std::vector<orc_column_schema> children)
     : _name{name}, _type_kind{type}, _children{std::move(children)}
   {
   }
@@ -282,7 +283,7 @@ struct orc_column_schema {
 
  private:
   std::string _name;
-  orc::TypeKind _type_kind;
+  TypeKind _type_kind;
   std::vector<orc_column_schema> _children;
 };
 
diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 487d51faa81..1cead0f4c27 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -667,6 +667,34 @@ table_with_metadata read_parquet(
   rmm::cuda_stream_view stream      = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
 
+/**
+ * @brief Reads a Parquet dataset into a set of columns using pre-existing Parquet datasources and
+ * file metadatas.
+ *
+ * The following code snippet demonstrates how to read a dataset from a file:
+ * @code
+ *  auto sources   = cudf::io::make_datasources(cudf::io::source_info("dataset.parquet"));
+ *  auto metadatas = cudf::io::read_parquet_footers(sources);
+ *  auto options   = cudf::io::parquet_reader_options::builder();
+ *  auto result    = cudf::io::read_parquet(std::move(sources), std::move(metadatas), options);
+ * @endcode
+ *
+ * @param sources Input `datasource` objects to read the dataset from
+ * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
+ * @param options Settings for controlling reading behavior
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate device memory of the table in the returned
+ * table_with_metadata
+ *
+ * @return The set of columns along with metadata
+ */
+table_with_metadata read_parquet(
+  std::vector<std::unique_ptr<datasource>>&& sources,
+  std::vector<cudf::io::parquet::FileMetaData>&& parquet_metadatas,
+  parquet_reader_options const& options,
+  rmm::cuda_stream_view stream      = cudf::get_default_stream(),
+  rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
+
 /**
  * @brief The chunked parquet reader class to read Parquet file iteratively in to a series of
  * tables, chunk by chunk.
@@ -705,6 +733,30 @@ class chunked_parquet_reader {
     rmm::cuda_stream_view stream      = cudf::get_default_stream(),
     rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
 
+  /**
+   * @brief Constructor for chunked reader using pre-existing Parquet datasources and
+   * file metadatas.
+   *
+   * This constructor requires the same `parquet_reader_option` parameter as in
+   * `cudf::read_parquet()`, and an additional parameter to specify the size byte limit of the
+   * output table for each reading.
+   *
+   * @param chunk_read_limit Limit on total number of bytes to be returned per read,
+   * or `0` if there is no limit
+   * @param sources Input `datasource` objects to read the dataset from
+   * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
+   * @param options The options used to read Parquet file
+   * @param stream CUDA stream used for device memory operations and kernel launches
+   * @param mr Device memory resource to use for device memory allocation
+   */
+  chunked_parquet_reader(
+    std::size_t chunk_read_limit,
+    std::vector<std::unique_ptr<datasource>>&& sources,
+    std::vector<cudf::io::parquet::FileMetaData>&& parquet_metadatas,
+    parquet_reader_options const& options,
+    rmm::cuda_stream_view stream      = cudf::get_default_stream(),
+    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
+
   /**
    * @brief Constructor for chunked reader.
    *
@@ -731,6 +783,37 @@ class chunked_parquet_reader {
     rmm::cuda_stream_view stream      = cudf::get_default_stream(),
     rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
 
+  /**
+   * @brief Constructor for chunked reader using pre-existing Parquet datasources and
+   * file metadatas.
+   *
+   * This constructor requires the same `parquet_reader_option` parameter as in
+   * `cudf::read_parquet()`, with additional parameters to specify the size byte limit of the
+   * output table for each reading, and a byte limit on the amount of temporary memory to use
+   * when reading. pass_read_limit affects how many row groups we can read at a time by limiting
+   * the amount of memory dedicated to decompression space. pass_read_limit is a hint, not an
+   * absolute limit - if a single row group cannot fit within the limit given, it will still be
+   * loaded.
+   *
+   * @param chunk_read_limit Limit on total number of bytes to be returned per read,
+   * or `0` if there is no limit
+   * @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or
+   * `0` if there is no limit
+   * @param sources Input `datasource` objects to read the dataset from
+   * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
+   * @param options The options used to read Parquet file
+   * @param stream CUDA stream used for device memory operations and kernel launches
+   * @param mr Device memory resource to use for device memory allocation
+   */
+  chunked_parquet_reader(
+    std::size_t chunk_read_limit,
+    std::size_t pass_read_limit,
+    std::vector<std::unique_ptr<datasource>>&& sources,
+    std::vector<cudf::io::parquet::FileMetaData>&& parquet_metadatas,
+    parquet_reader_options const& options,
+    rmm::cuda_stream_view stream      = cudf::get_default_stream(),
+    rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
+
   /**
    * @brief Destructor, destroying the internal reader instance.
    *
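Note: a sketch of driving the new chunked-reader constructors end to end (it mirrors the test changes further down; the 1 MiB chunk limit and file name are illustrative):

    auto sources   = cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"});
    auto metadatas = cudf::io::read_parquet_footers(sources);
    auto options   = cudf::io::parquet_reader_options::builder().build();

    // 1 MiB output chunks, no pass (decompression memory) limit
    auto reader = cudf::io::chunked_parquet_reader(
      1024 * 1024, 0, std::move(sources), std::move(metadatas), options);

    while (reader.has_next()) {
      auto chunk = reader.read_chunk();  // table_with_metadata
      // ... consume chunk.tbl
    }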
diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp
index 198d736b14d..f75d68695b6 100644
--- a/cpp/include/cudf/io/parquet_metadata.hpp
+++ b/cpp/include/cudf/io/parquet_metadata.hpp
@@ -10,6 +10,7 @@
 
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -262,13 +263,25 @@ class parquet_metadata {
  *
  * @ingroup io_readers
  *
- * @param src_info Dataset source
+ * @param src_info Dataset source information
  *
 * @return parquet_metadata with parquet schema, number of rows, number of row groups and key-value
  * metadata
  */
 parquet_metadata read_parquet_metadata(source_info const& src_info);
 
+/**
+ * @brief Constructs FileMetaData objects from parquet dataset
+ *
+ * @ingroup io_readers
+ *
+ * @param sources Input `datasource` objects to read the dataset from
+ *
+ * @return List of FileMetaData objects, one per parquet source
+ */
+std::vector<cudf::io::parquet::FileMetaData> read_parquet_footers(
+  cudf::host_span<std::unique_ptr<datasource> const> sources);
+
 /** @} */  // end of group
 }  // namespace io
 }  // namespace CUDF_EXPORT cudf
diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp
index 55aa7a9ba24..69f92d8926b 100644
--- a/cpp/src/io/functions.cpp
+++ b/cpp/src/io/functions.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -150,11 +151,12 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder(
   return chunked_parquet_writer_options_builder{sink};
 }
 
-namespace {
-
+/**
+ * @copydoc cudf::io::make_datasources
+ */
 std::vector<std::unique_ptr<datasource>> make_datasources(source_info const& info,
-                                                          size_t offset            = 0,
-                                                          size_t max_size_estimate = 0)
+                                                          size_t offset,
+                                                          size_t max_size_estimate)
 {
   switch (info.type()) {
     case io_type::FILEPATH: {
@@ -188,6 +190,8 @@ std::vector<std::unique_ptr<datasource>> make_datasources(source_info
   }
 }
 
+namespace {
+
 std::vector<std::unique_ptr<data_sink>> make_datasinks(sink_info const& info)
 {
   switch (info.type()) {
@@ -612,8 +616,22 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
   CUDF_FUNC_RANGE();
 
   auto datasources = make_datasources(options.get_source());
-  auto reader =
-    std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr);
+  auto reader = std::make_unique<detail_parquet::reader>(
+    std::move(datasources), std::vector<parquet::FileMetaData>{}, options, stream, mr);
+
+  return reader->read();
+}
+
+table_with_metadata read_parquet(std::vector<std::unique_ptr<datasource>>&& datasources,
+                                 std::vector<parquet::FileMetaData>&& parquet_metadatas,
+                                 parquet_reader_options const& options,
+                                 rmm::cuda_stream_view stream,
+                                 rmm::device_async_resource_ref mr)
+{
+  CUDF_FUNC_RANGE();
+
+  auto reader = std::make_unique<detail_parquet::reader>(
+    std::move(datasources), std::move(parquet_metadatas), options, stream, mr);
 
   return reader->read();
 }
@@ -626,6 +644,13 @@ parquet_metadata read_parquet_metadata(source_info const& src_info)
   return detail_parquet::read_parquet_metadata(datasources);
 }
 
+std::vector<parquet::FileMetaData> read_parquet_footers(
+  host_span<std::unique_ptr<datasource> const> sources)
+{
+  CUDF_FUNC_RANGE();
+  return detail_parquet::read_parquet_footers(sources);
+}
+
 /**
  * @copydoc cudf::io::merge_row_group_metadata
 */
@@ -700,8 +725,33 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
                                                parquet_reader_options const& options,
                                                rmm::cuda_stream_view stream,
                                                rmm::device_async_resource_ref mr)
-  : reader{std::make_unique<detail_parquet::chunked_reader>(
-      chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)}
+  : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
+                                                            0,
+                                                            make_datasources(options.get_source()),
+                                                            std::vector<parquet::FileMetaData>{},
+                                                            options,
+                                                            stream,
+                                                            mr)}
+{
+}
+
+/**
+ * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
+ */
+chunked_parquet_reader::chunked_parquet_reader(
+  std::size_t chunk_read_limit,
+  std::vector<std::unique_ptr<datasource>>&& datasources,
+  std::vector<parquet::FileMetaData>&& parquet_metadatas,
+  parquet_reader_options const& options,
+  rmm::cuda_stream_view stream,
+  rmm::device_async_resource_ref mr)
+  : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
+                                                            0,
+                                                            std::move(datasources),
+                                                            std::move(parquet_metadatas),
+                                                            options,
+                                                            stream,
+                                                            mr)}
 {
 }
 
@@ -716,6 +766,28 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
   : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
                                                             pass_read_limit,
                                                             make_datasources(options.get_source()),
+                                                            std::vector<parquet::FileMetaData>{},
+                                                            options,
+                                                            stream,
+                                                            mr)}
+{
+}
+
+/**
+ * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
+ */
+chunked_parquet_reader::chunked_parquet_reader(
+  std::size_t chunk_read_limit,
+  std::size_t pass_read_limit,
+  std::vector<std::unique_ptr<datasource>>&& datasources,
+  std::vector<parquet::FileMetaData>&& parquet_metadatas,
+  parquet_reader_options const& options,
+  rmm::cuda_stream_view stream,
+  rmm::device_async_resource_ref mr)
+  : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
+                                                            pass_read_limit,
+                                                            std::move(datasources),
+                                                            std::move(parquet_metadatas),
                                                             options,
                                                             stream,
                                                             mr)}
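Note: for the non-chunked path, the same flow through the new public overload looks like this (a sketch based on the TestPreMaterializedMetadata test added below; the file name is illustrative). The footers are plain FileMetaData structs and can be inspected before committing to the read:

    auto sources   = cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"});
    auto metadatas = cudf::io::read_parquet_footers(sources);

    // e.g. total up rows across all footers before reading anything to device
    int64_t total_rows = 0;
    for (auto const& meta : metadatas) { total_rows += meta.num_rows; }

    auto options = cudf::io::parquet_reader_options::builder().build();
    auto result  = cudf::io::read_parquet(std::move(sources), std::move(metadatas), options);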
diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
index 3818e5ed36f..cb971703f27 100644
--- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
+++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
@@ -73,20 +73,20 @@ metadata::metadata(cudf::host_span<uint8_t const> footer_bytes)
 aggregate_reader_metadata::aggregate_reader_metadata(FileMetaData const& parquet_metadata,
                                                      bool use_arrow_schema,
                                                      bool has_cols_from_mismatched_srcs)
-  : aggregate_reader_metadata_base({}, false, false)
+  : aggregate_reader_metadata_base(host_span<std::unique_ptr<datasource> const>{}, false, false)
 {
   // Just copy over the FileMetaData struct to the internal metadata struct
-  per_file_metadata.emplace_back(metadata{parquet_metadata}.get_file_metadata());
+  per_file_metadata.emplace_back(metadata{parquet_metadata});
   initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs);
 }
 
 aggregate_reader_metadata::aggregate_reader_metadata(cudf::host_span<uint8_t const> footer_bytes,
                                                      bool use_arrow_schema,
                                                      bool has_cols_from_mismatched_srcs)
-  : aggregate_reader_metadata_base({}, false, false)
+  : aggregate_reader_metadata_base(host_span<std::unique_ptr<datasource> const>{}, false, false)
 {
   // Re-initialize internal variables here as base class was initialized without a source
-  per_file_metadata.emplace_back(metadata{footer_bytes}.get_file_metadata());
+  per_file_metadata.emplace_back(metadata{footer_bytes});
   initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs);
 }
 
@@ -148,36 +148,26 @@ void aggregate_reader_metadata::setup_page_index(cudf::host_span<uint8_t const>
     return;
   }
 
-  auto& row_groups = per_file_metadata.front().row_groups;
+  // Get the file metadata and setup the page index
+  auto& file_metadata    = per_file_metadata.front();
+  auto const& row_groups = file_metadata.row_groups;
+
+  // Check for empty parquet file
   CUDF_EXPECTS(not row_groups.empty() and not row_groups.front().columns.empty(),
                "No column chunks in Parquet schema to read page index for");
 
-  CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size());
-
   // Set the first ColumnChunk's offset of ColumnIndex as the adjusted zero offset
   int64_t const min_offset = row_groups.front().columns.front().column_index_offset;
 
-  // now loop over row groups
-  for (auto& rg : row_groups) {
-    for (auto& col : rg.columns) {
-      // Read the ColumnIndex for this ColumnChunk
-      if (col.column_index_length > 0 && col.column_index_offset > 0) {
-        int64_t const offset = col.column_index_offset - min_offset;
-        cp.init(page_index_bytes.data() + offset, col.column_index_length);
-        ColumnIndex ci;
-        cp.read(&ci);
-        col.column_index = std::move(ci);
-      }
-      // Read the OffsetIndex for this ColumnChunk
-      if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
-        int64_t const offset = col.offset_index_offset - min_offset;
-        cp.init(page_index_bytes.data() + offset, col.offset_index_length);
-        OffsetIndex oi;
-        cp.read(&oi);
-        col.offset_index = std::move(oi);
-      }
-    }
+  // Check if the page index buffer is valid
+  {
+    auto const& last_col  = row_groups.back().columns.back();
+    auto const max_offset = last_col.offset_index_offset + last_col.offset_index_length;
+    CUDF_EXPECTS(max_offset > min_offset, "Encountered an invalid page index buffer");
   }
+
+  file_metadata.setup_page_index(page_index_bytes, min_offset);
 }
 
 size_type aggregate_reader_metadata::total_rows_in_row_groups(
diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
index bce18919003..43c6d4fc08c 100644
--- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
+++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
@@ -33,10 +33,15 @@ using parquet::detail::row_group_info;
 /**
  * @brief Class for parsing dataset metadata
 */
-struct metadata : private metadata_base {
+struct metadata : public metadata_base {
   explicit metadata(cudf::host_span<uint8_t const> footer_bytes);
   explicit metadata(FileMetaData const& other) { static_cast<FileMetaData&>(*this) = other; }
-  metadata_base get_file_metadata() && { return std::move(*this); }
+  metadata(metadata const& other)            = delete;
+  metadata(metadata&& other)                 = default;
+  metadata& operator=(metadata const& other) = delete;
+  metadata& operator=(metadata&& other)      = default;
+
+  ~metadata() = default;
 };
 
 class aggregate_reader_metadata : public aggregate_reader_metadata_base {
@@ -93,6 +98,11 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
                             bool use_arrow_schema,
                             bool has_cols_from_mismatched_srcs);
 
+  aggregate_reader_metadata(aggregate_reader_metadata const&)            = delete;
+  aggregate_reader_metadata& operator=(aggregate_reader_metadata const&) = delete;
+  aggregate_reader_metadata(aggregate_reader_metadata&&)                 = default;
+  aggregate_reader_metadata& operator=(aggregate_reader_metadata&&)      = default;
+
   /**
    * @brief Initialize the internal variables
   */
diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp
index ab6d07e6922..3a22732995d 100644
--- a/cpp/src/io/parquet/reader.cpp
+++ b/cpp/src/io/parquet/reader.cpp
@@ -10,10 +10,12 @@ namespace cudf::io::parquet::detail {
 reader::reader() = default;
 
 reader::reader(std::vector<std::unique_ptr<datasource>>&& sources,
+               std::vector<FileMetaData>&& parquet_metadatas,
                parquet_reader_options const& options,
                rmm::cuda_stream_view stream,
                rmm::device_async_resource_ref mr)
-  : _impl(std::make_unique<reader_impl>(std::move(sources), options, stream, mr))
+  : _impl(std::make_unique<reader_impl>(
+      std::move(sources), std::move(parquet_metadatas), options, stream, mr))
 {
 }
 
@@ -24,12 +26,18 @@ table_with_metadata reader::read() { return _impl->read(); }
 chunked_reader::chunked_reader(std::size_t chunk_read_limit,
                                std::size_t pass_read_limit,
                                std::vector<std::unique_ptr<datasource>>&& sources,
+                               std::vector<FileMetaData>&& parquet_metadatas,
                                parquet_reader_options const& options,
                                rmm::cuda_stream_view stream,
                                rmm::device_async_resource_ref mr)
 {
-  _impl = std::make_unique<reader_impl>(
-    chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr);
+  _impl = std::make_unique<reader_impl>(chunk_read_limit,
+                                        pass_read_limit,
+                                        std::move(sources),
+                                        std::move(parquet_metadatas),
+                                        options,
+                                        stream,
+                                        mr);
 }
 
 chunked_reader::~chunked_reader() = default;
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index 3d8fdc953e8..7f13210c60e 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -489,12 +490,14 @@ reader_impl::reader_impl()
 }
 
 reader_impl::reader_impl(std::vector<std::unique_ptr<datasource>>&& sources,
+                         std::vector<FileMetaData>&& parquet_metadatas,
                          parquet_reader_options const& options,
                          rmm::cuda_stream_view stream,
                          rmm::device_async_resource_ref mr)
   : reader_impl(0 /*chunk_read_limit*/,
                 0 /*input_pass_read_limit*/,
                 std::forward<std::vector<std::unique_ptr<datasource>>>(sources),
+                std::forward<std::vector<FileMetaData>>(parquet_metadatas),
                 options,
                 stream,
                 mr)
@@ -504,6 +507,7 @@ reader_impl::reader_impl(std::vector<std::unique_ptr<datasource>>&& sources,
 reader_impl::reader_impl(std::size_t chunk_read_limit,
                          std::size_t pass_read_limit,
                          std::vector<std::unique_ptr<datasource>>&& sources,
+                         std::vector<FileMetaData>&& file_metadatas,
                          parquet_reader_options const& options,
                          rmm::cuda_stream_view stream,
                          rmm::device_async_resource_ref mr)
@@ -524,10 +528,18 @@ reader_impl::reader_impl(std::size_t chunk_read_limit,
     _input_pass_read_limit{pass_read_limit}
 {
   // Open and parse the source dataset metadata
-  _metadata = std::make_unique<aggregate_reader_metadata>(
-    _sources,
-    options.is_enabled_use_arrow_schema(),
-    options.get_columns().has_value() and options.is_enabled_allow_mismatched_pq_schemas());
+  CUDF_EXPECTS(file_metadatas.empty() or file_metadatas.size() == _sources.size(),
+               "Encountered a mismatch in the number of provided data sources and metadatas");
+  _metadata =
+    file_metadatas.empty()
+      ? std::make_unique<aggregate_reader_metadata>(
+          _sources,
+          options.is_enabled_use_arrow_schema(),
+          options.get_columns().has_value() and options.is_enabled_allow_mismatched_pq_schemas())
+      : std::make_unique<aggregate_reader_metadata>(
+          std::forward<std::vector<FileMetaData>>(file_metadatas),
+          options.is_enabled_use_arrow_schema(),
+          options.get_columns().has_value() and options.is_enabled_allow_mismatched_pq_schemas());
 
   // Number of input sources
   _num_sources = _sources.size();
@@ -1028,15 +1040,18 @@ parquet_column_schema walk_schema(aggregate_reader_metadata const* mt, int idx)
 
 parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> const> sources)
 {
-  // Do not use arrow schema when reading information from parquet metadata.
+  // Do not use arrow schema when only reading the parquet footer metadata.
   constexpr auto use_arrow_schema = false;
 
   // Do not select any columns when only reading the parquet metadata.
   constexpr auto has_column_projection = false;
 
+  // Do not read page indexes as it's not used to construct `parquet_metadata`.
+  constexpr auto read_page_indexes = false;
+
   // Open and parse the source dataset metadata
   auto metadata =
-    aggregate_reader_metadata(sources, use_arrow_schema, has_column_projection, false);
+    aggregate_reader_metadata(sources, use_arrow_schema, has_column_projection, read_page_indexes);
 
   return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)},
                           metadata.get_num_rows(),
@@ -1047,4 +1062,22 @@ parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> con
                           metadata.get_column_chunk_metadata()};
 }
 
+std::vector<FileMetaData> read_parquet_footers(
+  host_span<std::unique_ptr<datasource> const> sources)
+{
+  // Do not use arrow schema when only reading the parquet metadata.
+  constexpr auto use_arrow_schema = false;
+
+  // Do not select any columns when only reading the parquet metadata.
+  constexpr auto has_column_projection = false;
+
+  // Read page indexes if available here since we will want to reuse the raw metadata for later use.
+  constexpr auto read_page_indexes = true;
+
+  // Parse the source dataset metadata
+  return aggregate_reader_metadata(
+           sources, use_arrow_schema, has_column_projection, read_page_indexes)
+    .get_parquet_metadatas();
+}
+
 }  // namespace cudf::io::parquet::detail
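Note the contract enforced above: `parquet_metadatas` must either be empty (footers are then parsed from the sources) or contain exactly one entry per source. A sketch of the valid call shapes through the public API (file name illustrative):

    auto options = cudf::io::parquet_reader_options::builder().build();

    // Valid: empty vector, footers are read from the sources themselves
    auto result = cudf::io::read_parquet(
      cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"}),
      std::vector<cudf::io::parquet::FileMetaData>{},
      options);

    // Also valid: metadatas.size() == sources.size(). Any other size throws a
    // cudf::logic_error ("Encountered a mismatch in the number of provided
    // data sources and metadatas").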
diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index 4c03cdba19a..4da65bdd867 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -18,6 +18,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -46,11 +47,13 @@ class reader_impl {
    * entire given file.
    *
    * @param sources Dataset sources
+   * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
    * @param options Settings for controlling reading behavior
    * @param stream CUDA stream used for device memory operations and kernel launches
    * @param mr Device memory resource to use for device memory allocation
    */
   explicit reader_impl(std::vector<std::unique_ptr<datasource>>&& sources,
+                       std::vector<FileMetaData>&& parquet_metadatas,
                        parquet_reader_options const& options,
                        rmm::cuda_stream_view stream,
                        rmm::device_async_resource_ref mr);
@@ -91,6 +94,7 @@ class reader_impl {
    * @param pass_read_limit Limit on memory usage for the purposes of decompression and processing
    * of input, or `0` if there is no limit.
    * @param sources Dataset sources
+   * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
    * @param options Settings for controlling reading behavior
    * @param stream CUDA stream used for device memory operations and kernel launches
    * @param mr Device memory resource to use for device memory allocation
@@ -98,6 +102,7 @@ class reader_impl {
   explicit reader_impl(std::size_t chunk_read_limit,
                        std::size_t pass_read_limit,
                        std::vector<std::unique_ptr<datasource>>&& sources,
+                       std::vector<FileMetaData>&& parquet_metadatas,
                        parquet_reader_options const& options,
                        rmm::cuda_stream_view stream,
                        rmm::device_async_resource_ref mr);
diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp
index fe9d33678c3..1b63fda80dd 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.cpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -300,6 +300,8 @@ void metadata::sanitize_schema()
   process(0);
 }
 
+metadata::metadata(FileMetaData&& other) : FileMetaData(std::move(other)) {}
+
 metadata::metadata(datasource* source, bool read_page_indexes)
 {
   constexpr auto header_len = sizeof(file_header_s);
@@ -339,85 +341,93 @@ metadata::metadata(datasource* source, bool read_page_indexes)
     auto const& last_col     = row_groups.back().columns.back();
     int64_t const max_offset = last_col.offset_index_offset + last_col.offset_index_length;
 
-    if (max_offset > 0) {
-      int64_t const length = max_offset - min_offset;
-      auto const idx_buf   = source->host_read(min_offset, length);
-
-      // Flatten all columns into a single vector for easier task distribution
-      std::vector<std::reference_wrapper<ColumnChunk>> all_column_chunks;
-      all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size());
-      for (auto& rg : row_groups) {
-        for (auto& col : rg.columns) {
-          all_column_chunks.emplace_back(std::ref(col));
-        }
-      }
+    if (max_offset > min_offset) {
+      size_t const length     = max_offset - min_offset;
+      auto const page_idx_buf = source->host_read(min_offset, length);
+      setup_page_index({page_idx_buf->data(), length}, min_offset);
+    }
+  }
 
-      auto read_column_indexes = [&idx_buf, min_offset](CompactProtocolReader& reader,
-                                                        ColumnChunk& col) {
-        if (col.column_index_length > 0 && col.column_index_offset > 0) {
-          int64_t const offset = col.column_index_offset - min_offset;
-          reader.init(idx_buf->data() + offset, col.column_index_length);
-          col.column_index = ColumnIndex();
-          reader.read(&col.column_index.value());
-        }
-        if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
-          int64_t const offset = col.offset_index_offset - min_offset;
-          reader.init(idx_buf->data() + offset, col.offset_index_length);
-          col.offset_index = OffsetIndex();
-          reader.read(&col.offset_index.value());
-        }
-      };
-
-      // Use parallel processing only if we have enough columns to justify the overhead
-      constexpr std::size_t parallel_threshold = 512;
-      auto const total_column_chunks           = all_column_chunks.size();
-      if (total_column_chunks >= parallel_threshold) {
-        // Dynamically calculate number of tasks based the number or row groups and columns
-        constexpr std::size_t min_tasks = 4;
-        constexpr std::size_t max_tasks = 32;
-        auto const ratio = static_cast<double>(total_column_chunks) / parallel_threshold;
-        // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements
-        // doubles both the number of tasks and the task size)
-        auto const multiplier = std::size_t(1) << (static_cast<std::size_t>(std::log2(ratio)) / 2);
-
-        auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks);
-        auto const column_chunks_per_task = total_column_chunks / num_tasks;
-        auto const remainder              = total_column_chunks % num_tasks;
-
-        std::vector<std::future<void>> tasks;
-        tasks.reserve(num_tasks);
-
-        std::size_t start_idx = 0;
-        for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) {
-          auto const task_size = column_chunks_per_task + (task_id < remainder ? 1 : 0);
-          auto const end_idx   = start_idx + task_size;
-
-          if (start_idx >= total_column_chunks) break;
-
-          tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(
-            [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() {
-              CompactProtocolReader local_cp;
-
-              for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) {
-                read_column_indexes(local_cp, all_column_chunks[i].get());
-              }
-            }));
-
-          start_idx = end_idx;
-        }
+  sanitize_schema();
+}
 
-        for (auto& task : tasks) {
-          task.get();
-        }
-      } else {
-        // For small numbers of columns, use sequential processing to avoid overhead
-        for (auto& col_ref : all_column_chunks) {
-          read_column_indexes(cp, col_ref.get());
-        }
-      }
+void metadata::setup_page_index(cudf::host_span<uint8_t const> page_index_bytes, int64_t min_offset)
+{
+  CUDF_FUNC_RANGE();
+
+  // Flatten all columns into a single vector for easier task distribution
+  std::vector<std::reference_wrapper<ColumnChunk>> all_column_chunks;
+  all_column_chunks.reserve(row_groups.size() * row_groups.front().columns.size());
+  for (auto& rg : row_groups) {
+    for (auto& col : rg.columns) {
+      all_column_chunks.emplace_back(std::ref(col));
     }
   }
 
-  sanitize_schema();
+  auto read_column_indexes = [page_index_bytes, min_offset](CompactProtocolReader& reader,
+                                                            ColumnChunk& col) {
+    if (col.column_index_length > 0 && col.column_index_offset > 0) {
+      int64_t const offset = col.column_index_offset - min_offset;
+      reader.init(page_index_bytes.data() + offset, col.column_index_length);
+      col.column_index = ColumnIndex();
+      reader.read(&col.column_index.value());
+    }
+    if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
+      int64_t const offset = col.offset_index_offset - min_offset;
+      reader.init(page_index_bytes.data() + offset, col.offset_index_length);
+      col.offset_index = OffsetIndex();
+      reader.read(&col.offset_index.value());
+    }
+  };
+
+  // Use parallel processing only if we have enough columns to justify the overhead
+  constexpr std::size_t parallel_threshold = 256;
+  auto const total_column_chunks           = all_column_chunks.size();
+  if (total_column_chunks >= parallel_threshold) {
+    // Dynamically calculate number of tasks based the number or row groups and columns
+    constexpr std::size_t min_tasks = 4;
+    constexpr std::size_t max_tasks = 32;
+    auto const ratio = static_cast<double>(total_column_chunks) / parallel_threshold;
+    // Scale the number of tasks and task size evenly (e.g. quadrupling the number of elements
+    // doubles both the number of tasks and the task size)
+    auto const multiplier = std::size_t(1) << (static_cast<std::size_t>(std::log2(ratio)) / 2);
+
+    auto const num_tasks = std::clamp(min_tasks * multiplier, min_tasks, max_tasks);
+    auto const column_chunks_per_task = total_column_chunks / num_tasks;
+    auto const remainder              = total_column_chunks % num_tasks;
+
+    std::vector<std::future<void>> tasks;
+    tasks.reserve(num_tasks);
+
+    std::size_t start_idx = 0;
+    for (std::size_t task_id = 0; task_id < num_tasks; ++task_id) {
+      auto const task_size = column_chunks_per_task + (task_id < remainder ? 1 : 0);
+      auto const end_idx   = start_idx + task_size;
+
+      if (start_idx >= total_column_chunks) break;
+
+      tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(
+        [&all_column_chunks, &read_column_indexes, start_idx, end_idx]() {
+          CompactProtocolReader local_cp;
+
+          for (size_t i = start_idx; i < end_idx && i < all_column_chunks.size(); ++i) {
+            read_column_indexes(local_cp, all_column_chunks[i].get());
+          }
+        }));
+
+      start_idx = end_idx;
+    }
+
+    for (auto& task : tasks) {
+      task.get();
+    }
+  } else {
+    CompactProtocolReader cp(page_index_bytes.data(), page_index_bytes.size());
+    // For small numbers of columns, use sequential processing to avoid overhead
+    for (auto& col_ref : all_column_chunks) {
+      read_column_indexes(cp, col_ref.get());
+    }
+  }
 }
 
 metadata::~metadata()
@@ -669,16 +679,8 @@ void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_inf
   rg_info.column_chunks = std::move(chunks);
 }
 
-aggregate_reader_metadata::aggregate_reader_metadata(
-  host_span<std::unique_ptr<datasource> const> sources,
-  bool use_arrow_schema,
-  bool has_cols_from_mismatched_srcs,
-  bool read_page_indexes)
-  : per_file_metadata(metadatas_from_sources(sources, read_page_indexes)),
-    keyval_maps(collect_keyval_metadata()),
-    schema_idx_maps(init_schema_idx_maps(has_cols_from_mismatched_srcs)),
-    num_rows(calc_num_rows()),
-    num_row_groups(calc_num_row_groups())
+void aggregate_reader_metadata::initialize_internals(bool use_arrow_schema,
+                                                     bool has_cols_from_mismatched_srcs)
 {
   if (per_file_metadata.size() > 1) {
     auto& first_meta = per_file_metadata.front();
@@ -729,6 +731,38 @@ aggregate_reader_metadata::aggregate_reader_metadata(
     keyval_maps.begin(), keyval_maps.end(), [](auto& pfm) { pfm.erase(ARROW_SCHEMA_KEY); });
 }
 
+aggregate_reader_metadata::aggregate_reader_metadata(std::vector<FileMetaData>&& parquet_metadatas,
+                                                     bool use_arrow_schema,
+                                                     bool has_cols_from_mismatched_srcs)
+{
+  per_file_metadata.reserve(parquet_metadatas.size());
+  std::transform(std::make_move_iterator(parquet_metadatas.begin()),
+                 std::make_move_iterator(parquet_metadatas.end()),
+                 std::back_inserter(per_file_metadata),
+                 [](FileMetaData&& meta) { return metadata{std::move(meta)}; });
+
+  keyval_maps     = collect_keyval_metadata();
+  schema_idx_maps = init_schema_idx_maps(has_cols_from_mismatched_srcs);
+  num_rows        = calc_num_rows();
+  num_row_groups  = calc_num_row_groups();
+
+  initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs);
+}
+
+aggregate_reader_metadata::aggregate_reader_metadata(
+  host_span<std::unique_ptr<datasource> const> sources,
+  bool use_arrow_schema,
+  bool has_cols_from_mismatched_srcs,
+  bool read_page_indexes)
+  : per_file_metadata(metadatas_from_sources(sources, read_page_indexes)),
+    keyval_maps(collect_keyval_metadata()),
+    schema_idx_maps(init_schema_idx_maps(has_cols_from_mismatched_srcs)),
+    num_rows(calc_num_rows()),
+    num_row_groups(calc_num_row_groups())
+{
+  initialize_internals(use_arrow_schema, has_cols_from_mismatched_srcs);
+}
+
 arrow_schema_data_types aggregate_reader_metadata::collect_arrow_schema() const
 {
   // Check the key_value metadata for arrow schema, decode and walk it
diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp
index 4b431ce45bd..4974b62f2cb 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.hpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -104,12 +105,16 @@ struct row_group_info {
 struct metadata : public FileMetaData {
   metadata() = default;
   explicit metadata(datasource* source, bool read_page_indexes = true);
+  explicit metadata(FileMetaData&& other);
   metadata(metadata const& other)            = delete;
   metadata(metadata&& other)                 = default;
   metadata& operator=(metadata const& other) = delete;
   metadata& operator=(metadata&& other)      = default;
   ~metadata();
 
+  void setup_page_index(cudf::host_span<uint8_t const> page_index_bytes, int64_t min_offset);
+
+ protected:
   void sanitize_schema();
 };
 
@@ -320,19 +325,38 @@ class aggregate_reader_metadata {
                           std::reference_wrapper<ast::expression const> filter,
                           rmm::cuda_stream_view stream) const;
 
+  /**
+   * @brief Initialize the internal variables
+   */
+  void initialize_internals(bool use_arrow_schema, bool has_cols_from_mismatched_srcs);
+
  public:
   aggregate_reader_metadata(host_span<std::unique_ptr<datasource> const> sources,
                             bool use_arrow_schema,
                             bool has_cols_from_mismatched_srcs,
                             bool read_page_indexes = true);
 
+  aggregate_reader_metadata(std::vector<FileMetaData>&& parquet_metadatas,
+                            bool use_arrow_schema,
+                            bool has_cols_from_mismatched_srcs);
+
   aggregate_reader_metadata(aggregate_reader_metadata const&)            = delete;
   aggregate_reader_metadata& operator=(aggregate_reader_metadata const&) = delete;
-  aggregate_reader_metadata(aggregate_reader_metadata&&)                 = delete;
-  aggregate_reader_metadata& operator=(aggregate_reader_metadata&&)      = delete;
+  aggregate_reader_metadata(aggregate_reader_metadata&&)                 = default;
+  aggregate_reader_metadata& operator=(aggregate_reader_metadata&&)      = default;
 
   [[nodiscard]] RowGroup const& get_row_group(size_type row_group_index, size_type src_idx) const;
 
+  /**
+   * @brief Get Parquet file metadatas
+   *
+   * @return Parquet file metadatas
+   */
+  [[nodiscard]] std::vector<FileMetaData> get_parquet_metadatas() const
+  {
+    return std::vector<FileMetaData>{per_file_metadata.begin(), per_file_metadata.end()};
+  }
+
   /**
    * @brief Extracts the schema_idx'th column chunk metadata from row_group_index'th row group of
    * the src_idx'th file.
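For reference, the task-partitioning heuristic relocated into `setup_page_index` now kicks in at 256 column chunks (down from 512). A worked example of the existing arithmetic, not new behavior: with `parallel_threshold = 256` and 4096 column chunks, `ratio = 16`, `multiplier = 1 << (log2(16) / 2) = 4`, so `num_tasks = clamp(4 * 4, 4, 32) = 16` tasks of roughly 256 chunks each. Quadrupling the chunk count doubles both the task count and the per-task size, as the comment in the code states.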
diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu
index b703673c15b..64b11df0876 100644
--- a/cpp/tests/io/parquet_chunked_reader_test.cu
+++ b/cpp/tests/io/parquet_chunked_reader_test.cu
@@ -141,6 +141,37 @@ auto chunked_read(std::string const& filepath,
   return chunked_read(vpath, output_limit, input_limit);
 }
 
+auto chunked_read(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
+                  std::vector<cudf::io::parquet::FileMetaData>&& metadatas,
+                  std::size_t output_limit,
+                  std::size_t input_limit = 0)
+{
+  auto const read_opts = cudf::io::parquet_reader_options::builder().build();
+  auto reader          = cudf::io::chunked_parquet_reader(
+    output_limit, input_limit, std::move(sources), std::move(metadatas), read_opts);
+
+  auto num_chunks = 0;
+  auto out_tables = std::vector<std::unique_ptr<cudf::table>>{};
+
+  do {
+    auto chunk = reader.read_chunk();
+    // If the input file is empty, the first call to `read_chunk` will return an empty table.
+    // Thus, we only check for non-empty output table from the second call.
+    if (num_chunks > 0) {
+      CUDF_EXPECTS(chunk.tbl->num_rows() != 0, "Number of rows in the new chunk is zero.");
+    }
+    ++num_chunks;
+    out_tables.emplace_back(std::move(chunk.tbl));
+  } while (reader.has_next());
+
+  auto out_tviews = std::vector<cudf::table_view>{};
+  for (auto const& tbl : out_tables) {
+    out_tviews.emplace_back(tbl->view());
+  }
+
+  return std::pair(cudf::concatenate(out_tviews), num_chunks);
+}
+
 auto const read_table_and_nrows_per_source(cudf::io::chunked_parquet_reader const& reader)
 {
   auto out_tables = std::vector<std::unique_ptr<cudf::table>>{};
@@ -364,12 +395,18 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithString)
 
   // Test with zero limit: everything will be read in one chunk
   {
-    auto const [result, num_chunks] = chunked_read(filepath_no_null, 0);
+    // Separately materialize datasource and metadata and use them to construct the chunked reader
+    auto sources   = cudf::io::make_datasources(cudf::io::source_info{filepath_no_null});
+    auto metadatas = cudf::io::read_parquet_footers(sources);
+    auto const [result, num_chunks] = chunked_read(std::move(sources), std::move(metadatas), 0);
     EXPECT_EQ(num_chunks, 1);
     CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result);
   }
   {
-    auto const [result, num_chunks] = chunked_read(filepath_with_nulls, 0);
+    // Separately materialize datasource and metadata and use them to construct the chunked reader
+    auto sources   = cudf::io::make_datasources(cudf::io::source_info{filepath_with_nulls});
+    auto metadatas = cudf::io::read_parquet_footers(sources);
+    auto const [result, num_chunks] = chunked_read(std::move(sources), std::move(metadatas), 0);
     EXPECT_EQ(num_chunks, 1);
     CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result);
   }
@@ -386,7 +423,11 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithString)
 
   // Test with a very small limit: 1 byte
   {
-    auto const [result, num_chunks] = chunked_read(filepath_no_null, 1);
+    // Separately materialize datasource and metadata and use them to construct the chunked reader
+    auto sources   = cudf::io::make_datasources(cudf::io::source_info{filepath_no_null});
+    auto metadatas = cudf::io::read_parquet_footers(sources);
+
+    auto const [result, num_chunks] = chunked_read(std::move(sources), std::move(metadatas), 1);
     EXPECT_EQ(num_chunks, 3);
     CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null, *result);
   }
@@ -441,7 +482,11 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadWithString)
     CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_with_nulls, *result);
   }
   {
-    auto const [result, num_chunks] = chunked_read(filepath_no_null_delta, 500'000);
+    // Separately materialize datasource and metadata and use them to construct the chunked reader
+    auto sources   = cudf::io::make_datasources(cudf::io::source_info{filepath_no_null_delta});
+    auto metadatas = cudf::io::read_parquet_footers(sources);
+    auto const [result, num_chunks] =
+      chunked_read(std::move(sources), std::move(metadatas), 500'000);
     // EXPECT_EQ(num_chunks, 2);
     CUDF_TEST_EXPECT_TABLES_EQUAL(*expected_no_null_delta, *result);
   }
@@ -1500,11 +1545,17 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadOutOfBoundChunks)
   auto constexpr num_rows          = 0;
   auto const [expected, filepath]  = generate_input(num_rows, false);
   auto constexpr output_read_limit = 1'000;
-  auto const options =
-    cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}).build();
-  auto const reader =
-    cudf::io::chunked_parquet_reader(output_read_limit, 0, options, cudf::get_default_stream());
-  auto const [result, num_chunks] = read_chunks_with_while_loop(reader);
+
+  auto const options = cudf::io::parquet_reader_options_builder().build();
+  auto sources       = cudf::io::make_datasources(cudf::io::source_info{filepath});
+  auto metadatas     = cudf::io::read_parquet_footers(sources);
+  auto const reader  = cudf::io::chunked_parquet_reader(output_read_limit,
+                                                        0,
+                                                        std::move(sources),
+                                                        std::move(metadatas),
+                                                        options,
+                                                        cudf::get_default_stream());
+  auto const [result, num_chunks]     = read_chunks_with_while_loop(reader);
   auto const out_of_bound_table_chunk = reader.read_chunk().tbl;
 
   EXPECT_EQ(num_chunks, 1);
@@ -1557,11 +1608,16 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource)
   auto constexpr output_read_limit = 1'500;
   auto constexpr pass_read_limit   = 3'500;
 
-  auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath})
-                         .skip_rows(rows_to_skip)
-                         .build();
-  auto const reader = cudf::io::chunked_parquet_reader(
-    output_read_limit, pass_read_limit, options, cudf::get_default_stream());
+  // Separately materialize datasource and metadata and use them to construct the chunked reader
+  auto const options = cudf::io::parquet_reader_options_builder().skip_rows(rows_to_skip).build();
+  auto sources       = cudf::io::make_datasources(cudf::io::source_info{filepath});
+  auto metadatas     = cudf::io::read_parquet_footers(sources);
+  auto const reader  = cudf::io::chunked_parquet_reader(output_read_limit,
+                                                        pass_read_limit,
+                                                        std::move(sources),
+                                                        std::move(metadatas),
+                                                        options,
+                                                        cudf::get_default_stream());
 
   auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader);
 
diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp
index 85adc9d12a8..8a6a3990325 100644
--- a/cpp/tests/io/parquet_reader_test.cpp
+++ b/cpp/tests/io/parquet_reader_test.cpp
@@ -2504,7 +2504,6 @@ TEST_F(ParquetMetadataReaderTest, TestBasic)
       .metadata(std::move(expected_metadata));
   cudf::io::write_parquet(out_opts);
 
-  // Single file
   auto const test_parquet_metadata = [&](int num_sources) {
     auto meta =
       read_parquet_metadata(cudf::io::source_info{std::vector(num_sources, filepath)});
@@ -2542,6 +2541,45 @@ TEST_F(ParquetMetadataReaderTest, TestBasic)
   test_parquet_metadata(3);
 }
 
+TEST_F(ParquetMetadataReaderTest, TestPreMaterializedMetadata)
+{
+  auto const num_rows = 1200;
+
+  auto ints   = random_values<int>(num_rows);
+  auto floats = random_values<float>(num_rows);
+  column_wrapper<int> int_col(ints.begin(), ints.end());
+  column_wrapper<float> float_col(floats.begin(), floats.end());
+
+  table_view input_table({int_col, float_col});
+
+  auto filepath = temp_env->get_temp_filepath("PreMaterializedMetadata.parquet");
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, input_table).build();
+  cudf::io::write_parquet(out_opts);
+
+  auto const test_parquet_metadata = [&](int num_sources) {
+    auto const source_info = cudf::io::source_info{std::vector(num_sources, filepath)};
+    auto datasources       = cudf::io::make_datasources(source_info);
+    auto metadatas         = cudf::io::read_parquet_footers(datasources);
+    EXPECT_EQ(metadatas.size(), num_sources);
+    auto const rows_in_metadatas =
+      std::accumulate(metadatas.begin(), metadatas.end(), 0, [](auto acc, auto const& metadata) {
+        return acc + metadata.num_rows;
+      });
+    EXPECT_EQ(rows_in_metadatas, num_sources * num_rows);
+
+    auto const options = cudf::io::parquet_reader_options::builder(source_info).build();
+    auto const read = cudf::io::read_parquet(std::move(datasources), std::move(metadatas), options);
+    auto const expected = cudf::io::read_parquet(options);
+
+    CUDF_TEST_EXPECT_TABLES_EQUAL(expected.tbl->view(), read.tbl->view());
+  };
+
+  // Test with single file
+  test_parquet_metadata(1);
+  // Test with multiple files
+  test_parquet_metadata(3);
+}
+
 TEST_F(ParquetMetadataReaderTest, TestNested)
 {
   auto const num_rows = 1200;
diff --git a/docs/cudf/source/conf.py b/docs/cudf/source/conf.py
index d9ece9128e3..ac3ec01c384 100644
--- a/docs/cudf/source/conf.py
+++ b/docs/cudf/source/conf.py
@@ -455,9 +455,6 @@ def _generate_namespaces(namespaces):
     "deprecated",
     # TODO: This is currently in a src file but perhaps should be public
     "orc::column_statistics",
-    # Sphinx doesn't know how to distinguish between the ORC and Parquet
-    # definitions because Breathe doesn't to preserve namespaces for enums.
-    "TypeKind",
     # Span subclasses access base class members
     "base::",
 }
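Taken together, these changes let a caller pay the footer and page-index parsing cost once and fan it out to several readers. A closing sketch (file name illustrative; it assumes FileMetaData is copyable, which the `get_parquet_metadatas()` helper above already relies on):

    auto sources   = cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"});
    auto metadatas = cudf::io::read_parquet_footers(sources);
    auto options   = cudf::io::parquet_reader_options::builder().build();

    // First consumer: a full read, using copies of the footers
    auto table = cudf::io::read_parquet(
      cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"}),
      std::vector<cudf::io::parquet::FileMetaData>{metadatas},  // copy
      options);

    // Second consumer: a chunked read, consuming the originals
    auto reader = cudf::io::chunked_parquet_reader(
      1024 * 1024, 0, std::move(sources), std::move(metadatas), options);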