Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 53 additions & 3 deletions cpp/src/parquet/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/byte_stream_split_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/rle_encoding_internal.h"
Expand Down Expand Up @@ -408,9 +409,20 @@ int PlainDecoder<DType>::DecodeArrow(
VisitBitRuns(valid_bits, valid_bits_offset, num_values,
[&](int64_t position, int64_t run_length, bool is_valid) {
if (is_valid) {
#if ARROW_LITTLE_ENDIAN
RETURN_NOT_OK(builder->AppendValues(
reinterpret_cast<const value_type*>(data), run_length));
data += run_length * sizeof(value_type);
#else
// On big-endian systems, we need to byte-swap each value
// since Parquet data is stored in little-endian format
for (int64_t i = 0; i < run_length; ++i) {
value_type value = ::arrow::bit_util::FromLittleEndian(
SafeLoadAs<value_type>(data));
RETURN_NOT_OK(builder->Append(value));
data += sizeof(value_type);
}
#endif
} else {
RETURN_NOT_OK(builder->AppendNulls(run_length));
}
Expand Down Expand Up @@ -460,7 +472,24 @@ inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
}
// If bytes_to_decode == 0, data could be null
if (bytes_to_decode > 0) {
#if ARROW_LITTLE_ENDIAN
memcpy(out, data, static_cast<size_t>(bytes_to_decode));
#else
// On big-endian systems, we need to byte-swap each value
// since Parquet data is stored in little-endian format.
// Only apply to integer and floating-point types that have FromLittleEndian support.
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
std::is_same_v<T, float> || std::is_same_v<T, double>) {
for (int i = 0; i < num_values; ++i) {
out[i] = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<T>(data));
data += sizeof(T);
}
} else {
// For other types (bool, Int96, etc.), just do memcpy
memcpy(out, data, static_cast<size_t>(bytes_to_decode));
}
#endif
}
return static_cast<int>(bytes_to_decode);
}
Expand All @@ -473,7 +502,7 @@ static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
if (ARROW_PREDICT_FALSE(data_size < 4)) {
ParquetException::EofException();
}
const int32_t len = SafeLoadAs<int32_t>(data);
const int32_t len = ::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data));
if (len < 0) {
throw ParquetException("Invalid BYTE_ARRAY value");
}
Expand Down Expand Up @@ -775,7 +804,8 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
// 2. the running `value_len > estimated_data_length` check below.
// This precondition follows from those two checks.
DCHECK_GE(len_, 4);
auto value_len = SafeLoadAs<int32_t>(data_);
auto value_len =
::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data_));
// This check also ensures that `value_len <= len_ - 4` due to the way
// `estimated_data_length` is computed.
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > estimated_data_length)) {
Expand Down Expand Up @@ -826,7 +856,8 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType> {
return Status::Invalid(
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
}
auto value_len = SafeLoadAs<int32_t>(data_);
auto value_len =
::arrow::bit_util::FromLittleEndian(SafeLoadAs<int32_t>(data_));
if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) {
return Status::Invalid(
"Invalid or truncated PLAIN-encoded BYTE_ARRAY data");
Expand Down Expand Up @@ -1625,9 +1656,17 @@ class DeltaBitPackDecoder : public TypedDecoderImpl<DType> {
for (int j = 0; j < values_decode; ++j) {
// Addition between min_delta, packed int and last_value should be treated as
// unsigned addition. Overflow is as expected.
#if ARROW_LITTLE_ENDIAN
buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
static_cast<UT>(last_value_);
last_value_ = buffer[i + j];
#else
UT temp = static_cast<UT>(min_delta_) +
static_cast<UT>(static_cast<uint64_t>(buffer[i + j])) +
static_cast<UT>(last_value_);
buffer[i + j] = static_cast<T>(temp);
last_value_ = static_cast<T>(temp);
#endif
}
values_remaining_current_mini_block_ -= values_decode;
i += values_decode;
Expand Down Expand Up @@ -2315,6 +2354,17 @@ class ByteStreamSplitDecoder<FLBAType> : public ByteStreamSplitDecoderBase<FLBAT
const int num_decoded = this->DecodeRaw(decode_out, max_values);
DCHECK_EQ(num_decoded, max_values);

#if !ARROW_LITTLE_ENDIAN
// On big-endian, ByteStreamSplitDecode (DoMergeStreams) reverses stream positions
// to produce numeric values in native byte order. For FLBA (opaque byte arrays),
// we need to undo this reversal to preserve the original byte sequence.
const int type_length = this->type_length_;
for (int i = 0; i < num_decoded; ++i) {
uint8_t* value_ptr = decode_out + static_cast<int64_t>(type_length) * i;
std::reverse(value_ptr, value_ptr + type_length);
}
#endif

for (int i = 0; i < num_decoded; ++i) {
buffer[i] =
FixedLenByteArray(decode_out + static_cast<int64_t>(this->type_length_) * i);
Expand Down
110 changes: 97 additions & 13 deletions cpp/src/parquet/encoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {

// Appends one PLAIN-encoded BYTE_ARRAY value to `sink_`: a 4-byte length
// prefix followed by `length` bytes of payload.
//
// \param data   pointer to the payload; may be null only when `length` is 0
// \param length number of payload bytes
//
// Both writes go through `sink_.UnsafeAppend`; presumably the caller has
// already reserved `sizeof(uint32_t) + length` bytes -- confirm against callers.
void UnsafePutByteArray(const void* data, uint32_t length) {
  DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
  // Parquet data is stored in little-endian format, so the prefix is converted
  // from native order first (a no-op on little-endian hosts). The prefix must
  // be written exactly once: writing the native-order length as well would
  // emit 8 prefix bytes and shift every following value.
  const uint32_t length_le = ::arrow::bit_util::ToLittleEndian(length);
  sink_.UnsafeAppend(&length_le, sizeof(uint32_t));
  sink_.UnsafeAppend(data, static_cast<int64_t>(length));
  // Track the raw payload size (the length prefix is not included).
  unencoded_byte_array_data_bytes_ += length;
}
Expand Down Expand Up @@ -201,7 +202,37 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
// Appends `num_values` values from `buffer` to the sink using PLAIN encoding.
// Nothing is written when `num_values` is not positive.
//
// On little-endian hosts the in-memory representation is appended unchanged.
// On big-endian hosts every arithmetic value except bool is converted to
// little-endian first, since Parquet data must be stored in little-endian
// format; all other types (Int96, etc.) are appended byte-for-byte.
template <typename DType>
void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
  if (num_values <= 0) {
    return;
  }
  const auto byte_count = num_values * sizeof(T);
#if !ARROW_LITTLE_ENDIAN
  if constexpr (std::is_arithmetic_v<T> && !std::is_same_v<T, bool>) {
    // Small batches are swapped in a stack array; larger ones fall back to a
    // buffer allocated from the encoder's memory pool.
    constexpr int kStackCapacity = 128;
    std::array<T, kStackCapacity> stack_scratch;
    std::unique_ptr<::arrow::Buffer> heap_scratch;
    T* swapped = stack_scratch.data();
    if (num_values > kStackCapacity) {
      PARQUET_ASSIGN_OR_THROW(
          heap_scratch, ::arrow::AllocateBuffer(byte_count, this->memory_pool()));
      swapped = reinterpret_cast<T*>(heap_scratch->mutable_data());
    }

    for (int idx = 0; idx < num_values; ++idx) {
      swapped[idx] = ::arrow::bit_util::ToLittleEndian(buffer[idx]);
    }
    PARQUET_THROW_NOT_OK(sink_.Append(swapped, byte_count));
    return;
  }
#endif
  // Little-endian host, or a type that needs no byte swapping: plain copy.
  PARQUET_THROW_NOT_OK(sink_.Append(buffer, byte_count));
}

Expand All @@ -224,18 +255,38 @@ void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
constexpr auto value_size = sizeof(value_type);
auto raw_values = checked_cast<const ArrayType&>(values).raw_values();

if (values.null_count() == 0) {
// no nulls, just dump the data
PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
} else {
PARQUET_THROW_NOT_OK(
sink->Reserve((values.length() - values.null_count()) * value_size));
const int64_t len = values.length();
const int64_t nulls = values.null_count();
const int64_t valid_count = len - nulls;

for (int64_t i = 0; i < values.length(); i++) {
if (values.IsValid(i)) {
sink->UnsafeAppend(&raw_values[i], value_size);
}
#if ARROW_LITTLE_ENDIAN
// Fast path: no nulls → bulk append
if (nulls == 0) {
PARQUET_THROW_NOT_OK(sink->Append(raw_values, len * value_size));
return;
}
#endif

// Reserve only once
PARQUET_THROW_NOT_OK(sink->Reserve(valid_count * value_size));

// Fallback path: need to check nulls OR endian conversion
for (int64_t i = 0; i < len; ++i) {
if (!values.IsValid(i)) continue;

#if ARROW_LITTLE_ENDIAN
// Little-endian, nulls exist → per-element append
sink->UnsafeAppend(&raw_values[i], value_size);
#else
// Big-endian logic
if constexpr (std::is_arithmetic_v<value_type> &&
!(std::is_same_v<value_type, bool>)) {
auto le_value = ::arrow::bit_util::ToLittleEndian(raw_values[i]);
sink->UnsafeAppend(&le_value, value_size);
} else {
sink->UnsafeAppend(&raw_values[i], value_size);
}
#endif
}
}

Expand Down Expand Up @@ -648,17 +699,36 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {

// Writes the dictionary values held by `memo_table_` into `buffer`.
//
// \param buffer destination; the DCHECK below expects it to hold
//               `dict_encoded_size_` == sizeof(T) * memo_table_.size() bytes.
//
// The values are first copied in native byte order. On big-endian hosts the
// 32/64-bit integer and floating-point types are then byte-swapped in place,
// since Parquet data must be stored in little-endian format. Other types
// (Int96, etc.) are left exactly as copied.
template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) const {
  DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
  T* out = reinterpret_cast<T*>(buffer);
  memo_table_.CopyValues(0 /* start_pos */, out);
#if !ARROW_LITTLE_ENDIAN
  if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint32_t> ||
                std::is_same_v<T, int64_t> || std::is_same_v<T, uint64_t> ||
                std::is_same_v<T, float> || std::is_same_v<T, double>) {
    // Swap in place: the values are already in the destination, so no
    // temporary copy of the dictionary is needed.
    const int64_t num_entries = memo_table_.size();
    for (int64_t i = 0; i < num_entries; ++i) {
      out[i] = ::arrow::bit_util::ToLittleEndian(out[i]);
    }
  }
#endif
}

// ByteArray and FLBA already have the dictionary encoded in their data heaps
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) const {
memo_table_.VisitValues(0, [&buffer](::std::string_view v) {
uint32_t len = static_cast<uint32_t>(v.length());
memcpy(buffer, &len, sizeof(len));
uint32_t len_le = ::arrow::bit_util::ToLittleEndian(len);
memcpy(buffer, &len_le, sizeof(len_le));
buffer += sizeof(len);
memcpy(buffer, v.data(), len);
buffer += len;
Expand Down Expand Up @@ -923,6 +993,8 @@ class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase<DType> {

void Put(const T* buffer, int num_values) override {
if (num_values > 0) {
// ByteStreamSplitEncode (DoSplitStreams) handles endianness correctly,
// so we can directly append the native byte representation
PARQUET_THROW_NOT_OK(
this->sink_.Append(reinterpret_cast<const uint8_t*>(buffer),
num_values * static_cast<int64_t>(sizeof(T))));
Expand Down Expand Up @@ -963,10 +1035,22 @@ class ByteStreamSplitEncoder<FLBAType> : public ByteStreamSplitEncoderBase<FLBAT
if (byte_width_ > 0) {
const int64_t total_bytes = static_cast<int64_t>(num_values) * byte_width_;
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
#if !ARROW_LITTLE_ENDIAN
// On big-endian, reverse bytes before encoding to compensate for
// DoSplitStreams reversal, ensuring FLBA bytes are preserved as-is
std::vector<uint8_t> temp_buffer(byte_width_);
#endif
for (int i = 0; i < num_values; ++i) {
// Write the result to the output stream
DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
#if !ARROW_LITTLE_ENDIAN
// Reverse bytes before appending
std::reverse_copy(buffer[i].ptr, buffer[i].ptr + byte_width_,
temp_buffer.begin());
sink_.UnsafeAppend(temp_buffer.data(), byte_width_);
#else
sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
#endif
}
}
this->num_values_in_buffer_ += num_values;
Expand Down
Loading