Skip to content

Commit 65f7787

Browse files
authored
GH-47955: [C++][Parquet] Support reading INT-encoded Decimal stats as Arrow scalar (#48001)
### Rationale for this change The `StatisticsAsScalars` function, which allows converting Parquet statistics (min/max values) for a given logical type into Arrow scalars, did not support DECIMAL columns with physical type INT32 or INT64. ### Are these changes tested? Yes, by expanded unit test. ### Are there any user-facing changes? No, just a bug fix. * GitHub Issue: #47955 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent 055c2f4 commit 65f7787

File tree

2 files changed

+105
-71
lines changed

2 files changed

+105
-71
lines changed

cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -400,16 +400,23 @@ using ParquetDataType = PhysicalType<test_traits<T>::parquet_enum>;
400400
template <typename T>
401401
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
402402

403+
Result<std::shared_ptr<Buffer>> WriteTableToBuffer(
404+
const std::shared_ptr<Table>& table, int64_t row_group_size,
405+
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
406+
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
407+
default_arrow_writer_properties()) {
408+
auto sink = CreateOutputStream();
409+
ARROW_RETURN_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), sink,
410+
row_group_size, properties, arrow_properties));
411+
return sink->Finish();
412+
}
413+
403414
void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size,
404415
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
405416
std::shared_ptr<Buffer>* out) {
406-
auto sink = CreateOutputStream();
407-
408417
auto write_props = WriterProperties::Builder().write_batch_size(100)->build();
409-
410-
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
411-
row_group_size, write_props, arrow_properties));
412-
ASSERT_OK_AND_ASSIGN(*out, sink->Finish());
418+
ASSERT_OK_AND_ASSIGN(
419+
*out, WriteTableToBuffer(table, row_group_size, write_props, arrow_properties));
413420
}
414421

415422
void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
@@ -3101,27 +3108,33 @@ TEST(ArrowReadWrite, DecimalStats) {
31013108
using ::arrow::Decimal128;
31023109
using ::arrow::field;
31033110

3104-
auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0);
3105-
3106-
const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
3107-
auto array = ::arrow::ArrayFromJSON(type, json);
3108-
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
3111+
// Try various precisions to trigger encoding as different physical types:
3112+
// - precision 8 should use INT32
3113+
// - precision 18 should use INT64
3114+
// - precision 35 should use FIXED_LEN_BYTE_ARRAY
3115+
for (const int precision : {8, 18, 35}) {
3116+
auto type = ::arrow::decimal128(precision, /*scale=*/0);
31093117

3110-
std::shared_ptr<Buffer> buffer;
3111-
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100,
3112-
default_arrow_writer_properties(), &buffer));
3118+
const char* json =
3119+
R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
3120+
auto array = ::arrow::ArrayFromJSON(type, json);
3121+
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
31133122

3114-
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
3115-
::arrow::default_memory_pool()));
3123+
auto props = WriterProperties::Builder().enable_store_decimal_as_integer()->build();
3124+
ASSERT_OK_AND_ASSIGN(auto buffer,
3125+
WriteTableToBuffer(table, /*row_group_size=*/100, props));
3126+
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
3127+
::arrow::default_memory_pool()));
31163128

3117-
std::shared_ptr<Scalar> min, max;
3118-
ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
3129+
std::shared_ptr<Scalar> min, max;
3130+
ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
31193131

3120-
std::shared_ptr<Scalar> expected_min, expected_max;
3121-
ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
3122-
ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
3123-
::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
3124-
::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
3132+
std::shared_ptr<Scalar> expected_min, expected_max;
3133+
ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
3134+
ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
3135+
::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
3136+
::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
3137+
}
31253138
}
31263139

