Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class parquet_reader_options {

// Path in schema of column to read; `nullopt` is all
std::optional<std::vector<std::string>> _columns;
// Indices of top-level columns to read; `nullopt` is all (cannot be used alongside `_columns`)
std::optional<std::vector<cudf::size_type>> _column_indices;

// List of individual row groups to read (ignored if empty)
std::vector<std::vector<size_type>> _row_groups;
Expand Down Expand Up @@ -228,6 +230,13 @@ class parquet_reader_options {
*/
[[nodiscard]] auto const& get_columns() const { return _columns; }

/**
* @brief Returns indices of top-level columns to be read, if set.
*
* @return Indices of top-level columns to be read; `nullopt` if the option is not set
*/
[[nodiscard]] auto const& get_column_indices() const { return _column_indices; }

/**
* @brief Returns list of individual row groups to be read.
*
Expand Down Expand Up @@ -269,8 +278,9 @@ class parquet_reader_options {
* Applies the same list of column names across all sources. Unlike `set_row_groups`,
* which allows per-source configuration, `set_columns` applies globally.
*
* Columns that do not exist in the input files will be ignored silently.
* The output table will only include the columns that are actually found.
* Columns that do not exist in the input files will be ignored silently and the output table will
* only include the columns that are actually found. This behavior can be changed by setting
* `enable_ignore_missing_columns` to false.
*
* To select a nested column (e.g., a struct member), use dot notation.
*
Expand All @@ -282,7 +292,30 @@ class parquet_reader_options {
*
* @param col_names A vector of column names to attempt to read from each input source.
*/
void set_columns(std::vector<std::string> col_names) { _columns = std::move(col_names); }
void set_columns(std::vector<std::string> col_names)
{
  // Name-based and index-based column selection are mutually exclusive
  CUDF_EXPECTS(not _column_indices.has_value(),
               "Cannot select columns by indices and names simultaneously");
  _columns.emplace(std::move(col_names));
}

/**
 * @brief Sets the indices of top-level columns to be read from all input sources.
 *
 * Applies the same list of top-level column indices across all sources. Unlike `set_row_groups`,
 * which allows per-source configuration, `set_column_indices` applies globally.
 *
 * Note that `set_column_indices` can only be used to select top-level columns, unlike
 * `set_columns` which can also select nested columns.
 *
 * @throws cudf::logic_error if column names have already been selected via `set_columns`
 *
 * @param col_indices A vector of column indices to attempt to read from each input source.
 */
void set_column_indices(std::vector<cudf::size_type> col_indices)
{
  // Index-based and name-based column selection are mutually exclusive
  CUDF_EXPECTS(not _columns.has_value(),
               "Cannot select columns by indices and names simultaneously");
  _column_indices = std::move(col_indices);
}

/**
* @brief Specifies which row groups to read from each input source.
Expand Down Expand Up @@ -453,10 +486,26 @@ class parquet_reader_options_builder {
*/
parquet_reader_options_builder& columns(std::vector<std::string> col_names)
{
  // Reject mixing name-based selection with a previously requested index-based selection
  CUDF_EXPECTS(not options._column_indices.has_value(),
               "Cannot select columns by indices and names simultaneously");
  options._columns.emplace(std::move(col_names));
  return *this;
}

/**
 * @brief Sets the indices of top-level columns to be read from all input sources.
 *
 * Only top-level columns can be selected by index; use `columns` to select nested
 * columns by name.
 *
 * @param col_indices A vector of column indices to attempt to read from each input source.
 * @return this for chaining
 */
parquet_reader_options_builder& column_indices(std::vector<cudf::size_type> col_indices)
{
  // Reject mixing index-based selection with a previously requested name-based selection
  CUDF_EXPECTS(not options._columns.has_value(),
               "Cannot select columns by indices and names simultaneously");
  options._column_indices.emplace(std::move(col_indices));
  return *this;
}

/**
* @brief Sets vector of individual row groups to read.
*
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ void hybrid_scan_reader_impl::select_columns(read_columns_mode read_columns_mode
// Using as is from:
// https://github.com/rapidsai/cudf/blob/a8b25cd205dc5d04b9918dcb0b3abd6b8c4e4a74/cpp/src/io/parquet/reader_impl.cpp#L556-L569
std::optional<std::vector<std::string>> filter_only_columns_names;
if (options.get_filter().has_value() and options.get_columns().has_value()) {
if (options.get_filter().has_value() and
(options.get_columns().has_value() or options.get_column_indices().has_value())) {
auto select_column_names = get_column_projection(options);
filter_only_columns_names = cudf::io::parquet::detail::get_column_names_in_expression(
options.get_filter(), *(options.get_columns()));
options.get_filter(), *select_column_names);
_num_filter_only_columns = filter_only_columns_names->size();
}
std::tie(_input_columns, _output_buffers, _output_column_schemas) =
Expand Down Expand Up @@ -152,8 +154,9 @@ void hybrid_scan_reader_impl::select_columns(read_columns_mode read_columns_mode
} else {
if (_is_payload_columns_selected) { return; }

auto select_column_names = get_column_projection(options);
std::tie(_input_columns, _output_buffers, _output_column_schemas) =
_extended_metadata->select_payload_columns(options.get_columns(),
_extended_metadata->select_payload_columns(select_column_names,
_filter_columns_names,
_use_pandas_metadata,
_strings_to_categorical,
Expand Down
40 changes: 37 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,19 @@ reader_impl::reader_impl(std::size_t chunk_read_limit,
_reader_column_schema = options.get_column_schema();

// Select only columns required by the options and filter
auto select_column_names = get_column_projection(options);

std::optional<std::vector<std::string>> filter_columns_names;
if (options.get_filter().has_value() and options.get_columns().has_value()) {
if (options.get_filter().has_value() and
(options.get_columns().has_value() or options.get_column_indices().has_value())) {
// list, struct, dictionary are not supported by AST filter yet.
// extract columns not present in get_columns() & keep count to remove at end.
filter_columns_names =
get_column_names_in_expression(options.get_filter(), *(options.get_columns()));
get_column_names_in_expression(options.get_filter(), *select_column_names);
_num_filter_only_columns = filter_columns_names->size();
}
std::tie(_input_columns, _output_buffers, _output_column_schemas) =
_metadata->select_columns(options.get_columns(),
_metadata->select_columns(select_column_names,
filter_columns_names,
options.is_enabled_use_pandas_metadata(),
_strings_to_categorical,
Expand Down Expand Up @@ -810,6 +813,37 @@ std::vector<size_t> reader_impl::calculate_output_num_rows_per_source(size_t con
return num_rows_per_source;
}

/**
 * @brief Computes the names of columns to be read from the file, if specified.
 *
 * Name-based selection is passed through unchanged; index-based selection is resolved to
 * column names via the top-level schema (schema node 0) of the metadata.
 *
 * @param options The reader options
 * @return Names of columns to be read if a selection was specified, `std::nullopt` otherwise
 */
std::optional<std::vector<std::string>> reader_impl::get_column_projection(
  parquet_reader_options const& options) const
{
  auto const& names_selection = options.get_columns();
  auto const& index_selection = options.get_column_indices();

  CUDF_EXPECTS(not(names_selection.has_value() and index_selection.has_value()),
               "Cannot select columns by both names and indices simultaneously");

  // Pass a name-based selection through unchanged
  if (names_selection.has_value()) { return names_selection; }

  // No selection at all: `nullopt` indicates all columns are to be read
  if (not index_selection.has_value()) { return std::nullopt; }

  // Translate each requested top-level index into the corresponding schema column name
  auto const ignore_missing     = options.is_enabled_ignore_missing_columns();
  auto const& top_level_indices = _metadata->get_schema(0).children_idx;
  std::vector<std::string> resolved_names;
  resolved_names.reserve(index_selection->size());
  for (auto const idx : *index_selection) {
    bool const in_range =
      std::cmp_greater_equal(idx, 0) and std::cmp_less(idx, top_level_indices.size());
    // Out-of-range indices throw unless missing columns are being ignored
    CUDF_EXPECTS(ignore_missing or in_range,
                 "Encountered an invalid col index in the top-level column selection");
    if (in_range) {
      resolved_names.emplace_back(_metadata->get_schema(top_level_indices[idx]).name);
    }
  }
  return std::make_optional(std::move(resolved_names));
}

table_with_metadata reader_impl::finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns)
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,15 @@ class reader_impl {
[[nodiscard]] std::vector<size_t> calculate_output_num_rows_per_source(size_t chunk_start_row,
size_t chunk_num_rows);

/**
* @brief Computes the names of columns to be read from the file, if specified.
*
* @param options The reader options
* @return Names of columns to be read from the file if specified, `nullopt` otherwise
*/
[[nodiscard]] std::optional<std::vector<std::string>> get_column_projection(
parquet_reader_options const& options) const;

rmm::cuda_stream_view _stream;
rmm::device_async_resource_ref _mr{cudf::get_current_device_resource_ref()};

Expand Down
99 changes: 73 additions & 26 deletions cpp/tests/io/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -396,10 +396,10 @@ TEST_F(ParquetReaderTest, ReorderedColumns)
cudf::io::write_parquet(opts);

{
// read them out of order
// read them out of order using indices
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
.columns({"d", "a", "b", "c"});
.column_indices({3, 0, 1, 2});
auto result = cudf::io::read_parquet(read_opts);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->view().column(0), d);
Expand All @@ -422,10 +422,10 @@ TEST_F(ParquetReaderTest, ReorderedColumns)
}

{
// read them out of order
// read them out of order using indices
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
.columns({"d", "c", "b", "a"});
.column_indices({3, 2, 1, 0});
auto result = cudf::io::read_parquet(read_opts);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->view().column(0), d);
Expand Down Expand Up @@ -987,22 +987,37 @@ TEST_F(ParquetReaderTest, EmptyOutput)
TEST_F(ParquetReaderTest, EmptyColumnsParam)
{
srand(31337);
auto const expected = create_random_fixed_table<int>(2, 4, false);

std::vector<char> out_buffer;
cudf::io::parquet_writer_options args =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&out_buffer}, *expected);
cudf::io::write_parquet(args);
{
auto const table = create_random_fixed_table<int>(2, 4, false);
cudf::io::parquet_writer_options args =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&out_buffer}, *table);
cudf::io::write_parquet(args);
}

cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(
cudf::io::source_info{cudf::host_span<std::byte const>{
reinterpret_cast<std::byte const*>(out_buffer.data()), out_buffer.size()}})
.columns({});
auto const result = cudf::io::read_parquet(read_opts);
{
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(
cudf::io::source_info{cudf::host_span<std::byte const>{
reinterpret_cast<std::byte const*>(out_buffer.data()), out_buffer.size()}})
.columns({});
auto const result = cudf::io::read_parquet(read_opts);

EXPECT_EQ(result.tbl->num_columns(), 0);
EXPECT_EQ(result.tbl->num_rows(), 0);
}

{
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(
cudf::io::source_info{cudf::host_span<std::byte const>{
reinterpret_cast<std::byte const*>(out_buffer.data()), out_buffer.size()}})
.column_indices({});
auto const result = cudf::io::read_parquet(read_opts);

EXPECT_EQ(result.tbl->num_columns(), 0);
EXPECT_EQ(result.tbl->num_rows(), 0);
EXPECT_EQ(result.tbl->num_columns(), 0);
EXPECT_EQ(result.tbl->num_rows(), 0);
}
}

TEST_F(ParquetReaderTest, BinaryAsStrings)
Expand Down Expand Up @@ -1321,13 +1336,26 @@ TEST_F(ParquetReaderTest, ReorderedReadMultipleFiles)
cudf::io::write_parquet(out_opts2);

