Skip to content

Commit 8c94e80

Browse files
committed
Some review feedback
1 parent 7704cf3 commit 8c94e80

File tree

8 files changed

+95
-94
lines changed

8 files changed

+95
-94
lines changed

cpp/src/parquet/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,9 @@ add_arrow_lib(parquet
309309
STATIC_LINK_LIBS
310310
${PARQUET_STATIC_LINK_LIBS}
311311
STATIC_INSTALL_INTERFACE_LIBS
312-
${PARQUET_STATIC_INSTALL_INTERFACE_LIBS})
313-
314-
target_include_directories(parquet_objlib SYSTEM PRIVATE ${ARROW_SOURCE_DIR}/thirdparty/flatbuffers/include)
312+
${PARQUET_STATIC_INSTALL_INTERFACE_LIBS}
313+
PRIVATE_INCLUDES
314+
${ARROW_SOURCE_DIR}/thirdparty/flatbuffers/include)
315315

316316
if(WIN32 AND NOT (ARROW_TEST_LINKAGE STREQUAL "static"))
317317
add_library(parquet_test_support STATIC

cpp/src/parquet/file_reader.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,13 +441,13 @@ class SerializedFile : public ParquetFileReader::Contents {
441441
PARQUET_ASSIGN_OR_THROW(
442442
auto footer_buffer,
443443
source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
444-
if (properties_.read_metadata3()) {
444+
if (properties_.read_flatbuffer_metadata_if_present()) {
445445
// Try to extract flatbuffer metadata from footer
446446
std::string flatbuffer_data;
447447
auto result = ExtractFlatbuffer(footer_buffer, &flatbuffer_data);
448448
if (result.ok()) {
449-
int32_t required_or_consumed = *result;
450-
if (required_or_consumed > static_cast<int32_t>(footer_buffer->size())) {
449+
uint32_t required_or_consumed = *result;
450+
if (required_or_consumed > static_cast<uint32_t>(footer_buffer->size())) {
451451
PARQUET_ASSIGN_OR_THROW(
452452
footer_buffer,
453453
source_->ReadAt(source_size_ - required_or_consumed, required_or_consumed));
@@ -461,8 +461,8 @@ class SerializedFile : public ParquetFileReader::Contents {
461461
format3::GetFileMetaData(flatbuffer_data.data());
462462
auto thrift_metadata =
463463
std::make_unique<format::FileMetaData>(FromFlatbuffer(fb_metadata));
464-
file_metadata_ = FileMetaData::Make(
465-
std::move(thrift_metadata), static_cast<uint32_t>(*result), properties_);
464+
file_metadata_ =
465+
FileMetaData::Make(std::move(thrift_metadata), *result, properties_);
466466
return;
467467
}
468468
}

cpp/src/parquet/file_writer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ class PARQUET_EXPORT RowGroupWriter {
110110
};
111111

112112
PARQUET_EXPORT
113-
void WriteFileMetaData(const FileMetaData& file_metadata,
114-
::arrow::io::OutputStream* sink,
113+
void WriteFileMetaData(const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink,
115114
bool use_metadata3 = false);
116115

117116
PARQUET_EXPORT

cpp/src/parquet/metadata.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
#include <string>
2626
#include <vector>
2727

28+
#include "generated/parquet_types.h"
2829
#include "parquet/encryption/type_fwd.h"
2930
#include "parquet/index_location.h"
3031
#include "parquet/platform.h"
3132
#include "parquet/properties.h"
3233
#include "parquet/type_fwd.h"
33-
#include "generated/parquet_types.h"
3434

3535
namespace parquet {
3636

@@ -389,8 +389,8 @@ class PARQUET_EXPORT FileMetaData {
389389
const ReaderProperties& properties,
390390
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
391391

392-
explicit FileMetaData(std::unique_ptr<format::FileMetaData> metadata, uint32_t metadata_len,
393-
const ReaderProperties& properties);
392+
explicit FileMetaData(std::unique_ptr<format::FileMetaData> metadata,
393+
uint32_t metadata_len, const ReaderProperties& properties);
394394

395395
void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor);
396396
const std::shared_ptr<InternalFileDecryptor>& file_decryptor() const;

cpp/src/parquet/metadata3.cc

Lines changed: 50 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
#include <string>
2222
#include <vector>
2323

24+
#include "arrow/util/bit_util.h"
2425
#include "arrow/util/compression.h"
2526
#include "arrow/util/crc32.h"
2627
#include "arrow/util/endian.h"
2728
#include "arrow/util/ubsan.h"
2829
#include "arrow/util/unreachable.h"
29-
#include "parquet/file_writer.h"
3030
#include "generated/parquet_types.h"
31+
#include "parquet/file_writer.h"
3132
#include "parquet/thrift_internal.h"
3233

3334
namespace parquet {
@@ -92,30 +93,35 @@ static_assert(IsEnumEq(format::PageType::DICTIONARY_PAGE,
9293
constexpr double kMinCompressionRatio = 1.2;
9394

9495
constexpr uint8_t kExtUUID[16] = {0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef,
95-
0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef};
96+
0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef};
9697

9798
// Extended format compression codec (using same values as format3::CompressionCodec)
9899
enum class CompressionCodec : uint8_t {
99100
UNCOMPRESSED = 0,
100101
LZ4_RAW = 7,
101102
};
102103

103-
auto GetNumChildren(
104+
int32_t GetNumChildren(
104105
const flatbuffers::Vector<flatbuffers::Offset<format3::SchemaElement>>& s, size_t i) {
105106
return s.Get(i)->num_children();
106107
}
107108

108-
auto GetNumChildren(const std::vector<format::SchemaElement>& s, size_t i) {
109+
int32_t GetNumChildren(const std::vector<format::SchemaElement>& s, size_t i) {
109110
return s[i].num_children;
110111
}
111112

112-
auto GetName(const flatbuffers::Vector<flatbuffers::Offset<format3::SchemaElement>>& s,
113-
size_t i) {
113+
std::string GetName(
114+
const flatbuffers::Vector<flatbuffers::Offset<format3::SchemaElement>>& s, size_t i) {
114115
return s.Get(i)->name()->str();
115116
}
116117

117-
auto GetName(const std::vector<format::SchemaElement>& s, size_t i) { return s[i].name; }
118+
std::string GetName(const std::vector<format::SchemaElement>& s, size_t i) {
119+
return s[i].name;
120+
}
118121

122+
// Maps between column chunk indices (leaf columns only) and schema element indices
123+
// (all columns including groups). Also tracks parent relationships for building
124+
// column paths.
119125
class ColumnMap {
120126
public:
121127
template <typename Schema>
@@ -126,7 +132,9 @@ class ColumnMap {
126132
BuildParents(s);
127133
}
128134

135+
// Convert a column chunk index to the corresponding schema element index.
129136
size_t ToSchema(size_t cc_idx) const { return colchunk2schema_[cc_idx]; }
137+
// Convert a schema element index to its column chunk index, if it is a leaf column.
130138
std::optional<size_t> ToCc(size_t schema_idx) const {
131139
auto it =
132140
std::lower_bound(colchunk2schema_.begin(), colchunk2schema_.end(), schema_idx);
@@ -177,6 +185,9 @@ class ColumnMap {
177185
std::vector<uint32_t> parents_;
178186
};
179187

188+
// Packed representation of min/max statistics for a column chunk.
189+
// Values are split into lo4 (4 bytes), lo8 (8 bytes), and hi8 (8 bytes) parts
190+
// to allow compact flatbuffer encoding.
180191
struct MinMax {
181192
struct Packed {
182193
uint32_t lo4 = 0;
@@ -1137,80 +1148,62 @@ static std::string PackFlatbuffer(const std::string& in) {
11371148
uint8_t* const p = reinterpret_cast<uint8_t*>(out.data()) + n + 1;
11381149

11391150
// Compute and store checksums and lengths
1140-
uint32_t crc32 = ::arrow::internal::crc32(0, reinterpret_cast<const uint8_t*>(out.data()), n + 1);
1141-
StoreLE32(crc32, p + 0); // crc32(data .. compressor)
1142-
StoreLE32(n, p + 4); // compressed_len
1143-
StoreLE32(in.size(), p + 8); // raw_len
1151+
uint32_t crc32 =
1152+
::arrow::internal::crc32(0, reinterpret_cast<const uint8_t*>(out.data()), n + 1);
1153+
StoreLE32(crc32, p + 0); // crc32(data .. compressor)
1154+
StoreLE32(n, p + 4); // compressed_len
1155+
StoreLE32(in.size(), p + 8); // raw_len
11441156
uint32_t len_crc32 = ::arrow::internal::crc32(0, p + 4, 8);
1145-
StoreLE32(len_crc32, p + 12); // crc32(compressed_len .. raw_len)
1157+
StoreLE32(len_crc32, p + 12); // crc32(compressed_len .. raw_len)
11461158

11471159
// Store UUID identifier
11481160
std::memcpy(p + 16, kExtUUID, 16);
11491161
out.resize(n + 33);
11501162
return out;
11511163
}
11521164

1153-
inline uint8_t* WriteULEB64(uint64_t v, uint8_t* out) {
1154-
uint8_t* p = out;
1155-
do {
1156-
uint8_t b = v & 0x7F;
1157-
if (v < 0x80) {
1158-
*p++ = b;
1159-
return p;
1160-
}
1161-
*p++ = b | 0x80;
1162-
v >>= 7;
1163-
} while (true);
1164-
}
1165-
1166-
inline uint32_t CountLeadingZeros32(uint32_t v) {
1167-
if (v == 0) return 32;
1168-
uint32_t count = 0;
1169-
uint32_t mask = 0x80000000;
1170-
while ((v & mask) == 0) {
1171-
++count;
1172-
mask >>= 1;
1173-
}
1174-
return count;
1175-
}
1176-
1177-
inline int32_t ULEB32Len(uint32_t v) {
1178-
return 1 + ((32 - CountLeadingZeros32(v | 0x1)) * 9) / 64;
1179-
}
1180-
11811165
void AppendFlatbuffer(std::string flatbuffer, std::string* thrift) {
1166+
using ::arrow::bit_util::kMaxLEB128ByteLenFor;
1167+
using ::arrow::bit_util::WriteLEB128;
1168+
11821169
// Pack the flatbuffer with LZ4 compression and checksums
11831170
std::string packed = PackFlatbuffer(flatbuffer);
11841171

11851172
const uint32_t kFieldId = 32767;
1186-
int header_size = 1 + ULEB32Len(kFieldId) + ULEB32Len(packed.length());
1173+
// Max header: 1 (type byte) + max ULEB for field id + max ULEB for packed length
1174+
constexpr int32_t kMaxHeaderSize =
1175+
1 + kMaxLEB128ByteLenFor<uint32_t> + kMaxLEB128ByteLenFor<uint32_t>;
11871176

11881177
const size_t old_size = thrift->size();
1189-
thrift->resize(old_size + header_size + packed.size() + 1); // +1 for stop field
1178+
thrift->resize(old_size + kMaxHeaderSize + packed.size() + 1); // +1 for stop field
11901179

11911180
// Pointer to the new write position
11921181
uint8_t* p = reinterpret_cast<uint8_t*>(&(*thrift)[old_size]);
11931182

11941183
// Write the binary type indicator
11951184
*p++ = 0x08;
11961185

1197-
// Write field id and size using ULEB64
1198-
p = WriteULEB64(kFieldId, p);
1199-
p = WriteULEB64(static_cast<uint32_t>(packed.size()), p);
1186+
// Write field id and size using ULEB128
1187+
p += WriteLEB128(kFieldId, p, kMaxLEB128ByteLenFor<uint32_t>);
1188+
p += WriteLEB128(static_cast<uint32_t>(packed.size()), p,
1189+
kMaxLEB128ByteLenFor<uint32_t>);
12001190

12011191
// Copy the packed payload
12021192
std::memcpy(p, packed.data(), packed.size());
12031193
p += packed.size();
12041194

12051195
// Add stop field
12061196
*p = 0x00;
1207-
return;
1197+
1198+
// Trim to actual size (header may have been smaller than max)
1199+
thrift->resize(p - reinterpret_cast<uint8_t*>(thrift->data()) + 1);
12081200
}
12091201

1210-
::arrow::Result<int32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf, std::string* out_flatbuffer) {
1202+
::arrow::Result<uint32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf,
1203+
std::string* out_flatbuffer) {
12111204
if (buf->size() < 8) return 8;
12121205
PARQUET_THROW_NOT_OK(CheckMagicNumber(buf->data() + buf->size() - 4));
1213-
uint32_t md_len = LoadLE32(buf->data() + buf->size() -8);
1206+
uint32_t md_len = LoadLE32(buf->data() + buf->size() - 8);
12141207
if (md_len < 34) return 0;
12151208
if (buf->size() < 42) return 42; // 34 (metadata3 trailer) + 8 (len + PAR1)
12161209

@@ -1235,7 +1228,8 @@ ::arrow::Result<int32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf, std::str
12351228
}
12361229

12371230
// Verify data CRC
1238-
uint32_t expected_crc = ::arrow::internal::crc32(0, p - compressed_len, compressed_len + 1);
1231+
uint32_t expected_crc =
1232+
::arrow::internal::crc32(0, p - compressed_len, compressed_len + 1);
12391233
if (crc32_val != expected_crc) {
12401234
return ::arrow::Status::Invalid("Extended metadata data CRC mismatch");
12411235
}
@@ -1254,11 +1248,11 @@ ::arrow::Result<int32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf, std::str
12541248
return ::arrow::Status::Invalid("LZ4 length error: raw_len < compressed_len");
12551249
}
12561250
// Use Arrow's LZ4 codec for decompression
1257-
ARROW_ASSIGN_OR_RAISE(auto codec, ::arrow::util::Codec::Create(::arrow::Compression::LZ4));
1258-
ARROW_ASSIGN_OR_RAISE(
1259-
int64_t actual_size,
1260-
codec->Decompress(compressed_len, p - compressed_len, raw_len,
1261-
decompressed_data.data()));
1251+
ARROW_ASSIGN_OR_RAISE(auto codec,
1252+
::arrow::util::Codec::Create(::arrow::Compression::LZ4));
1253+
ARROW_ASSIGN_OR_RAISE(int64_t actual_size,
1254+
codec->Decompress(compressed_len, p - compressed_len, raw_len,
1255+
decompressed_data.data()));
12621256
if (static_cast<uint32_t>(actual_size) != raw_len) {
12631257
return ::arrow::Status::Invalid("LZ4 decompression failed: expected ", raw_len,
12641258
" bytes but got ", actual_size, " bytes");
@@ -1276,7 +1270,8 @@ ::arrow::Result<int32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf, std::str
12761270
}
12771271

12781272
ARROW_CHECK_NE(out_flatbuffer, nullptr);
1279-
out_flatbuffer->assign(reinterpret_cast<const char*>(decompressed_data.data()), raw_len);
1273+
out_flatbuffer->assign(reinterpret_cast<const char*>(decompressed_data.data()),
1274+
raw_len);
12801275

12811276
return compressed_len + 42;
12821277
}

cpp/src/parquet/metadata3.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,20 @@ bool ToFlatbuffer(format::FileMetaData* md, std::string* flatbuf);
3232
// The flatbuffer in `from` must be valid (such as one retured by `ToFlatbuffer`).
3333
format::FileMetaData FromFlatbuffer(const format3::FileMetaData* md);
3434

35-
3635
// Append/extract the flatbuffer from the footer as a thrift extension:
3736
// https://github.com/apache/parquet-format/blob/master/BinaryProtocolExtensions.md.
3837
//
3938
// `flatbuf` is the flatbuffer representation of the footer metadata.
40-
// `thrift` is the buffer containing the thrift representation of the footer metadata as its suffix.
39+
// `thrift` is the buffer containing the thrift representation of the footer metadata as
40+
// its suffix.
4141
//
4242
// Returns the number of bytes added.
4343
//
4444
// The extension itself is as follows:
4545
//
4646
// +-------------------+------------+--------------------------------------+----------------+---------+--------------------------------+------+
47-
// | compress(flatbuf) | compressor | crc(compress(flatbuf) .. compressor) | compressed_len | raw_len | crc(compressed_len .. raw_len) | UUID |
47+
// | compress(flatbuf) | compressor | crc(compress(flatbuf) .. compressor) |
48+
// compressed_len | raw_len | crc(compressed_len .. raw_len) | UUID |
4849
// +-------------------+------------+--------------------------------------+----------------+---------+--------------------------------+------+
4950
//
5051
// flatbuf: the flatbuffer representation of the footer metadata.
@@ -65,7 +66,7 @@ void AppendFlatbuffer(std::string flatbuffer, std::string* thrift);
6566
// Returns the size of the flatbuffer if found (and writes to out_flatbuffer),
6667
// returns 0 if no flatbuffer extension is present, or returns the required
6768
// buffer size if the input buffer is too small.
68-
::arrow::Result<int32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf, std::string* out_flatbuffer);
69-
70-
} // using namespace parquet
69+
::arrow::Result<uint32_t> ExtractFlatbuffer(std::shared_ptr<Buffer> buf,
70+
std::string* out_flatbuffer);
7171

72+
} // namespace parquet

0 commit comments

Comments
 (0)