15 changes: 15 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
@@ -402,6 +402,21 @@ class datasource {
};
};

/**
* @brief Constructs datasources from dataset source information
*
* @ingroup io_datasources
*
* @param info Dataset source information
* @param offset Starting byte offset from which data will be read (default zero)
* @param max_size_estimate Upper estimate of the data range that will be read (default zero,
* which means the entire file after `offset`)
* @return Constructed vector of datasource objects
*/
std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info const& info,
Member Author (@mhaseeb123):
Just declared this helper in the header file. It's already defined in functions.cpp

size_t offset = 0,
size_t max_size_estimate = 0);

/** @} */ // end of group
} // namespace io
} // namespace CUDF_EXPORT cudf
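
A minimal usage sketch for the newly public helper, assuming only the declaration above; the file name and byte range are illustrative:

  #include <cudf/io/datasource.hpp>

  auto const info = cudf::io::source_info{"dataset.parquet"};  // hypothetical path
  // Defaults: read the entire file starting at byte 0.
  auto sources = cudf::io::make_datasources(info);
  // Or hint at a range: start at byte 4096, expect to read roughly 1 MiB.
  auto ranged = cudf::io::make_datasources(info, 4096, 1 << 20);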
15 changes: 15 additions & 0 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -51,11 +51,13 @@ class reader {
* @brief Constructor from an array of datasources
*
* @param sources Input `datasource` objects to read the dataset from
* @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
std::vector<FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
@@ -134,13 +136,15 @@ class chunked_reader : private reader {
* @param pass_read_limit Limit on total amount of memory used for temporary computations during
* loading, or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
@@ -248,6 +252,17 @@ class writer {
* metadata.
*/
parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> const> sources);

/**
* @brief Constructs FileMetaData objects from a Parquet dataset
*
* @param sources Input `datasource` objects to read the dataset from
*
* @return List of FileMetaData objects, one per parquet source
*/
std::vector<parquet::FileMetaData> read_parquet_footers(
Member Author (@mhaseeb123):
Could rename this to read_parquet_raw_metadata as well

host_span<std::unique_ptr<datasource> const> sources);

} // namespace parquet::detail
} // namespace io
} // namespace CUDF_EXPORT cudf
9 changes: 5 additions & 4 deletions cpp/include/cudf/io/orc_metadata.hpp
@@ -26,6 +26,9 @@ namespace io {
* @file
*/

//! ORC data type
using cudf::io::orc::TypeKind;
Member Author (@mhaseeb123):
Ignore for docs build


/**
* @brief Holds column names and buffers containing raw file-level and stripe-level statistics.
*
@@ -224,9 +227,7 @@ struct orc_column_schema {
* @param type ORC type
* @param children child columns (empty for non-nested types)
*/
orc_column_schema(std::string_view name,
orc::TypeKind type,
std::vector<orc_column_schema> children)
orc_column_schema(std::string_view name, TypeKind type, std::vector<orc_column_schema> children)
: _name{name}, _type_kind{type}, _children{std::move(children)}
{
}
@@ -282,7 +283,7 @@ struct orc_column_schema {

private:
std::string _name;
orc::TypeKind _type_kind;
TypeKind _type_kind;
std::vector<orc_column_schema> _children;
};

83 changes: 83 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
@@ -667,6 +667,34 @@ table_with_metadata read_parquet(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Reads a Parquet dataset into a set of columns using pre-existing Parquet datasources and
* file metadatas.
*
* The following code snippet demonstrates how to read a dataset from a file:
* @code
* auto sources = cudf::io::make_datasources(cudf::io::source_info("dataset.parquet"));
* auto metadatas = cudf::io::read_parquet_metadata(sources);
* auto options = cudf::io::parquet_reader_options::builder();
* auto result = cudf::io::read_parquet(std::move(sources), std::move(metadatas), options);
* @endcode
*
* @param sources Input `datasource` objects to read the dataset from
* @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata
*
* @return The set of columns along with metadata
*/
table_with_metadata read_parquet(
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief The chunked parquet reader class to read Parquet file iteratively in to a series of
* tables, chunk by chunk.
@@ -705,6 +733,30 @@ class chunked_parquet_reader {
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Constructor for chunked reader using pre-existing Parquet datasources and
* file metadatas.
*
* This constructor requires the same `parquet_reader_options` parameter as in
* `cudf::read_parquet()`, and an additional parameter to specify a byte size limit on the
* output table returned by each read.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
* @param options The options used to read Parquet file
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Constructor for chunked reader.
*
@@ -731,6 +783,37 @@
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Constructor for chunked reader using pre-existing Parquet datasources and
* file metadatas.
*
* This constructor requires the same `parquet_reader_options` parameter as in
* `cudf::read_parquet()`, with additional parameters to specify a byte size limit on the
* output table returned by each read, and a byte limit on the amount of temporary memory to
* use when reading. `pass_read_limit` affects how many row groups can be read at a time by
* limiting the amount of memory dedicated to decompression space. `pass_read_limit` is a
* hint, not an absolute limit: if a single row group cannot fit within the limit given, it
* will still be loaded.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or
* `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty
* @param options The options used to read Parquet file
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Destructor, destroying the internal reader instance.
*
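
A sketch of the intended call pattern for the new chunked-reader constructors, assuming the declarations in this diff; the limits and file name are illustrative, not recommendations:

  #include <cudf/io/parquet.hpp>
  #include <cudf/io/parquet_metadata.hpp>

  auto sources   = cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"});
  auto metadatas = cudf::io::read_parquet_metadata(sources);  // new FileMetaData overload
  auto options   = cudf::io::parquet_reader_options::builder().build();
  auto reader    = cudf::io::chunked_parquet_reader(
    512 * 1024 * 1024,   // chunk_read_limit: ~512 MiB per returned table
    1024 * 1024 * 1024,  // pass_read_limit: hint for decompression scratch space
    std::move(sources),
    std::move(metadatas),
    options);
  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // table_with_metadata
    // ... consume chunk.tbl ...
  }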
15 changes: 14 additions & 1 deletion cpp/include/cudf/io/parquet_metadata.hpp
@@ -10,6 +10,7 @@

#pragma once

#include <cudf/io/datasource.hpp>
#include <cudf/io/parquet_schema.hpp>
#include <cudf/io/types.hpp>
#include <cudf/utilities/export.hpp>
@@ -262,13 +263,25 @@ class parquet_metadata {
*
* @ingroup io_readers
*
* @param src_info Dataset source
* @param src_info Dataset source information
*
* @return parquet_metadata with parquet schema, number of rows, number of row groups and key-value
* metadata
*/
parquet_metadata read_parquet_metadata(source_info const& src_info);

/**
* @brief Constructs FileMetaData objects from a Parquet dataset
*
* @ingroup io_readers
*
* @param sources Input `datasource` objects to read the dataset from
*
* @return List of FileMetaData objects, one per parquet source
*/
std::vector<parquet::FileMetaData> read_parquet_metadata(
cudf::host_span<std::unique_ptr<cudf::io::datasource> const> sources);

/** @} */ // end of group
} // namespace io
} // namespace CUDF_EXPORT cudf
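
A short sketch of the new overload next to the existing helper, assuming the declarations above; the file name is hypothetical:

  #include <cassert>
  #include <cudf/io/parquet_metadata.hpp>

  auto sources = cudf::io::make_datasources(cudf::io::source_info{"dataset.parquet"});
  // New overload: raw footers, one FileMetaData per source.
  std::vector<cudf::io::parquet::FileMetaData> footers =
    cudf::io::read_parquet_metadata(sources);
  assert(footers.size() == sources.size());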
88 changes: 80 additions & 8 deletions cpp/src/io/functions.cpp
@@ -25,6 +25,7 @@
#include <cudf/io/orc_metadata.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/parquet_metadata.hpp>
#include <cudf/io/parquet_schema.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>

@@ -150,11 +151,12 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder(
return chunked_parquet_writer_options_builder{sink};
}

namespace {

Member Author (@mhaseeb123):
Just moved it out of the anonymous namespace.

/**
* @copydoc cudf::io::make_datasources
*/
std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info const& info,
size_t offset = 0,
size_t max_size_estimate = 0)
size_t offset,
size_t max_size_estimate)
{
switch (info.type()) {
case io_type::FILEPATH: {
@@ -188,6 +190,8 @@ std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info
}
}

namespace {

std::vector<std::unique_ptr<data_sink>> make_datasinks(sink_info const& info)
{
switch (info.type()) {
@@ -612,8 +616,22 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
CUDF_FUNC_RANGE();

auto datasources = make_datasources(options.get_source());
auto reader =
std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr);
auto reader = std::make_unique<detail_parquet::reader>(
std::move(datasources), std::vector<parquet::FileMetaData>{}, options, stream, mr);

return reader->read();
}

table_with_metadata read_parquet(std::vector<std::unique_ptr<cudf::io::datasource>>&& datasources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();

auto reader = std::make_unique<detail_parquet::reader>(
Contributor (@pmattione-nvidia, Dec 1, 2025):
Do we really want to be moving out of these vectors? What if you want to call read_parquet() twice on the same file? One of the points is to reuse the metadata for multiple calls, right? And if we're just doing copies instead, we should probably be passing in std::span's instead of vectors.

Member Author (@mhaseeb123):
I am okay with that approach as well. @wence- @JigaoLuo do any of your use cases involve reading the same file via (datasource and metadata) again and again?

Contributor (@pmattione-nvidia, Dec 1, 2025):
@abellina can you comment on what we need for Spark here? Would we share the parquet metadata between Spark tasks?

Contributor (@abellina, Dec 1, 2025):
No, tasks do not share data when reading from parquet, and we should not need to re-read the same file or the same buffer multiple times during normal operation. We can technically call read_parquet multiple times if we OOM on the same host buffer, but that is an error condition and we are likely to recreate the reader from scratch.

Does this change affect chunking at all? That's the one case where I could see an OOM causing us to call into cuDF to re-read that chunk.

Member Author (@mhaseeb123, Dec 1, 2025):
Nope, this is just a new interface to the parquet readers (both chunked and non-chunked) where you can skip creating datasources and reading in metadata, and directly pass in existing ones.

One advantage could be that you can save some time when reading the same file with different options (make sure to create a local copy of the metadatas and datasources as needed, since we are using move semantics).

Contributor:
Wouldn't we have to re-read the same file multiple times if we hit the chunked-read memory limit? Wouldn't we want to reuse the datasource and metadata to read each chunk in that case?

Member Author (@mhaseeb123, Dec 2, 2025):
Actually, I think the move semantics here are okay, since we are passing in unique_ptrs to datasources, which get destructed with the reader. I could change the PR to move datasources but copy metadatas, but that seems inconsistent. The current state allows the user to explicitly create a copy of both the datasource and metadata and pass them in as needed, instead of always copying.

Contributor:
FWIW, avoiding the metadata copies can actually impact performance. When I removed an accidental copy in read_parquet_metadata, the PDS-H overall got measurably faster (single digit %).

Contributor:
I am ok with the user explicitly having to opt in to copying things if they need to.

std::move(datasources), std::move(parquet_metadatas), options, stream, mr);

return reader->read();
}
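
Per the thread above, this overload moves its arguments, so a caller who wants two reads of the same file opts in to copying explicitly. A hedged sketch (`options1`/`options2` are hypothetical parquet_reader_options instances; FileMetaData is a plain copyable struct):

  auto const info = cudf::io::source_info{"dataset.parquet"};
  auto metadatas  = cudf::io::read_parquet_metadata(cudf::io::make_datasources(info));

  // First read: pass a copy of the footers; datasources hold unique_ptrs,
  // so recreate them rather than copying.
  auto metadatas_copy = metadatas;
  auto result1 = cudf::io::read_parquet(
    cudf::io::make_datasources(info), std::move(metadatas_copy), options1);

  // Second read reuses the surviving footers with different options.
  auto result2 = cudf::io::read_parquet(
    cudf::io::make_datasources(info), std::move(metadatas), options2);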
@@ -626,6 +644,13 @@ parquet_metadata read_parquet_metadata(source_info const& src_info)
return detail_parquet::read_parquet_metadata(datasources);
}

std::vector<parquet::FileMetaData> read_parquet_metadata(
host_span<std::unique_ptr<cudf::io::datasource> const> sources)
{
CUDF_FUNC_RANGE();
return detail_parquet::read_parquet_footers(sources);
}

/**
* @copydoc cudf::io::merge_row_group_metadata
*/
@@ -700,8 +725,33 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(
chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)}
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
make_datasources(options.get_source()),
std::vector<parquet::FileMetaData>{},
options,
stream,
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(
std::size_t chunk_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& datasources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
std::move(datasources),
std::move(parquet_metadatas),
options,
stream,
mr)}
{
}

@@ -716,6 +766,28 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
make_datasources(options.get_source()),
std::vector<parquet::FileMetaData>{},
options,
stream,
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& datasources,
std::vector<parquet::FileMetaData>&& parquet_metadatas,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
std::move(datasources),
std::move(parquet_metadatas),
options,
stream,
mr)}