Skip to content

Commit 03a1867

Browse files
authored
GH-46676: [C++][Python][Parquet] Allow reading Parquet LIST data as LargeList directly (#46678)
### Rationale for this change When reading a Parquet LIST logical type (or a repeated field without a logical type), Parquet C++ automatically reads it as a Arrow List array. However, this can in some cases run into the 32-bit offsets limit. We'd like to be able to choose to read as LargeList instead, even if there is no serialized Arrow schema in the Parquet file. ### What changes are included in this PR? * Add a Parquet read option `list_type` to select which Arrow type to read LIST / repeated Parquet columns into * Fix an index truncation bug when writing a huge single chunk of data to Parquet ### Are these changes tested? Yes, the functionality is tested. However, I wasn't able to write a unit test that wouldn't consume a horrendous amount of time or memory writing/reading a list with offsets larger than 2**32. ### Are there any user-facing changes? No, only an API improvement. * GitHub Issue: #46676 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent 832bfa1 commit 03a1867

17 files changed

+632
-412
lines changed

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
117117
properties.set_coerce_int96_timestamp_unit(
118118
format.reader_options.coerce_int96_timestamp_unit);
119119
properties.set_binary_type(format.reader_options.binary_type);
120+
properties.set_list_type(format.reader_options.list_type);
120121
return properties;
121122
}
122123

@@ -445,7 +446,8 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {
445446
return (reader_options.dict_columns == other_reader_options.dict_columns &&
446447
reader_options.coerce_int96_timestamp_unit ==
447448
other_reader_options.coerce_int96_timestamp_unit &&
448-
reader_options.binary_type == other_reader_options.binary_type);
449+
reader_options.binary_type == other_reader_options.binary_type &&
450+
reader_options.list_type == other_reader_options.list_type);
449451
}
450452

451453
ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties)

cpp/src/arrow/dataset/file_parquet.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
9191
std::unordered_set<std::string> dict_columns;
9292
arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
9393
Type::type binary_type = Type::BINARY;
94+
Type::type list_type = Type::LIST;
9495
/// @}
9596
} reader_options;
9697