// read in both files swapping the columns
auto read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{{filepath1, filepath2}})
.columns({"_col1", "_col0"});
auto result = cudf::io::read_parquet(read_opts);
auto sliced = cudf::slice(result.tbl->view(), {0, num_rows, num_rows, 2 * num_rows});
CUDF_TEST_EXPECT_TABLES_EQUAL(sliced[0], swapped1);
CUDF_TEST_EXPECT_TABLES_EQUAL(sliced[1], swapped2);
{
auto read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{{filepath1, filepath2}})
.columns({"_col1", "_col0"});
auto result = cudf::io::read_parquet(read_opts);
auto sliced = cudf::slice(result.tbl->view(), {0, num_rows, num_rows, 2 * num_rows});
CUDF_TEST_EXPECT_TABLES_EQUAL(sliced[0], swapped1);
CUDF_TEST_EXPECT_TABLES_EQUAL(sliced[1], swapped2);
}

// read in both files swapping the columns using indices
{
auto read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{{filepath1, filepath2}})
.column_indices({1, 0});
auto result = cudf::io::read_parquet(read_opts);
auto sliced = cudf::slice(result.tbl->view(), {0, num_rows, num_rows, 2 * num_rows});
CUDF_TEST_EXPECT_TABLES_EQUAL(sliced[0], swapped1);
CUDF_TEST_EXPECT_TABLES_EQUAL(sliced[1], swapped2);
}
}

