Skip to content

Commit 41aec32

Browse files
GH-48202: [C++][Parquet] Fix encoder & decoder logic to enable Parquet DB support on s390x
1 parent d16ba00 commit 41aec32

File tree

2 files changed

+150
-16
lines changed

2 files changed

+150
-16
lines changed

cpp/src/parquet/decoder.cc

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "arrow/util/bitmap_ops.h"
4242
#include "arrow/util/byte_stream_split_internal.h"
4343
#include "arrow/util/checked_cast.h"
44+
#include "arrow/util/endian.h"
4445
#include "arrow/util/int_util_overflow.h"
4546
#include "arrow/util/logging_internal.h"
4647
#include "arrow/util/rle_encoding_internal.h"
@@ -406,9 +407,20 @@ int PlainDecoder<DType>::DecodeArrow(
406407
VisitBitRuns(valid_bits, valid_bits_offset, num_values,
407408
[&](int64_t position, int64_t run_length, bool is_valid) {
408409
if (is_valid) {
410+
#if ARROW_LITTLE_ENDIAN
409411
RETURN_NOT_OK(builder->AppendValues(
410412
reinterpret_cast<const value_type*>(data), run_length));
411413
data += run_length * sizeof(value_type);
414+
#else
415+
// On big-endian systems, we need to byte-swap each value
416+
// since Parquet data is stored in little-endian format
417+
for (int64_t i = 0; i < run_length; ++i) {
418+
value_type value = ::arrow::bit_util::FromLittleEndian(
419+
SafeLoadAs<value_type>(data));
420+
RETURN_NOT_OK(builder->Append(value));
421+
data += sizeof(value_type);
422+
}
423+
#endif
412424
} else {
413425
RETURN_NOT_OK(builder->AppendNulls(run_length));
414426
}
@@ -458,7 +470,24 @@ inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
458470
}
459471
// If bytes_to_decode == 0, data could be null
460472
if (bytes_to_decode > 0) {
473+
#if ARROW_LITTLE_ENDIAN
461474
memcpy(out, data, static_cast<size_t>(bytes_to_decode));
475+
#else
476+
// On big-endian systems, we need to byte-swap each value
477+
// since Parquet data is stored in little-endian format.
478+
// Only apply to integer and floating-point types that have FromLittleEndian support.
479+
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
480+
std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
481+
std::is_same_v<T, float> || std::is_same_v<T, double>) {
482+
for (int i = 0; i < num_values; ++i) {
483+
out[i] = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<T>(data));
484+
data += sizeof(T);
485+
}
486+
} else {
487+
// For other types (bool, Int96, etc.), just do memcpy
488+
memcpy(out, data, static_cast<size_t>(bytes_to_decode));
489+
}
490+
#endif
462491
}
463492
return static_cast<int>(bytes_to_decode);
464493
}
@@ -471,7 +500,7 @@ static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
471500
if (ARROW_PREDICT_FALSE(data_size < 4)) {
472501
ParquetException::EofException();
473502
}
474-
const int32_t len = SafeLoadAs<int32_t>(data);
503+
const int32_t len = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data));
475504
if (len < 0) {
476505
throw ParquetException("Invalid BYTE_ARRAY value");
477506
}
@@ -772,7 +801,8 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
772801
return Status::Invalid(
773802
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
774803
}
775-
auto value_len = SafeLoadAs<int32_t>(data_);
804+
auto value_len =
805+
::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data_));
776806
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) {
777807
return Status::Invalid(
778808
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
@@ -817,7 +847,8 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
817847
return Status::Invalid(
818848
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
819849
}
820-
auto value_len = SafeLoadAs<int32_t>(data_);
850+
auto value_len =
851+
::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data_));
821852
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) {
822853
return Status::Invalid(
823854
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
@@ -1616,9 +1647,17 @@ class DeltaBitPackDecoder : public TypedDecoderImpl<DType> {
16161647
for (int j = 0; j < values_decode; ++j) {
16171648
// Addition between min_delta, packed int and last_value should be treated as
16181649
// unsigned addition. Overflow is as expected.
1650+
#if ARROW_LITTLE_ENDIAN
16191651
buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
16201652
static_cast<UT>(last_value_);
16211653
last_value_ = buffer[i + j];
1654+
#else
1655+
UT temp = static_cast<UT>(min_delta_) +
1656+
static_cast<UT>(static_cast<uint64_t>(buffer[i + j])) +
1657+
static_cast<UT>(last_value_);
1658+
buffer[i + j] = static_cast<T>(temp);
1659+
last_value_ = static_cast<T>(temp);
1660+
#endif
16221661
}
16231662
values_remaining_current_mini_block_ -= values_decode;
16241663
i += values_decode;
@@ -2306,6 +2345,17 @@ class ByteStreamSplitDecoder<FLBAType> : public ByteStreamSplitDecoderBase<FLBAT
23062345
const int num_decoded = this->DecodeRaw(decode_out, max_values);
23072346
DCHECK_EQ(num_decoded, max_values);
23082347

2348+
#if !ARROW_LITTLE_ENDIAN
2349+
// On big-endian, ByteStreamSplitDecode (DoMergeStreams) reverses stream positions
2350+
// to produce numeric values in native byte order. For FLBA (opaque byte arrays),
2351+
// we need to undo this reversal to preserve the original byte sequence.
2352+
const int type_length = this->type_length_;
2353+
for (int i = 0; i < num_decoded; ++i) {
2354+
uint8_t* value_ptr = decode_out + static_cast<int64_t>(type_length) * i;
2355+
std::reverse(value_ptr, value_ptr + type_length);
2356+
}
2357+
#endif
2358+
23092359
for (int i = 0; i < num_decoded; ++i) {
23102360
buffer[i] =
23112361
FixedLenByteArray(decode_out + static_cast<int64_t>(this->type_length_) * i);

cpp/src/parquet/encoder.cc

Lines changed: 97 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
162162

163163
// Appends one byte-array value to sink_: first a sizeof(uint32_t)-byte length
// prefix, then `length` bytes starting at `data`. Also adds `length` to
// unencoded_byte_array_data_bytes_.
//
// Both appends use UnsafeAppend — presumably the caller must have reserved
// enough space in sink_ beforehand; TODO confirm against the call sites.
//
// Diff note: the line under gutter `165-` is the removed version, which wrote
// `length` in native byte order. The lines under `165+`/`166+` replace it and
// pass the prefix through ToLittleEndian before appending it.
void UnsafePutByteArray(const void* data, uint32_t length) {
164164
// A null `data` pointer is only tolerated for zero-length values.
DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
165-
sink_.UnsafeAppend(&length, sizeof(uint32_t));
165+
uint32_t length_le = ::arrow::bit_util::ToLittleEndian(length);
166+
sink_.UnsafeAppend(&length_le, sizeof(uint32_t));
166167
// The payload bytes are appended as-is; only the length prefix is converted.
sink_.UnsafeAppend(data, static_cast<int64_t>(length));
167168
unencoded_byte_array_data_bytes_ += length;
168169
}
@@ -201,7 +202,37 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
201202
// Appends `num_values` values of type T from `buffer` to sink_.
//
// Does nothing when num_values <= 0. Failures from sink_.Append or from the
// scratch-buffer allocation are raised through PARQUET_THROW_NOT_OK /
// PARQUET_ASSIGN_OR_THROW.
//
// When ARROW_LITTLE_ENDIAN is set, the values are appended directly from
// `buffer`. Otherwise, arithmetic non-bool T is converted element by element
// with ToLittleEndian into a scratch buffer, and the scratch buffer is appended.
template <typename DType>
202203
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
203204
if (num_values > 0) {
205+
#if ARROW_LITTLE_ENDIAN
204206
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
207+
#else
208+
// On big-endian systems, except for bool type, we need to byte-swap each value,
209+
// since Parquet data must be stored in little-endian format.
210+
if constexpr (std::is_arithmetic_v<T> && !(std::is_same_v<T, bool>)) {
211+
// Threshold is a count of elements (it is compared against num_values),
// so the on-stack array occupies kSmallBufferSize * sizeof(T) bytes.
constexpr int kSmallBufferSize = 128;
212+
T* temp_data = nullptr;
213+
std::array<T, kSmallBufferSize> small_buffer;
214+
// Declared at this scope so a heap allocation stays alive until after the
// final sink_.Append below.
std::unique_ptr<::arrow::Buffer> heap_buffer;
215+
216+
// Use stack memory for smaller buffer sizes
217+
if (num_values <= kSmallBufferSize) {
218+
temp_data = small_buffer.data();
219+
} else {
220+
// Use heap memory for larger sizes
221+
PARQUET_ASSIGN_OR_THROW(
222+
heap_buffer,
223+
::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
224+
temp_data = reinterpret_cast<T*>(heap_buffer->mutable_data());
225+
}
226+
227+
for (int i = 0; i < num_values; ++i) {
228+
temp_data[i] = ::arrow::bit_util::ToLittleEndian(buffer[i]);
229+
}
230+
PARQUET_THROW_NOT_OK(sink_.Append(temp_data, num_values * sizeof(T)));
231+
} else {
232+
// For other types (Int96, etc.), just do memcpy
// NOTE(review): values of these types are appended in their in-memory
// layout with no conversion; whether that is the correct on-disk layout on
// big-endian hosts for every such T is not established by this block —
// confirm.
233+
PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
234+
}
235+
#endif
205236
}
206237
}
207238

@@ -224,18 +255,38 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
224255
constexpr auto value_size = sizeof(value_type);
225256
auto raw_values = checked_cast<const ArrayType&>(values).raw_values();
226257

227-
if (values.null_count() == 0) {
228-
// no nulls, just dump the data
229-
PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
230-
} else {
231-
PARQUET_THROW_NOT_OK(
232-
sink->Reserve((values.length() - values.null_count()) * value_size));
258+
const int64_t len = values.length();
259+
const int64_t nulls = values.null_count();
260+
const int64_t valid_count = len - nulls;
233261

234-
for (int64_t i = 0; i < values.length(); i++) {
235-
if (values.IsValid(i)) {
236-
sink->UnsafeAppend(&raw_values[i], value_size);
237-
}
262+
#if ARROW_LITTLE_ENDIAN
263+
// Fast path: no nulls → bulk append
264+
if (nulls == 0) {
265+
PARQUET_THROW_NOT_OK(sink->Append(raw_values, len * value_size));
266+
return;
267+
}
268+
#endif
269+
270+
// Reserve only once
271+
PARQUET_THROW_NOT_OK(sink->Reserve(valid_count * value_size));
272+
273+
// Fallback path: need to check nulls OR endian conversion
274+
for (int64_t i = 0; i < len; ++i) {
275+
if (!values.IsValid(i)) continue;
276+
277+
#if ARROW_LITTLE_ENDIAN
278+
// Little-endian, nulls exist → per-element append
279+
sink->UnsafeAppend(&raw_values[i], value_size);
280+
#else
281+
// Big-endian logic
282+
if constexpr (std::is_arithmetic_v<value_type> &&
283+
!(std::is_same_v<value_type, bool>)) {
284+
auto le_value = ::arrow::bit_util::ToLittleEndian(raw_values[i]);
285+
sink->UnsafeAppend(&le_value, value_size);
286+
} else {
287+
sink->UnsafeAppend(&raw_values[i], value_size);
238288
}
289+
#endif
239290
}
240291
}
241292

@@ -649,17 +700,36 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
649700

650701
// Writes the values held by memo_table_ into `buffer`, starting from position 0.
//
// The DCHECK below asserts that dict_encoded_size_ equals
// sizeof(T) * memo_table_.size(); nothing in this block checks the capacity of
// `buffer` itself — presumably the caller sizes it from dict_encoded_size_
// (TODO confirm).
//
// When ARROW_LITTLE_ENDIAN is set, values are copied straight into `buffer`.
// Otherwise, for the six listed 32/64-bit integer and floating-point types,
// the values are first copied into a temporary vector and then written to
// `buffer` one by one through ToLittleEndian. Every other T is copied
// unconverted.
template <typename DType>
651702
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) const {
652-
// For primitive types, only a memcpy
703+
// For primitive types, copy values with endianness conversion
653704
DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
705+
#if ARROW_LITTLE_ENDIAN
654706
memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
707+
#else
708+
// On big-endian systems, we need to byte-swap each value
709+
// since Parquet data must be stored in little-endian format.
710+
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
711+
std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
712+
std::is_same_v<T, float> || std::is_same_v<T, double>) {
713+
// Scratch copy of the dictionary values in native byte order.
std::vector<T> temp(memo_table_.size());
714+
memo_table_.CopyValues(0 /* start_pos */, temp.data());
715+
// NOTE(review): values are stored through a T* obtained from a uint8_t*;
// this assumes `buffer` is suitably aligned for T — confirm.
T* out = reinterpret_cast<T*>(buffer);
716+
for (size_t i = 0; i < temp.size(); ++i) {
717+
out[i] = ::arrow::bit_util::ToLittleEndian(temp[i]);
718+
}
719+
} else {
720+
// For other types (Int96, etc.), just do memcpy
721+
memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
722+
}
723+
#endif
655724
}
656725

657726
// ByteArray and FLBA already have the dictionary encoded in their data heaps
658727
template <>
659728
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) const {
660729
memo_table_.VisitValues(0, [&buffer](::std::string_view v) {
661730
uint32_t len = static_cast<uint32_t>(v.length());
662-
memcpy(buffer, &len, sizeof(len));
731+
uint32_t len_le = ::arrow::bit_util::ToLittleEndian(len);
732+
memcpy(buffer, &len_le, sizeof(len_le));
663733
buffer += sizeof(len);
664734
memcpy(buffer, v.data(), len);
665735
buffer += len;
@@ -924,6 +994,8 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase<DType> {
924994

925995
void Put(const T* buffer, int num_values) override {
926996
if (num_values > 0) {
997+
// ByteStreamSplitEncode (DoSplitStreams) handles endianness correctly,
998+
// so we can directly append the native byte representation
927999
PARQUET_THROW_NOT_OK(
9281000
this->sink_.Append(reinterpret_cast<const uint8_t*>(buffer),
9291001
num_values * static_cast<int64_t>(sizeof(T))));
@@ -964,10 +1036,22 @@ class ByteStreamSplitEncoder<FLBAType> : public ByteStreamSplitEncoderBase<FLBAT
9641036
if (byte_width_ > 0) {
9651037
const int64_t total_bytes = static_cast<int64_t>(num_values) * byte_width_;
9661038
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
1039+
#if !ARROW_LITTLE_ENDIAN
1040+
// On big-endian, reverse bytes before encoding to compensate for
1041+
// DoSplitStreams reversal, ensuring FLBA bytes are preserved as-is
1042+
std::vector<uint8_t> temp_buffer(byte_width_);
1043+
#endif
9671044
for (int i = 0; i < num_values; ++i) {
9681045
// Write the result to the output stream
9691046
DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
1047+
#if !ARROW_LITTLE_ENDIAN
1048+
// Reverse bytes before appending
1049+
std::reverse_copy(buffer[i].ptr, buffer[i].ptr + byte_width_,
1050+
temp_buffer.begin());
1051+
sink_.UnsafeAppend(temp_buffer.data(), byte_width_);
1052+
#else
9701053
sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
1054+
#endif
9711055
}
9721056
}
9731057
this->num_values_in_buffer_ += num_values;

0 commit comments

Comments
 (0)