cpp/src/parquet/CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,17 +402,18 @@ add_parquet_test(writer-test
402402

403403
add_parquet_test(chunker-test SOURCES chunker_internal_test.cc)
404404

405-
add_parquet_test(arrow-test
405+
add_parquet_test(arrow-reader-writer-test
406406
SOURCES
407-
arrow/arrow_metadata_test.cc
408407
arrow/arrow_reader_writer_test.cc
409-
arrow/arrow_schema_test.cc
410408
arrow/arrow_statistics_test.cc
411409
arrow/variant_test.cc)
412410

413411
add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
414412
arrow/reconstruct_internal_test.cc)
415413

414+
add_parquet_test(arrow-metadata-test SOURCES arrow/arrow_metadata_test.cc
415+
arrow/arrow_schema_test.cc)
416+
416417
if(PARQUET_REQUIRE_ENCRYPTION)
417418
add_parquet_test(encryption-test
418419
SOURCES

cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Lines changed: 75 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,19 @@ static constexpr int LARGE_SIZE = 10000;
124124

125125
static constexpr uint32_t kDefaultSeed = 0;
126126

127+
struct ListCase {
128+
::arrow::Type::type type_id;
129+
std::function<std::shared_ptr<::arrow::DataType>(std::shared_ptr<::arrow::Field>)>
130+
type_factory;
131+
};
132+
133+
static const std::vector<ListCase> kListCases = {
134+
{::arrow::Type::LIST,
135+
[](std::shared_ptr<::arrow::Field> field) { return ::arrow::list(field); }},
136+
{::arrow::Type::LARGE_LIST,
137+
[](std::shared_ptr<::arrow::Field> field) { return ::arrow::large_list(field); }},
138+
};
139+
127140
std::shared_ptr<const LogicalType> get_logical_type(const DataType& type) {
128141
switch (type.id()) {
129142
case ArrowId::UINT8:
@@ -426,10 +439,13 @@ void CheckConfiguredRoundtrip(
426439
const std::shared_ptr<::parquet::WriterProperties>& writer_properties =
427440
::parquet::default_writer_properties(),
428441
const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties =
429-
default_arrow_writer_properties()) {
442+
default_arrow_writer_properties(),
443+
const ArrowReaderProperties& arrow_reader_properties =
444+
default_arrow_reader_properties()) {
430445
std::shared_ptr<Table> actual_table;
431446
ASSERT_NO_FATAL_FAILURE(DoRoundtrip(input_table, input_table->num_rows(), &actual_table,
432-
writer_properties, arrow_writer_properties));
447+
writer_properties, arrow_writer_properties,
448+
arrow_reader_properties));
433449
if (expected_table) {
434450
ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*actual_table->schema(),
435451
*expected_table->schema(),
@@ -446,14 +462,18 @@ void CheckConfiguredRoundtrip(
446462
void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
447463
int64_t row_group_size, const std::vector<int>& column_subset,
448464
std::shared_ptr<Table>* out,
449-
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
450-
default_arrow_writer_properties()) {
465+
const std::shared_ptr<ArrowWriterProperties>&
466+
arrow_writer_properties = default_arrow_writer_properties(),
467+
const ArrowReaderProperties& arrow_reader_properties =
468+
default_arrow_reader_properties()) {
451469
std::shared_ptr<Buffer> buffer;
452470
ASSERT_NO_FATAL_FAILURE(
453-
WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
471+
WriteTableToBuffer(table, row_group_size, arrow_writer_properties, &buffer));
454472

455-
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
456-
::arrow::default_memory_pool()));
473+
std::unique_ptr<FileReader> reader;
474+
FileReaderBuilder builder;
475+
ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
476+
ASSERT_OK(builder.properties(arrow_reader_properties)->Build(&reader));
457477

458478
reader->set_use_threads(use_threads);
459479
if (column_subset.size() > 0) {
@@ -468,15 +488,15 @@ void DoRoundTripWithBatches(
468488
const std::shared_ptr<Table>& table, bool use_threads, int64_t row_group_size,
469489
const std::vector<int>& column_subset, std::shared_ptr<Table>* out,
470490
const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties =
471-
default_arrow_writer_properties()) {
491+
default_arrow_writer_properties(),
492+
ArrowReaderProperties arrow_reader_properties = default_arrow_reader_properties()) {
472493
std::shared_ptr<Buffer> buffer;
473494
ASSERT_NO_FATAL_FAILURE(
474495
WriteTableToBuffer(table, row_group_size, arrow_writer_properties, &buffer));
475496

476497
std::unique_ptr<FileReader> reader;
477498
FileReaderBuilder builder;
478499
ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
479-
ArrowReaderProperties arrow_reader_properties;
480500
arrow_reader_properties.set_batch_size(row_group_size - 1);
481501
ASSERT_OK_NO_THROW(builder.memory_pool(::arrow::default_memory_pool())
482502
->properties(arrow_reader_properties)
@@ -497,23 +517,24 @@ void DoRoundTripWithBatches(
497517
ASSERT_OK_AND_ASSIGN(*out, Table::FromRecordBatchReader(batch_reader.get()));
498518
}
499519

500-
void CheckSimpleRoundtrip(
501-
const std::shared_ptr<Table>& table, int64_t row_group_size,
502-
const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties =
503-
default_arrow_writer_properties()) {
520+
void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
521+
const std::shared_ptr<ArrowWriterProperties>&
522+
arrow_writer_properties = default_arrow_writer_properties(),
523+
const ArrowReaderProperties& arrow_reader_properties =
524+
default_arrow_reader_properties()) {
504525
std::shared_ptr<Table> result;
505-
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, false /* use_threads */,
506-
row_group_size, {}, &result,
507-
arrow_writer_properties));
526+
ASSERT_NO_FATAL_FAILURE(
527+
DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
528+
arrow_writer_properties, arrow_reader_properties));
508529
::arrow::AssertSchemaEqual(*table->schema(), *result->schema(),
509530
/*check_metadata=*/false);
510531
ASSERT_OK(result->ValidateFull());
511532

512533
::arrow::AssertTablesEqual(*table, *result, false);
513534

514-
ASSERT_NO_FATAL_FAILURE(DoRoundTripWithBatches(table, false /* use_threads */,
515-
row_group_size, {}, &result,
516-
arrow_writer_properties));
535+
ASSERT_NO_FATAL_FAILURE(
536+
DoRoundTripWithBatches(table, false /* use_threads */, row_group_size, {}, &result,
537+
arrow_writer_properties, arrow_reader_properties));
517538
::arrow::AssertSchemaEqual(*table->schema(), *result->schema(),
518539
/*check_metadata=*/false);
519540
ASSERT_OK(result->ValidateFull());
@@ -3198,8 +3219,22 @@ TEST(ArrowReadWrite, LargeList) {
31983219
[7, 8, 9]])";
31993220
auto array = ::arrow::ArrayFromJSON(type, json);
32003221
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
3201-
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
3202-
CheckSimpleRoundtrip(table, 2, props_store_schema);
3222+
{
3223+
// If the schema is stored, the large_list is restored regardless of
3224+
// the list_type setting
3225+
for (auto list_type : {::arrow::Type::LIST, ::arrow::Type::LARGE_LIST}) {
3226+
ArrowReaderProperties reader_props;
3227+
reader_props.set_list_type(list_type);
3228+
auto writer_props = ArrowWriterProperties::Builder().store_schema()->build();
3229+
CheckSimpleRoundtrip(table, 2, writer_props, reader_props);
3230+
}
3231+
}
3232+
{
3233+
// If the schema is not stored, large_list is read depending on the list_type setting
3234+
ArrowReaderProperties reader_props;
3235+
reader_props.set_list_type(::arrow::Type::LARGE_LIST);
3236+
CheckSimpleRoundtrip(table, 2, default_arrow_writer_properties(), reader_props);
3237+
}
32033238
}
32043239

32053240
TEST(ArrowReadWrite, FixedSizeList) {
@@ -3224,20 +3259,25 @@ TEST(ArrowReadWrite, ListOfStructOfList2) {
32243259
using ::arrow::list;
32253260
using ::arrow::struct_;
32263261

3227-
auto type =
3228-
list(field("item",
3229-
struct_({field("a", ::arrow::int16(), /*nullable=*/false),
3230-
field("b", list(::arrow::int64()), /*nullable=*/false)}),
3231-
/*nullable=*/false));
3232-
3233-
const char* json = R"([
3234-
[{"a": 123, "b": [1, 2, 3]}],
3235-
null,
3236-
[],
3237-
[{"a": 456, "b": []}, {"a": 789, "b": [null]}, {"a": 876, "b": [4, 5, 6]}]])";
3238-
auto array = ::arrow::ArrayFromJSON(type, json);
3239-
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
3240-
CheckSimpleRoundtrip(table, 2);
3262+
for (const auto& list_case : kListCases) {
3263+
auto type = list_case.type_factory(
3264+
field("item",
3265+
struct_({field("a", ::arrow::int16(), /*nullable=*/false),
3266+
field("b", list_case.type_factory(field("item", ::arrow::int64())),
3267+
/*nullable=*/false)}),
3268+
/*nullable=*/false));
3269+
3270+
const char* json = R"([
3271+
[{"a": 123, "b": [1, 2, 3]}],
3272+
null,
3273+
[],
3274+
[{"a": 456, "b": []}, {"a": 789, "b": [null]}, {"a": 876, "b": [4, 5, 6]}]])";
3275+
auto array = ::arrow::ArrayFromJSON(type, json);
3276+
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
3277+
ArrowReaderProperties reader_props;
3278+
reader_props.set_list_type(list_case.type_id);
3279+
CheckSimpleRoundtrip(table, 2, default_arrow_writer_properties(), reader_props);
3280+
}
32413281
}
32423282

32433283
TEST(ArrowReadWrite, StructOfLists) {

0 commit comments

Comments
 (0)