TEST_F(ParquetReaderTest, NoFilter)
Expand Down Expand Up @@ -1453,6 +1481,13 @@ TEST_F(ParquetReaderTest, FilterWithColumnProjection)
.filter(read_expr);
auto result = cudf::io::read_parquet(read_opts);
CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);

// Repeat but select columns using indices instead of names
read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
.column_indices({2})
.filter(read_expr);
result = cudf::io::read_parquet(read_opts);
CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);
}

{ // column_reference in parquet filter (indices as per order of column projection)
Expand All @@ -1465,7 +1500,13 @@ TEST_F(ParquetReaderTest, FilterWithColumnProjection)
.columns({"col_double", "col_uint32"})
.filter(read_ref_expr);
auto result = cudf::io::read_parquet(read_opts);
CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);
CUDF_TEST_EXPECT_TABLES_EQUAL(*(cudf::io::read_parquet(read_opts).tbl), *expected);

// Repeat but select columns using indices instead of names
read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
.column_indices({2, 0})
.filter(read_ref_expr);
CUDF_TEST_EXPECT_TABLES_EQUAL(*(cudf::io::read_parquet(read_opts).tbl), *expected);
}

// Error cases
Expand All @@ -1477,6 +1518,12 @@ TEST_F(ParquetReaderTest, FilterWithColumnProjection)
.columns({"col_double", "col_uint32"})
.filter(read_ref_expr);
EXPECT_THROW(cudf::io::read_parquet(read_opts), cudf::logic_error);

// Repeat but select columns using indices instead of names
read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
.column_indices({2, 0})
.filter(read_ref_expr);
EXPECT_THROW(cudf::io::read_parquet(read_opts), cudf::logic_error);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ cdef class ParquetReaderOptions:
cpdef void set_num_rows(self, int64_t nrows)
cpdef void set_skip_rows(self, int64_t skip_rows)
cpdef void set_columns(self, list col_names)
cpdef void set_column_indices(self, list col_indices)
cpdef void set_filter(self, Expression filter)
cpdef void set_source(self, SourceInfo src)
cpdef bool is_enabled_use_jit_filter(self)
Expand All @@ -61,6 +62,7 @@ cdef class ParquetReaderOptionsBuilder:
cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val)
cpdef ParquetReaderOptionsBuilder filter(self, Expression filter)
cpdef ParquetReaderOptionsBuilder columns(self, list col_names)
cpdef ParquetReaderOptionsBuilder column_indices(self, list col_indices)
cpdef ParquetReaderOptionsBuilder use_jit_filter(self, bool use_jit_filter)
cpdef build(self)

Expand Down
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ParquetReaderOptions:
def set_num_rows(self, nrows: int): ...
def set_skip_rows(self, skip_rows: int): ...
def set_columns(self, col_names: list[str]): ...
def set_column_indices(self, col_indices: list[int]): ...
def set_filter(self, filter: Expression): ...
def set_source(self, src: SourceInfo) -> None: ...
def is_enabled_use_jit_filter(self) -> bool: ...
Expand Down
Loading
Loading