Skip to content

Commit a444380

Browse files
authored
GH-44345: [C++][Parquet] Add Decimal32/64 support to Parquet (#47427)
### Rationale for this change As described in #44345, `Decimal32`/`Decimal64` have been implemented but Parquet has poor support. This change allows to write `Decimal32`/`Decimal64` into Parquet file the same way as `Decimal128`/`Decimal256` and to read `Decimal32`/`Decimal64` from an existing Parquet file. ### What changes are included in this PR? 1. Support writing `Decimal32`/`Decimal64` as `INT32`/`INT64`/`BYTE_ARRAY`/`FIXED_LEN_BYTE_ARRAY` into Parquet file. 2. Support reading Parquet column with logical type Decimal. Either reading type from metadata or infering Arrow Decimal type is supported. ### Are these changes tested? Yes. ### Are there any user-facing changes? Yes. A flag named `smallest_decimal_enabled_` is added in `ArrowReaderProperties`. To maintain backward compatibility, only when the flag is `true`, Arrow will infer Decimal with small precision to `Decimal32`/`Decimal64` instead of `Decimal128`. * GitHub Issue: #44345 Authored-by: Zehua Zou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent d80e6ff commit a444380

File tree

9 files changed

+314
-226
lines changed

9 files changed

+314
-226
lines changed

cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Lines changed: 109 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -795,9 +795,10 @@ class ParquetIOTestBase : public ::testing::Test {
795795

796796
class TestReadDecimals : public ParquetIOTestBase {
797797
public:
798-
void CheckReadFromByteArrays(const std::shared_ptr<const LogicalType>& logical_type,
799-
const std::vector<std::vector<uint8_t>>& values,
800-
const Array& expected) {
798+
void CheckReadFromByteArrays(
799+
const std::shared_ptr<const LogicalType>& logical_type,
800+
const std::vector<std::vector<uint8_t>>& values, const Array& expected,
801+
ArrowReaderProperties properties = default_arrow_reader_properties()) {
801802
std::vector<ByteArray> byte_arrays(values.size());
802803
std::transform(values.begin(), values.end(), byte_arrays.begin(),
803804
[](const std::vector<uint8_t>& bytes) {
@@ -822,7 +823,6 @@ class TestReadDecimals : public ParquetIOTestBase {
822823
// The binary_type setting shouldn't affect the results
823824
for (auto binary_type : {::arrow::Type::BINARY, ::arrow::Type::LARGE_BINARY,
824825
::arrow::Type::BINARY_VIEW}) {
825-
ArrowReaderProperties properties;
826826
properties.set_binary_type(binary_type);
827827
ASSERT_OK_AND_ASSIGN(auto reader, ReaderFromBuffer(buffer, properties));
828828
ReadAndCheckSingleColumnFile(std::move(reader), expected);
@@ -833,6 +833,44 @@ class TestReadDecimals : public ParquetIOTestBase {
833833
// The Decimal roundtrip tests always go through the FixedLenByteArray path,
834834
// check the ByteArray case manually.
835835

836+
TEST_F(TestReadDecimals, Decimal32ByteArray) {
837+
const std::vector<std::vector<uint8_t>> big_endian_decimals = {
838+
// 123456
839+
{1, 226, 64},
840+
// 987654
841+
{15, 18, 6},
842+
// -123456
843+
{255, 254, 29, 192},
844+
};
845+
846+
ArrowReaderProperties properties = default_arrow_reader_properties();
847+
properties.set_smallest_decimal_enabled(true);
848+
849+
auto expected =
850+
ArrayFromJSON(::arrow::decimal32(6, 3), R"(["123.456", "987.654", "-123.456"])");
851+
CheckReadFromByteArrays(LogicalType::Decimal(6, 3), big_endian_decimals, *expected,
852+
properties);
853+
}
854+
855+
TEST_F(TestReadDecimals, Decimal64ByteArray) {
856+
const std::vector<std::vector<uint8_t>> big_endian_decimals = {
857+
// 123456
858+
{1, 226, 64},
859+
// 987654
860+
{15, 18, 6},
861+
// -123456
862+
{255, 255, 255, 255, 255, 254, 29, 192},
863+
};
864+
865+
ArrowReaderProperties properties = default_arrow_reader_properties();
866+
properties.set_smallest_decimal_enabled(true);
867+
868+
auto expected =
869+
ArrayFromJSON(::arrow::decimal64(16, 3), R"(["123.456", "987.654", "-123.456"])");
870+
CheckReadFromByteArrays(LogicalType::Decimal(16, 3), big_endian_decimals, *expected,
871+
properties);
872+
}
873+
836874
TEST_F(TestReadDecimals, Decimal128ByteArray) {
837875
const std::vector<std::vector<uint8_t>> big_endian_decimals = {
838876
// 123456
@@ -3044,18 +3082,19 @@ TEST(ArrowReadWrite, NestedRequiredField) {
30443082
/*row_group_size=*/8);
30453083
}
30463084

3047-
TEST(ArrowReadWrite, Decimal256) {
3048-
using ::arrow::Decimal256;
3085+
TEST(ArrowReadWrite, Decimal) {
30493086
using ::arrow::field;
30503087

3051-
auto type = ::arrow::decimal256(8, 4);
3052-
30533088
const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
30543089
"-9999.9999", "9999.9999"])";
3055-
auto array = ::arrow::ArrayFromJSON(type, json);
3056-
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
3057-
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
3058-
CheckSimpleRoundtrip(table, 2, props_store_schema);
3090+
3091+
for (auto type : {::arrow::decimal32(8, 4), ::arrow::decimal64(8, 4),
3092+
::arrow::decimal128(8, 4), ::arrow::decimal256(8, 4)}) {
3093+
auto array = ::arrow::ArrayFromJSON(type, json);
3094+
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
3095+
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
3096+
CheckSimpleRoundtrip(table, 2, props_store_schema);
3097+
}
30593098
}
30603099

30613100
TEST(ArrowReadWrite, DecimalStats) {
@@ -5468,6 +5507,64 @@ TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn)
54685507
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
54695508
}
54705509

5510+
template <typename TestType>
5511+
class TestIntegerAnnotateSmallestDecimalTypeParquetIO
5512+
: public TestIntegerAnnotateDecimalTypeParquetIO<TestType> {
5513+
public:
5514+
void ReadAndCheckSingleDecimalColumnFile(const Array& values) {
5515+
ArrowReaderProperties properties = default_arrow_reader_properties();
5516+
properties.set_smallest_decimal_enabled(true);
5517+
5518+
std::shared_ptr<Array> out;
5519+
std::unique_ptr<FileReader> reader;
5520+
this->ReaderFromSink(&reader, properties);
5521+
this->ReadSingleColumnFile(std::move(reader), &out);
5522+
5523+
if (values.type()->id() == out->type()->id()) {
5524+
AssertArraysEqual(values, *out);
5525+
} else {
5526+
auto decimal_type = checked_pointer_cast<::arrow::DecimalType>(values.type());
5527+
5528+
ASSERT_OK_AND_ASSIGN(
5529+
const auto expected_values,
5530+
::arrow::compute::Cast(values, ::arrow::decimal256(decimal_type->precision(),
5531+
decimal_type->scale())));
5532+
ASSERT_OK_AND_ASSIGN(
5533+
const auto out_values,
5534+
::arrow::compute::Cast(*out, ::arrow::decimal256(decimal_type->precision(),
5535+
decimal_type->scale())));
5536+
5537+
ASSERT_EQ(expected_values->length(), out_values->length());
5538+
ASSERT_EQ(expected_values->null_count(), out_values->null_count());
5539+
ASSERT_TRUE(expected_values->Equals(*out_values));
5540+
}
5541+
}
5542+
};
5543+
5544+
using SmallestDecimalTestTypes = ::testing::Types<
5545+
Decimal32WithPrecisionAndScale<9>, Decimal64WithPrecisionAndScale<9>,
5546+
Decimal64WithPrecisionAndScale<18>, Decimal128WithPrecisionAndScale<9>,
5547+
Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<9>,
5548+
Decimal256WithPrecisionAndScale<18>>;
5549+
5550+
TYPED_TEST_SUITE(TestIntegerAnnotateSmallestDecimalTypeParquetIO,
5551+
SmallestDecimalTestTypes);
5552+
5553+
TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO,
5554+
SingleNonNullableDecimalColumn) {
5555+
std::shared_ptr<Array> values;
5556+
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
5557+
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
5558+
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
5559+
}
5560+
5561+
TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO, SingleNullableDecimalColumn) {
5562+
std::shared_ptr<Array> values;
5563+
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values));
5564+
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
5565+
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
5566+
}
5567+
54715568
template <typename TestType>
54725569
class TestBufferedParquetIO : public TestParquetIO<TestType> {
54735570
public:

cpp/src/parquet/arrow/reader_internal.cc

Lines changed: 67 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ using arrow::Decimal128Type;
6969
using arrow::Decimal256;
7070
using arrow::Decimal256Array;
7171
using arrow::Decimal256Type;
72+
using arrow::Decimal32;
73+
using arrow::Decimal32Array;
74+
using arrow::Decimal32Type;
75+
using arrow::Decimal64;
76+
using arrow::Decimal64Array;
77+
using arrow::Decimal64Type;
7278
using arrow::Field;
7379
using arrow::Int32Array;
7480
using arrow::ListArray;
@@ -153,7 +159,8 @@ static Status FromInt32Statistics(const Int32Statistics& statistics,
153159
const LogicalType& logical_type,
154160
std::shared_ptr<::arrow::Scalar>* min,
155161
std::shared_ptr<::arrow::Scalar>* max) {
156-
ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type));
162+
ARROW_ASSIGN_OR_RAISE(auto type,
163+
FromInt32(logical_type, default_arrow_reader_properties()));
157164

158165
switch (logical_type.type()) {
159166
case LogicalType::Type::INT:
@@ -175,7 +182,8 @@ static Status FromInt64Statistics(const Int64Statistics& statistics,
175182
const LogicalType& logical_type,
176183
std::shared_ptr<::arrow::Scalar>* min,
177184
std::shared_ptr<::arrow::Scalar>* max) {
178-
ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type));
185+
ARROW_ASSIGN_OR_RAISE(auto type,
186+
FromInt64(logical_type, default_arrow_reader_properties()));
179187

180188
switch (logical_type.type()) {
181189
case LogicalType::Type::INT:
@@ -600,17 +608,10 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
600608
return ::arrow::Status::OK();
601609
}
602610

603-
template <typename DecimalArrayType>
604-
struct DecimalTypeTrait;
605-
606-
template <>
607-
struct DecimalTypeTrait<::arrow::Decimal128Array> {
608-
using value = ::arrow::Decimal128;
609-
};
610-
611-
template <>
612-
struct DecimalTypeTrait<::arrow::Decimal256Array> {
613-
using value = ::arrow::Decimal256;
611+
template <typename DecimalArrayType,
612+
typename = ::arrow::enable_if_decimal<typename DecimalArrayType::TypeClass>>
613+
struct DecimalTypeTrait {
614+
using value = typename ::arrow::TypeTraits<typename DecimalArrayType::TypeClass>::CType;
614615
};
615616

616617
template <typename DecimalArrayType, typename ParquetType>
@@ -725,16 +726,17 @@ struct DecimalConverter<DecimalArrayType, ByteArrayType> {
725726
/// The parquet spec allows systems to write decimals in int32, int64 if the values are
726727
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
727728
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
728-
template <
729-
typename DecimalArrayType, typename ParquetIntegerType,
730-
typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
731-
std::is_same<ParquetIntegerType, Int64Type>::value>>
729+
template <typename DecimalArrayType, typename ParquetIntegerType,
730+
typename = ::arrow::enable_if_t<::arrow::internal::IsOneOf<
731+
ParquetIntegerType, Int32Type, Int64Type>::value>>
732732
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
733733
const std::shared_ptr<Field>& field, Datum* out) {
734-
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
735-
// specifically distinguish between decimal byte widths.
736-
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 ||
737-
field->type()->id() == ::arrow::Type::DECIMAL256);
734+
using ArrayTypeClass = typename DecimalArrayType::TypeClass;
735+
using DecimalValue = typename DecimalTypeTrait<DecimalArrayType>::value;
736+
737+
// Decimal32, Decimal64, Decimal128 and Decimal256 are only Arrow constructs. Parquet
738+
// does not specifically distinguish between decimal byte widths.
739+
DCHECK(field->type()->id() == ArrayTypeClass::type_id);
738740

739741
const int64_t length = reader->values_written();
740742

@@ -745,25 +747,17 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
745747

746748
const auto values = reinterpret_cast<const ElementType*>(reader->values());
747749

748-
const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*field->type());
750+
const auto& decimal_type = checked_cast<const ArrayTypeClass&>(*field->type());
749751
const int64_t type_length = decimal_type.byte_width();
750752

751753
ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
752754
uint8_t* out_ptr = data->mutable_data();
753755

754-
using ::arrow::bit_util::FromLittleEndian;
755-
756756
for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
757757
// sign/zero extend int32_t values, otherwise a no-op
758758
const auto value = static_cast<int64_t>(values[i]);
759-
760-
if constexpr (std::is_same_v<DecimalArrayType, Decimal128Array>) {
761-
::arrow::Decimal128 decimal(value);
762-
decimal.ToBytes(out_ptr);
763-
} else {
764-
::arrow::Decimal256 decimal(value);
765-
decimal.ToBytes(out_ptr);
766-
}
759+
DecimalValue decimal(value);
760+
decimal.ToBytes(out_ptr);
767761
}
768762

769763
if (reader->nullable_values() && field->nullable()) {
@@ -802,6 +796,33 @@ Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
802796
return Status::OK();
803797
}
804798

799+
template <typename DecimalArrayType, typename... Args>
800+
Status TransferDecimalTo(Type::type physical_type, Args&&... args) {
801+
switch (physical_type) {
802+
case ::parquet::Type::INT32: {
803+
auto fn = DecimalIntegerTransfer<DecimalArrayType, Int32Type>;
804+
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
805+
} break;
806+
case ::parquet::Type::INT64: {
807+
auto fn = DecimalIntegerTransfer<DecimalArrayType, Int64Type>;
808+
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
809+
} break;
810+
case ::parquet::Type::BYTE_ARRAY: {
811+
auto fn = TransferDecimal<DecimalArrayType, ByteArrayType>;
812+
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
813+
} break;
814+
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
815+
auto fn = TransferDecimal<DecimalArrayType, FLBAType>;
816+
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
817+
} break;
818+
default:
819+
return Status::Invalid(
820+
"Physical type for decimal must be int32, int64, byte array, or fixed length "
821+
"binary");
822+
}
823+
return Status::OK();
824+
}
825+
805826
Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool,
806827
const std::shared_ptr<Field>& field, Datum* out) {
807828
static const auto binary_type = ::arrow::fixed_size_binary(2);
@@ -902,55 +923,22 @@ Status TransferColumnData(RecordReader* reader,
902923
}
903924
RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result));
904925
} break;
926+
case ::arrow::Type::DECIMAL32: {
927+
RETURN_NOT_OK(TransferDecimalTo<Decimal32Array>(descr->physical_type(), reader,
928+
pool, value_field, &result));
929+
} break;
930+
case ::arrow::Type::DECIMAL64: {
931+
RETURN_NOT_OK(TransferDecimalTo<Decimal64Array>(descr->physical_type(), reader,
932+
pool, value_field, &result));
933+
} break;
905934
case ::arrow::Type::DECIMAL128: {
906-
switch (descr->physical_type()) {
907-
case ::parquet::Type::INT32: {
908-
auto fn = DecimalIntegerTransfer<Decimal128Array, Int32Type>;
909-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
910-
} break;
911-
case ::parquet::Type::INT64: {
912-
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
913-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
914-
} break;
915-
case ::parquet::Type::BYTE_ARRAY: {
916-
auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
917-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
918-
} break;
919-
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
920-
auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
921-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
922-
} break;
923-
default:
924-
return Status::Invalid(
925-
"Physical type for decimal128 must be int32, int64, byte array, or fixed "
926-
"length binary");
927-
}
935+
RETURN_NOT_OK(TransferDecimalTo<Decimal128Array>(descr->physical_type(), reader,
936+
pool, value_field, &result));
937+
} break;
938+
case ::arrow::Type::DECIMAL256: {
939+
RETURN_NOT_OK(TransferDecimalTo<Decimal256Array>(descr->physical_type(), reader,
940+
pool, value_field, &result));
928941
} break;
929-
case ::arrow::Type::DECIMAL256:
930-
switch (descr->physical_type()) {
931-
case ::parquet::Type::INT32: {
932-
auto fn = DecimalIntegerTransfer<Decimal256Array, Int32Type>;
933-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
934-
} break;
935-
case ::parquet::Type::INT64: {
936-
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
937-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
938-
} break;
939-
case ::parquet::Type::BYTE_ARRAY: {
940-
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
941-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
942-
} break;
943-
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
944-
auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
945-
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
946-
} break;
947-
default:
948-
return Status::Invalid(
949-
"Physical type for decimal256 must be int32, int64, byte array, or fixed "
950-
"length binary");
951-
}
952-
break;
953-
954942
case ::arrow::Type::TIMESTAMP: {
955943
const ::arrow::TimestampType& timestamp_type =
956944
checked_cast<::arrow::TimestampType&>(*value_field->type());

0 commit comments

Comments
 (0)