31273140
TEST(ArrowReadWrite, NestedNullableField) {

cpp/src/parquet/arrow/reader_internal.cc

Lines changed: 69 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ using arrow::Table;
8686
using arrow::TimestampArray;
8787

8888
using ::arrow::bit_util::FromBigEndian;
89+
using ::arrow::bit_util::ToBigEndian;
8990
using ::arrow::internal::checked_cast;
9091
using ::arrow::internal::checked_pointer_cast;
9192
using ::arrow::internal::SafeLeftShift;
@@ -108,6 +109,62 @@ namespace {
108109
template <typename ArrowType>
109110
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
110111

112+
template <typename DecimalType>
113+
Result<std::shared_ptr<::arrow::Scalar>> DecimalScalarFromBigEndianBytes(
114+
std::string_view data, std::shared_ptr<DataType> arrow_type) {
115+
ARROW_ASSIGN_OR_RAISE(
116+
DecimalType decimal,
117+
DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
118+
static_cast<int32_t>(data.size())));
119+
return ::arrow::MakeScalar(std::move(arrow_type), decimal);
120+
}
121+
122+
// Extract Min and Max scalars from big-endian representation of Decimals.
123+
Status ExtractDecimalMinMaxFromBytes(std::string_view min_bytes,
124+
std::string_view max_bytes,
125+
const LogicalType& logical_type,
126+
std::shared_ptr<::arrow::Scalar>* min,
127+
std::shared_ptr<::arrow::Scalar>* max) {
128+
const DecimalLogicalType& decimal_type =
129+
checked_cast<const DecimalLogicalType&>(logical_type);
130+
131+
Result<std::shared_ptr<DataType>> maybe_type =
132+
Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
133+
std::shared_ptr<DataType> arrow_type;
134+
if (maybe_type.ok()) {
135+
arrow_type = maybe_type.ValueOrDie();
136+
ARROW_ASSIGN_OR_RAISE(
137+
*min, DecimalScalarFromBigEndianBytes<Decimal128>(min_bytes, arrow_type));
138+
ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal128>(
139+
max_bytes, std::move(arrow_type)));
140+
return Status::OK();
141+
}
142+
// Fallback to see if Decimal256 can represent the type.
143+
ARROW_ASSIGN_OR_RAISE(
144+
arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
145+
ARROW_ASSIGN_OR_RAISE(
146+
*min, DecimalScalarFromBigEndianBytes<Decimal256>(min_bytes, arrow_type));
147+
ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal256>(
148+
max_bytes, std::move(arrow_type)));
149+
150+
return Status::OK();
151+
}
152+
153+
template <typename Int>
154+
Status ExtractDecimalMinMaxFromInteger(Int min_value, Int max_value,
155+
const LogicalType& logical_type,
156+
std::shared_ptr<::arrow::Scalar>* min,
157+
std::shared_ptr<::arrow::Scalar>* max) {
158+
static_assert(std::is_integral_v<Int>);
159+
const Int min_be = ToBigEndian(min_value);
160+
const Int max_be = ToBigEndian(max_value);
161+
const auto min_bytes =
162+
std::string_view(reinterpret_cast<const char*>(&min_be), sizeof(min_be));
163+
const auto max_bytes =
164+
std::string_view(reinterpret_cast<const char*>(&max_be), sizeof(max_be));
165+
return ExtractDecimalMinMaxFromBytes(min_bytes, max_bytes, logical_type, min, max);
166+
}
167+
111168
template <typename CType, typename StatisticsType>
112169
Status MakeMinMaxScalar(const StatisticsType& statistics,
113170
std::shared_ptr<::arrow::Scalar>* min,
@@ -165,17 +222,19 @@ static Status FromInt32Statistics(const Int32Statistics& statistics,
165222
switch (logical_type.type()) {
166223
case LogicalType::Type::INT:
167224
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
168-
break;
169225
case LogicalType::Type::DATE:
170226
case LogicalType::Type::TIME:
171227
case LogicalType::Type::NONE:
172228
return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max);
173-
break;
229+
case LogicalType::Type::DECIMAL:
230+
return ExtractDecimalMinMaxFromInteger(statistics.min(), statistics.max(),
231+
logical_type, min, max);
174232
default:
175233
break;
176234
}
177235

178-
return Status::NotImplemented("Cannot extract statistics for type ");
236+
return Status::NotImplemented("Cannot extract statistics for INT32 with logical type ",
237+
logical_type.ToString());
179238
}
180239

181240
static Status FromInt64Statistics(const Int64Statistics& statistics,
@@ -188,66 +247,28 @@ static Status FromInt64Statistics(const Int64Statistics& statistics,
188247
switch (logical_type.type()) {
189248
case LogicalType::Type::INT:
190249
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
191-
break;
192250
case LogicalType::Type::TIME:
193251
case LogicalType::Type::TIMESTAMP:
194252
case LogicalType::Type::NONE:
195253
return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max);
196-
break;
254+
case LogicalType::Type::DECIMAL:
255+
return ExtractDecimalMinMaxFromInteger(statistics.min(), statistics.max(),
256+
logical_type, min, max);
197257
default:
198258
break;
199259
}
200260

201-
return Status::NotImplemented("Cannot extract statistics for type ");
202-
}
203-
204-
template <typename DecimalType>
205-
Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
206-
const std::string& data, std::shared_ptr<DataType> arrow_type) {
207-
ARROW_ASSIGN_OR_RAISE(
208-
DecimalType decimal,
209-
DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
210-
static_cast<int32_t>(data.size())));
211-
return ::arrow::MakeScalar(std::move(arrow_type), decimal);
212-
}
213-
214-
// Extracts Min and Max scalar from bytes like types (i.e. types where
215-
// decimal is encoded as little endian.
216-
Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
217-
const LogicalType& logical_type,
218-
std::shared_ptr<::arrow::Scalar>* min,
219-
std::shared_ptr<::arrow::Scalar>* max) {
220-
const DecimalLogicalType& decimal_type =
221-
checked_cast<const DecimalLogicalType&>(logical_type);
222-
223-
Result<std::shared_ptr<DataType>> maybe_type =
224-
Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
225-
std::shared_ptr<DataType> arrow_type;
226-
if (maybe_type.ok()) {
227-
arrow_type = maybe_type.ValueOrDie();
228-
ARROW_ASSIGN_OR_RAISE(
229-
*min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), arrow_type));
230-
ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal128>(statistics.EncodeMax(),
231-
std::move(arrow_type)));
232-
return Status::OK();
233-
}
234-
// Fallback to see if Decimal256 can represent the type.
235-
ARROW_ASSIGN_OR_RAISE(
236-
arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
237-
ARROW_ASSIGN_OR_RAISE(
238-
*min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), arrow_type));
239-
ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal256>(statistics.EncodeMax(),
240-
std::move(arrow_type)));
241-
242-
return Status::OK();
261+
return Status::NotImplemented("Cannot extract statistics for INT64 with logical type ",
262+
logical_type.ToString());
243263
}
244264

245265
Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
246266
std::shared_ptr<::arrow::Scalar>* min,
247267
std::shared_ptr<::arrow::Scalar>* max) {
248268
auto logical_type = statistics.descr()->logical_type();
249269
if (logical_type->type() == LogicalType::Type::DECIMAL) {
250-
return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max);
270+
return ExtractDecimalMinMaxFromBytes(statistics.EncodeMin(), statistics.EncodeMax(),
271+
*logical_type, min, max);
251272
}
252273
std::shared_ptr<::arrow::DataType> type;
253274
if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {

0 commit comments

Comments
 (0)