-
Notifications
You must be signed in to change notification settings - Fork 991
Allow parquet readers to use existing datasources and metadatas
#20693
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
d01acf9
2c83509
2cb9738
67bd2bc
8bede77
b81c66a
ebce1a0
f7ce471
034983a
8b85949
e9e8976
3795225
0ef323c
d33c9ee
e056495
9e00764
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,11 +51,13 @@ class reader { | |
| * @brief Constructor from an array of datasources | ||
| * | ||
| * @param sources Input `datasource` objects to read the dataset from | ||
| * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty | ||
| * @param options Settings for controlling reading behavior | ||
| * @param stream CUDA stream used for device memory operations and kernel launches. | ||
| * @param mr Device memory resource to use for device memory allocation | ||
| */ | ||
| explicit reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources, | ||
| std::vector<FileMetaData>&& parquet_metadatas, | ||
| parquet_reader_options const& options, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr); | ||
|
|
@@ -134,13 +136,15 @@ class chunked_reader : private reader { | |
| * @param pass_read_limit Limit on total amount of memory used for temporary computations during | ||
| * loading, or `0` if there is no limit | ||
| * @param sources Input `datasource` objects to read the dataset from | ||
| * @param parquet_metadatas Pre-materialized Parquet file metadata(s). Read from sources if empty | ||
| * @param options Settings for controlling reading behavior | ||
| * @param stream CUDA stream used for device memory operations and kernel launches | ||
| * @param mr Device memory resource to use for device memory allocation | ||
| */ | ||
| explicit chunked_reader(std::size_t chunk_read_limit, | ||
| std::size_t pass_read_limit, | ||
| std::vector<std::unique_ptr<cudf::io::datasource>>&& sources, | ||
| std::vector<parquet::FileMetaData>&& parquet_metadatas, | ||
| parquet_reader_options const& options, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr); | ||
|
|
@@ -248,6 +252,17 @@ class writer { | |
| * metadata. | ||
| */ | ||
| parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> const> sources); | ||
|
|
||
| /** | ||
| * @brief Constructs FileMetaData objects from parquet dataset | ||
| * | ||
| * @param sources Input `datasource` objects to read the dataset from | ||
| * | ||
| * @return List of FileMetaData objects, one per parquet source | ||
| */ | ||
| std::vector<parquet::FileMetaData> read_parquet_footers( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could rename this to |
||
| host_span<std::unique_ptr<datasource> const> sources); | ||
|
|
||
| } // namespace parquet::detail | ||
| } // namespace io | ||
| } // namespace CUDF_EXPORT cudf | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,9 @@ namespace io { | |
| * @file | ||
| */ | ||
|
|
||
| //! ORC data type | ||
| using cudf::io::orc::TypeKind; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignore for docs build |
||
|
|
||
| /** | ||
| * @brief Holds column names and buffers containing raw file-level and stripe-level statistics. | ||
| * | ||
|
|
@@ -224,9 +227,7 @@ struct orc_column_schema { | |
| * @param type ORC type | ||
| * @param children child columns (empty for non-nested types) | ||
| */ | ||
| orc_column_schema(std::string_view name, | ||
| orc::TypeKind type, | ||
| std::vector<orc_column_schema> children) | ||
| orc_column_schema(std::string_view name, TypeKind type, std::vector<orc_column_schema> children) | ||
| : _name{name}, _type_kind{type}, _children{std::move(children)} | ||
| { | ||
| } | ||
|
|
@@ -282,7 +283,7 @@ struct orc_column_schema { | |
|
|
||
| private: | ||
| std::string _name; | ||
| orc::TypeKind _type_kind; | ||
| TypeKind _type_kind; | ||
| std::vector<orc_column_schema> _children; | ||
| }; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| #include <cudf/io/orc_metadata.hpp> | ||
| #include <cudf/io/parquet.hpp> | ||
| #include <cudf/io/parquet_metadata.hpp> | ||
| #include <cudf/io/parquet_schema.hpp> | ||
| #include <cudf/utilities/default_stream.hpp> | ||
| #include <cudf/utilities/error.hpp> | ||
|
|
||
|
|
@@ -150,11 +151,12 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder( | |
| return chunked_parquet_writer_options_builder{sink}; | ||
| } | ||
|
|
||
| namespace { | ||
|
|
||
| /** | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just move it out of the anonymous namespace |
||
| * @copydoc cudf::io::make_datasources | ||
| */ | ||
| std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info const& info, | ||
| size_t offset = 0, | ||
| size_t max_size_estimate = 0) | ||
| size_t offset, | ||
| size_t max_size_estimate) | ||
| { | ||
| switch (info.type()) { | ||
| case io_type::FILEPATH: { | ||
|
|
@@ -188,6 +190,8 @@ std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info | |
| } | ||
| } | ||
|
|
||
| namespace { | ||
|
|
||
| std::vector<std::unique_ptr<data_sink>> make_datasinks(sink_info const& info) | ||
| { | ||
| switch (info.type()) { | ||
|
|
@@ -612,8 +616,22 @@ table_with_metadata read_parquet(parquet_reader_options const& options, | |
| CUDF_FUNC_RANGE(); | ||
|
|
||
| auto datasources = make_datasources(options.get_source()); | ||
| auto reader = | ||
| std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr); | ||
| auto reader = std::make_unique<detail_parquet::reader>( | ||
| std::move(datasources), std::vector<parquet::FileMetaData>{}, options, stream, mr); | ||
|
|
||
| return reader->read(); | ||
| } | ||
|
|
||
| table_with_metadata read_parquet(std::vector<std::unique_ptr<cudf::io::datasource>>&& datasources, | ||
| std::vector<parquet::FileMetaData>&& parquet_metadatas, | ||
| parquet_reader_options const& options, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| { | ||
| CUDF_FUNC_RANGE(); | ||
|
|
||
| auto reader = std::make_unique<detail_parquet::reader>( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we really want to be moving out of these vectors? What if you want to call read_parquet() twice on the same file? One of the points is to reuse the metadata for multiple calls, right? And if we're just doing copies instead, we should probably be passing in std::span's instead of vectors.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @abellina can you comment on what we need for spark here? would we share the parquet metadata between spark tasks?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, tasks do not share data when reading from parquet, and we should not need to re-read the same file or same buffer multiple times during normal operations. We can technically call read_parquet multiple times if we OOM on the same host buffer, but that is an error condition and we are likely to recreate the reader from scratch. Does this change affect chunking at all? that's the one case where I could see an OOM causing us to call into cuDF to re-read that chunk.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, this is just a new interface to the parquet readers (both chunked and non-chunked) where you can skip creating One advantage could be that you can save some time if reading the same file but with different options (make sure to create a local copy of
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wouldn't we have to reread the same file multiple times if we hit the chunked read memory limit? wouldn't we want to reuse the datasource and metadata to read each chunk in that case?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I think the move semantics here are ok since we are passing in unique ptrs to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, avoiding the metadata copies can actually impact performance. When I removed an accidental copy in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am ok with the user explicitly having to opt in to copying things if they need to. |
||
| std::move(datasources), std::move(parquet_metadatas), options, stream, mr); | ||
|
|
||
| return reader->read(); | ||
| } | ||
|
|
@@ -626,6 +644,13 @@ parquet_metadata read_parquet_metadata(source_info const& src_info) | |
| return detail_parquet::read_parquet_metadata(datasources); | ||
| } | ||
|
|
||
| std::vector<parquet::FileMetaData> read_parquet_metadata( | ||
| host_span<std::unique_ptr<cudf::io::datasource> const> sources) | ||
| { | ||
| CUDF_FUNC_RANGE(); | ||
| return detail_parquet::read_parquet_footers(sources); | ||
| } | ||
|
|
||
| /** | ||
| * @copydoc cudf::io::merge_row_group_metadata | ||
| */ | ||
|
|
@@ -700,8 +725,33 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, | |
| parquet_reader_options const& options, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| : reader{std::make_unique<detail_parquet::chunked_reader>( | ||
| chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)} | ||
| : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit, | ||
| 0, | ||
| make_datasources(options.get_source()), | ||
| std::vector<parquet::FileMetaData>{}, | ||
| options, | ||
| stream, | ||
| mr)} | ||
| { | ||
| } | ||
|
|
||
| /** | ||
| * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader | ||
| */ | ||
| chunked_parquet_reader::chunked_parquet_reader( | ||
| std::size_t chunk_read_limit, | ||
| std::vector<std::unique_ptr<cudf::io::datasource>>&& datasources, | ||
| std::vector<parquet::FileMetaData>&& parquet_metadatas, | ||
| parquet_reader_options const& options, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit, | ||
| 0, | ||
| std::move(datasources), | ||
| std::move(parquet_metadatas), | ||
| options, | ||
| stream, | ||
| mr)} | ||
| { | ||
| } | ||
|
|
||
|
|
@@ -716,6 +766,28 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, | |
| : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit, | ||
| pass_read_limit, | ||
| make_datasources(options.get_source()), | ||
| std::vector<parquet::FileMetaData>{}, | ||
| options, | ||
| stream, | ||
| mr)} | ||
| { | ||
| } | ||
|
|
||
| /** | ||
| * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader | ||
| */ | ||
| chunked_parquet_reader::chunked_parquet_reader( | ||
| std::size_t chunk_read_limit, | ||
| std::size_t pass_read_limit, | ||
| std::vector<std::unique_ptr<cudf::io::datasource>>&& datasources, | ||
| std::vector<parquet::FileMetaData>&& parquet_metadatas, | ||
| parquet_reader_options const& options, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| : reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit, | ||
| pass_read_limit, | ||
| std::move(datasources), | ||
| std::move(parquet_metadatas), | ||
| options, | ||
| stream, | ||
| mr)} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just declared this helper in the header file. It's already defined in
functions.cpp