
Commit a2941dd

GH-43041: [C++][Python] Read/write Parquet BYTE_ARRAY as Large/View types directly (#46532)
### Rationale for this change

Parquet has almost no support for LargeBinary and BinaryView data:
* on writing, those types are not supported at all
* on reading, data is decoded as regular Binary data with automatic chunking; if the stored Arrow schema points to a LargeBinary field, the data is later cast to that type

### What changes are included in this PR?

* Refactor the BYTE_ARRAY column decoders to allow decoding directly into a LargeBinaryBuilder or a BinaryViewBuilder
* Add a `binary_type` setting to `ArrowReaderProperties` to change the type that BYTE_ARRAY columns are decoded to by default
* Support reading Parquet GEOMETRY types with a LargeBinary or BinaryView storage
* Add benchmarks for reading and writing BinaryView data from/to Parquet
* Add the corresponding Python bindings

### Are these changes tested?

Yes.

### Are there any user-facing changes?

New APIs and improved functionality.

* GitHub Issue: #43041

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
1 parent 1943911 commit a2941dd

38 files changed (+1049, −663 lines)
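As a rough illustration of the new reader-side option described in the commit message, the sketch below sets `binary_type` on `ArrowReaderProperties` and builds a reader with `FileReaderBuilder`, the APIs exercised in this diff. It is a minimal sketch, not part of the change: the function name `ReadAsBinaryView` and the on-disk file path are purely illustrative.

```cpp
#include <memory>
#include <string>

#include "arrow/io/file.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "parquet/arrow/reader.h"
#include "parquet/properties.h"

// Sketch: read every BYTE_ARRAY column as BinaryView instead of chunked Binary.
arrow::Status ReadAsBinaryView(const std::string& path,
                               std::shared_ptr<arrow::Table>* out) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));

  parquet::ArrowReaderProperties arrow_properties;
  // New setting added by this commit; arrow::Type::LARGE_BINARY is also accepted.
  arrow_properties.set_binary_type(arrow::Type::BINARY_VIEW);

  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(input));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(builder.memory_pool(arrow::default_memory_pool())
                          ->properties(arrow_properties)
                          ->Build(&reader));
  return reader->ReadTable(out);
}
```

Passing `arrow::Type::LARGE_BINARY` instead yields LargeBinary columns; the dataset-level option added in cpp/src/arrow/dataset/file_parquet.h below keeps `Type::BINARY` as its default.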

cpp/src/arrow/compute/expression_test.cc

Lines changed: 0 additions & 2 deletions
@@ -67,8 +67,6 @@ const std::shared_ptr<Schema> kBoringSchema = schema({
     field("ts_s_utc", timestamp(TimeUnit::SECOND, "UTC")),
 });
 
-#define EXPECT_OK ARROW_EXPECT_OK
-
 Expression cast(Expression argument, std::shared_ptr<DataType> to_type) {
   return call("cast", {std::move(argument)},
               compute::CastOptions::Safe(std::move(to_type)));

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 3 additions & 1 deletion
@@ -116,6 +116,7 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
   }
   properties.set_coerce_int96_timestamp_unit(
       format.reader_options.coerce_int96_timestamp_unit);
+  properties.set_binary_type(format.reader_options.binary_type);
   return properties;
 }
 
@@ -443,7 +444,8 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {
   // FIXME implement comparison for decryption options
   return (reader_options.dict_columns == other_reader_options.dict_columns &&
           reader_options.coerce_int96_timestamp_unit ==
-              other_reader_options.coerce_int96_timestamp_unit);
+              other_reader_options.coerce_int96_timestamp_unit &&
+          reader_options.binary_type == other_reader_options.binary_type);
 }
 
 ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties)

cpp/src/arrow/dataset/file_parquet.h

Lines changed: 2 additions & 2 deletions
@@ -90,6 +90,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
     /// @{
     std::unordered_set<std::string> dict_columns;
     arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
+    Type::type binary_type = Type::BINARY;
     /// @}
   } reader_options;
 
@@ -242,8 +243,7 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
   /// ScanOptions.
   std::shared_ptr<parquet::ReaderProperties> reader_properties;
   /// Arrow reader properties. Not all properties are respected: batch_size comes from
-  /// ScanOptions. Additionally, dictionary columns come from
-  /// ParquetFileFormat::ReaderOptions::dict_columns.
+  /// ScanOptions. Additionally, other options come from ParquetFileFormat::ReaderOptions.
   std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
   /// A configuration structure that provides decryption properties for a dataset
   std::shared_ptr<ParquetDecryptionConfig> parquet_decryption_config = NULLPTR;
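The dataset layer exposes the same knob through the new `reader_options.binary_type` member shown above. A minimal sketch of how one might configure it before handing the format to a dataset factory or scanner; the helper name is hypothetical:

```cpp
#include <memory>

#include "arrow/dataset/file_parquet.h"

// Sketch: make the dataset Parquet reader decode BYTE_ARRAY columns as LargeBinary.
std::shared_ptr<arrow::dataset::ParquetFileFormat> MakeLargeBinaryParquetFormat() {
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
  // New option added by this commit; defaults to Type::BINARY.
  format->reader_options.binary_type = arrow::Type::LARGE_BINARY;
  return format;
}
```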

cpp/src/arrow/testing/gtest_util.h

Lines changed: 4 additions & 0 deletions
@@ -98,6 +98,10 @@
       << _st.ToString(); \
   } while (false)
 
+#define EXPECT_OK ARROW_EXPECT_OK
+
+#define EXPECT_OK_NO_THROW(expr) EXPECT_NO_THROW(EXPECT_OK(expr))
+
 #define ASSERT_NOT_OK(expr) \
   for (::arrow::Status _st = ::arrow::internal::GenericToStatus((expr)); _st.ok();) \
     FAIL() << "'" ARROW_STRINGIFY(expr) "' did not failed" << _st.ToString()

cpp/src/arrow/type_traits.h

Lines changed: 32 additions & 0 deletions
@@ -1237,6 +1237,22 @@ constexpr bool is_binary(Type::type type_id) {
   return false;
 }
 
+/// \brief Check for a binary or binary view (non-string) type
+///
+/// \param[in] type_id the type-id to check
+/// \return whether type-id is a binary type one
+constexpr bool is_binary_or_binary_view(Type::type type_id) {
+  switch (type_id) {
+    case Type::BINARY:
+    case Type::LARGE_BINARY:
+    case Type::BINARY_VIEW:
+      return true;
+    default:
+      break;
+  }
+  return false;
+}
+
 /// \brief Check for a string type
 ///
 /// \param[in] type_id the type-id to check
@@ -1252,6 +1268,22 @@ constexpr bool is_string(Type::type type_id) {
   return false;
 }
 
+/// \brief Check for a string or string view type
+///
+/// \param[in] type_id the type-id to check
+/// \return whether type-id is a string type one
+constexpr bool is_string_or_string_view(Type::type type_id) {
+  switch (type_id) {
+    case Type::STRING:
+    case Type::LARGE_STRING:
+    case Type::STRING_VIEW:
+      return true;
+    default:
+      break;
+  }
+  return false;
+}
+
 /// \brief Check for a binary-view-like type (i.e. string view and binary view)
 ///
 /// \param[in] type_id the type-id to check
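Since both new predicates are `constexpr`, they can be exercised at compile time; a tiny illustrative check (not part of the diff):

```cpp
#include "arrow/type_traits.h"

// The new predicates are constexpr, so they can drive compile-time checks
// or simple runtime dispatch on a type id.
static_assert(arrow::is_binary_or_binary_view(arrow::Type::BINARY_VIEW));
static_assert(arrow::is_string_or_string_view(arrow::Type::LARGE_STRING));
static_assert(!arrow::is_binary_or_binary_view(arrow::Type::STRING));
```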

cpp/src/arrow/util/bit_run_reader.h

Lines changed: 20 additions & 0 deletions
@@ -457,6 +457,26 @@ using ReverseSetBitRunReader = BaseSetBitRunReader</*Reverse=*/true>;
 
 // Functional-style bit run visitors.
 
+template <typename Visit>
+inline Status VisitBitRuns(const uint8_t* bitmap, int64_t offset, int64_t length,
+                           Visit&& visit) {
+  if (bitmap == NULLPTR) {
+    // Assuming all set (as in a null bitmap)
+    return visit(static_cast<int64_t>(0), length, true);
+  }
+  BitRunReader reader(bitmap, offset, length);
+  int64_t position = 0;
+  while (true) {
+    const auto run = reader.NextRun();
+    if (run.length == 0) {
+      break;
+    }
+    ARROW_RETURN_NOT_OK(visit(position, run.length, run.set));
+    position += run.length;
+  }
+  return Status::OK();
+}
+
 // XXX: Try to make this function small so the compiler can inline and optimize
 // the `visit` function, which is normally a hot loop with vectorizable code.
 // - don't inline SetBitRunReader constructor, it doesn't hurt performance
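For context, here is a short sketch of how the new `VisitBitRuns` helper might be called. It lives in `arrow::internal`, so this is only an illustration of the helper's shape rather than a public API; the `CountValid` wrapper is hypothetical.

```cpp
#include <cstdint>

#include "arrow/array/array_base.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_run_reader.h"

// Sketch: count the valid (non-null) values of an array by visiting runs of
// its validity bitmap. A null bitmap pointer is treated as "all valid".
arrow::Result<int64_t> CountValid(const arrow::Array& array) {
  int64_t valid = 0;
  ARROW_RETURN_NOT_OK(arrow::internal::VisitBitRuns(
      array.null_bitmap_data(), array.offset(), array.length(),
      [&](int64_t position, int64_t run_length, bool is_set) {
        if (is_set) valid += run_length;  // set bits in the validity bitmap => non-null
        return arrow::Status::OK();
      }));
  return valid;
}
```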

cpp/src/parquet/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
@@ -369,11 +369,12 @@ configure_file(parquet_version.h.in "${CMAKE_CURRENT_BINARY_DIR}/parquet_version
 install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet_version.h"
         DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet")
 
+set_source_files_properties(public_api_test.cc PROPERTIES SKIP_UNITY_BUILD_INCLUSION ON)
+
 add_parquet_test(internals-test
                  SOURCES
                  bloom_filter_reader_test.cc
                  bloom_filter_test.cc
-                 encoding_test.cc
                  geospatial/statistics_test.cc
                  geospatial/util_internal_test.cc
                  metadata_test.cc
@@ -384,7 +385,7 @@ add_parquet_test(internals-test
                  statistics_test.cc
                  types_test.cc)
 
-set_source_files_properties(public_api_test.cc PROPERTIES SKIP_UNITY_BUILD_INCLUSION ON)
+add_parquet_test(encoding-test SOURCES encoding_test.cc)
 
 add_parquet_test(reader-test
                  SOURCES

cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Lines changed: 86 additions & 52 deletions
@@ -47,6 +47,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
+#include "arrow/type_fwd.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/config.h"  // for ARROW_CSV definition
@@ -71,6 +72,7 @@
 #include "parquet/column_writer.h"
 #include "parquet/file_writer.h"
 #include "parquet/page_index.h"
+#include "parquet/properties.h"
 #include "parquet/test_util.h"
 
 using arrow::Array;
@@ -88,6 +90,7 @@ using arrow::DictionaryArray;
 using arrow::ListArray;
 using arrow::PrimitiveArray;
 using arrow::ResizableBuffer;
+using arrow::Result;
 using arrow::Scalar;
 using arrow::Status;
 using arrow::Table;
@@ -621,15 +624,28 @@ class ParquetIOTestBase : public ::testing::Test {
     return ParquetFileWriter::Open(sink_, schema);
   }
 
+  Result<std::unique_ptr<FileReader>> ReaderFromBuffer(
+      const std::shared_ptr<Buffer>& buffer,
+      const ArrowReaderProperties& properties = default_arrow_reader_properties()) {
+    FileReaderBuilder builder;
+    std::unique_ptr<FileReader> out;
+    RETURN_NOT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+    RETURN_NOT_OK(builder.memory_pool(::arrow::default_memory_pool())
+                      ->properties(properties)
+                      ->Build(&out));
+    return out;
+  }
+
+  Result<std::unique_ptr<FileReader>> ReaderFromSink(
+      const ArrowReaderProperties& properties = default_arrow_reader_properties()) {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, sink_->Finish());
+    return ReaderFromBuffer(buffer, properties);
+  }
+
   void ReaderFromSink(
       std::unique_ptr<FileReader>* out,
       const ArrowReaderProperties& properties = default_arrow_reader_properties()) {
-    ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
-    FileReaderBuilder builder;
-    ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
-    ASSERT_OK_NO_THROW(builder.memory_pool(::arrow::default_memory_pool())
-                           ->properties(properties)
-                           ->Build(out));
+    ASSERT_OK_NO_THROW(ReaderFromSink(properties).Value(out));
   }
 
   void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader,
@@ -647,16 +663,20 @@ class ParquetIOTestBase : public ::testing::Test {
     ASSERT_OK((*out)->ValidateFull());
   }
 
-  void ReadAndCheckSingleColumnFile(const Array& values) {
+  void ReadAndCheckSingleColumnFile(std::unique_ptr<FileReader> file_reader,
+                                    const Array& values) {
     std::shared_ptr<Array> out;
-
-    std::unique_ptr<FileReader> reader;
-    ReaderFromSink(&reader);
-    ReadSingleColumnFile(std::move(reader), &out);
-
+    ReadSingleColumnFile(std::move(file_reader), &out);
     AssertArraysEqual(values, *out);
   }
 
+  void ReadAndCheckSingleColumnFile(
+      const Array& values,
+      const ArrowReaderProperties& properties = default_arrow_reader_properties()) {
+    ASSERT_OK_AND_ASSIGN(auto file_reader, ReaderFromSink(properties));
+    ReadAndCheckSingleColumnFile(std::move(file_reader), values);
+  }
+
   void ReadTableFromFile(std::unique_ptr<FileReader> reader, bool expect_metadata,
                          std::shared_ptr<Table>* out) {
     ASSERT_OK_NO_THROW(reader->ReadTable(out));
@@ -776,8 +796,16 @@ class TestReadDecimals : public ParquetIOTestBase {
                           /*rep_levels=*/nullptr, byte_arrays.data());
     column_writer->Close();
     file_writer->Close();
+    ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
 
-    ReadAndCheckSingleColumnFile(expected);
+    // The binary_type setting shouldn't affect the results
+    for (auto binary_type : {::arrow::Type::BINARY, ::arrow::Type::LARGE_BINARY,
+                             ::arrow::Type::BINARY_VIEW}) {
+      ArrowReaderProperties properties;
+      properties.set_binary_type(binary_type);
+      ASSERT_OK_AND_ASSIGN(auto reader, ReaderFromBuffer(buffer, properties));
+      ReadAndCheckSingleColumnFile(std::move(reader), expected);
+    }
   }
 };
 
@@ -1390,50 +1418,56 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
-using TestLargeBinaryParquetIO = TestParquetIO<::arrow::LargeBinaryType>;
-
-TEST_F(TestLargeBinaryParquetIO, Basics) {
-  const char* json = "[\"foo\", \"\", null, \"\xff\"]";
-
-  const auto large_type = ::arrow::large_binary();
-  const auto narrow_type = ::arrow::binary();
-  const auto large_array = ::arrow::ArrayFromJSON(large_type, json);
-  const auto narrow_array = ::arrow::ArrayFromJSON(narrow_type, json);
-
-  // When the original Arrow schema isn't stored, a LargeBinary array
-  // is decoded as Binary (since there is no specific Parquet logical
-  // type for it).
-  this->RoundTripSingleColumn(large_array, narrow_array,
-                              default_arrow_writer_properties());
+class TestBinaryLikeParquetIO : public ParquetIOTestBase {
+ public:
+  void CheckRoundTrip(std::string_view json, ::arrow::Type::type binary_type,
+                      const std::shared_ptr<DataType>& specific_type,
+                      const std::shared_ptr<DataType>& fallback_type) {
+    const auto specific_array = ::arrow::ArrayFromJSON(specific_type, json);
+    const auto fallback_array = ::arrow::ArrayFromJSON(fallback_type, json);
+
+    // When the original Arrow schema isn't stored, the array is decoded as
+    // the fallback type (since there is no specific Parquet logical
+    // type for it).
+    this->RoundTripSingleColumn(specific_array, /*expected=*/fallback_array,
+                                default_arrow_writer_properties());
+
+    // When the original Arrow schema isn't stored and a binary_type is set,
+    // the array is decoded as the specific type.
+    ArrowReaderProperties reader_properties;
+    reader_properties.set_binary_type(binary_type);
+    this->RoundTripSingleColumn(specific_array, /*expected=*/specific_array,
+                                default_arrow_writer_properties(), reader_properties);
+    this->RoundTripSingleColumn(fallback_array, /*expected=*/specific_array,
+                                default_arrow_writer_properties(), reader_properties);
+
+    // When the original Arrow schema is stored, the array is decoded as the
+    // specific type.
+    const auto writer_properties =
+        ArrowWriterProperties::Builder().store_schema()->build();
+    this->RoundTripSingleColumn(specific_array, /*expected=*/specific_array,
+                                writer_properties);
+  }
+};
 
-  // When the original Arrow schema is stored, the LargeBinary array
-  // is read back as LargeBinary.
-  const auto arrow_properties =
-      ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
-  this->RoundTripSingleColumn(large_array, large_array, arrow_properties);
+TEST_F(TestBinaryLikeParquetIO, LargeBinary) {
+  CheckRoundTrip("[\"foo\", \"\", null, \"\xff\"]", ::arrow::Type::LARGE_BINARY,
+                 ::arrow::large_binary(), ::arrow::binary());
 }
 
-using TestLargeStringParquetIO = TestParquetIO<::arrow::LargeStringType>;
-
-TEST_F(TestLargeStringParquetIO, Basics) {
-  const char* json = R"(["foo", "", null, "bar"])";
-
-  const auto large_type = ::arrow::large_utf8();
-  const auto narrow_type = ::arrow::utf8();
-  const auto large_array = ::arrow::ArrayFromJSON(large_type, json);
-  const auto narrow_array = ::arrow::ArrayFromJSON(narrow_type, json);
+TEST_F(TestBinaryLikeParquetIO, BinaryView) {
+  CheckRoundTrip("[\"foo\", \"\", null, \"\xff\"]", ::arrow::Type::BINARY_VIEW,
+                 ::arrow::binary_view(), ::arrow::binary());
+}
 
-  // When the original Arrow schema isn't stored, a LargeBinary array
-  // is decoded as Binary (since there is no specific Parquet logical
-  // type for it).
-  this->RoundTripSingleColumn(large_array, narrow_array,
-                              default_arrow_writer_properties());
+TEST_F(TestBinaryLikeParquetIO, LargeString) {
+  CheckRoundTrip(R"(["foo", "", null, "bar"])", ::arrow::Type::LARGE_BINARY,
+                 ::arrow::large_utf8(), ::arrow::utf8());
+}
 
-  // When the original Arrow schema is stored, the LargeBinary array
-  // is read back as LargeBinary.
-  const auto arrow_properties =
-      ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
-  this->RoundTripSingleColumn(large_array, large_array, arrow_properties);
+TEST_F(TestBinaryLikeParquetIO, StringView) {
+  CheckRoundTrip(R"(["foo", "", null, "bar"])", ::arrow::Type::BINARY_VIEW,
+                 ::arrow::utf8_view(), ::arrow::utf8());
 }
 
 using TestJsonParquetIO = TestParquetIO<::arrow::extension::JsonExtensionType>;
