Skip to content

Commit 0292132

Browse files
committed
GH-45847: [C++] Optimize Parquet column reader by fusing decoding and counting
This PR implements the optimization to fuse definition level decoding with counting in the Parquet column reader, addressing the TODO in column_reader.cc.
1 parent 4fc9f9e commit 0292132

File tree

4 files changed

+162
-6
lines changed

4 files changed

+162
-6
lines changed

cpp/src/arrow/util/rle_encoding_internal.h

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,23 @@ class RleRunDecoder {
308308
return to_read;
309309
}
310310

311+
/// Get a batch of values and count how many equal match_value
312+
[[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size,
313+
rle_size_t value_bit_width,
314+
value_type match_value, int64_t* out_count) {
315+
if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) {
316+
return 0;
317+
}
318+
319+
const auto to_read = std::min(remaining_count_, batch_size);
320+
std::fill(out, out + to_read, value_);
321+
if (value_ == match_value) {
322+
*out_count += to_read;
323+
}
324+
remaining_count_ -= to_read;
325+
return to_read;
326+
}
327+
311328
private:
312329
value_type value_ = {};
313330
rle_size_t remaining_count_ = 0;
@@ -377,6 +394,15 @@ class BitPackedRunDecoder {
377394
return steps;
378395
}
379396

397+
/// Get a batch of values and count how many equal match_value
398+
[[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size,
399+
rle_size_t value_bit_width,
400+
value_type match_value, int64_t* out_count) {
401+
auto steps = GetBatch(out, batch_size, value_bit_width);
402+
*out_count += std::count(out, out + steps, match_value);
403+
return steps;
404+
}
405+
380406
private:
381407
/// The pointer to the beginning of the run
382408
const uint8_t* data_ = nullptr;
@@ -438,6 +464,10 @@ class RleBitPackedDecoder {
438464
/// left or if an error occurred.
439465
[[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size);
440466

467+
/// Get a batch of values and count how many equal match_value
468+
[[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size,
469+
value_type match_value, int64_t* out_count);
470+
441471
/// Like GetBatch but add spacing for null entries.
442472
///
443473
/// Null entries will be set to an arbistrary value to avoid leaking private data.
@@ -483,6 +513,18 @@ class RleBitPackedDecoder {
483513
decoder_);
484514
}
485515

516+
/// Get a batch of values from the current run and return the number elements read.
517+
[[nodiscard]] rle_size_t RunGetBatchWithCount(value_type* out, rle_size_t batch_size,
518+
value_type match_value,
519+
int64_t* out_count) {
520+
return std::visit(
521+
[&](auto& dec) {
522+
return dec.GetBatchWithCount(out, batch_size, value_bit_width_, match_value,
523+
out_count);
524+
},
525+
decoder_);
526+
}
527+
486528
/// Call the parser with a single callable for all event types.
487529
template <typename Callable>
488530
void ParseWithCallable(Callable&& func);
@@ -1474,4 +1516,49 @@ inline void RleBitPackedEncoder::Clear() {
14741516
bit_writer_.Clear();
14751517
}
14761518

1519+
template <typename T>
1520+
auto RleBitPackedDecoder<T>::GetBatchWithCount(value_type* out, rle_size_t batch_size,
1521+
value_type match_value, int64_t* out_count)
1522+
-> rle_size_t {
1523+
using ControlFlow = RleBitPackedParser::ControlFlow;
1524+
1525+
rle_size_t values_read = 0;
1526+
1527+
// Remaining from a previous call that would have left some unread data from a run.
1528+
if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
1529+
const auto read = RunGetBatchWithCount(out, batch_size, match_value, out_count);
1530+
values_read += read;
1531+
out += read;
1532+
1533+
// Either we fulfilled all the batch to be read or we finished remaining run.
1534+
if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
1535+
return values_read;
1536+
}
1537+
ARROW_DCHECK(run_remaining() == 0);
1538+
}
1539+
1540+
ParseWithCallable([&](auto run) {
1541+
using RunDecoder = typename decltype(run)::template DecoderType<value_type>;
1542+
1543+
ARROW_DCHECK_LT(values_read, batch_size);
1544+
RunDecoder decoder(run, value_bit_width_);
1545+
const auto read =
1546+
decoder.GetBatchWithCount(out, batch_size - values_read, value_bit_width_,
1547+
match_value, out_count);
1548+
ARROW_DCHECK_LE(read, batch_size - values_read);
1549+
values_read += read;
1550+
out += read;
1551+
1552+
// Stop reading and store remaining decoder
1553+
if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {
1554+
decoder_ = std::move(decoder);
1555+
return ControlFlow::Break;
1556+
}
1557+
1558+
return ControlFlow::Continue;
1559+
});
1560+
1561+
return values_read;
1562+
}
1563+
14771564
} // namespace arrow::util

cpp/src/arrow/util/rle_encoding_test.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,4 +1280,48 @@ TEST(RleBitPacked, GetBatchSpacedRoundtripUint64) {
12801280
DoTestGetBatchSpacedRoundtrip<uint64_t>();
12811281
}
12821282

1283+
TEST(Rle, GetBatchWithCount) {
1284+
const int bit_width = 1;
1285+
uint8_t buffer[100];
1286+
RleBitPackedEncoder encoder(buffer, sizeof(buffer), bit_width);
1287+
1288+
// 30 1s
1289+
for (int i = 0; i < 30; ++i) {
1290+
bool result = encoder.Put(1);
1291+
EXPECT_TRUE(result);
1292+
}
1293+
// 20 0s
1294+
for (int i = 0; i < 20; ++i) {
1295+
bool result = encoder.Put(0);
1296+
EXPECT_TRUE(result);
1297+
}
1298+
// 10 1s
1299+
for (int i = 0; i < 10; ++i) {
1300+
bool result = encoder.Put(1);
1301+
EXPECT_TRUE(result);
1302+
}
1303+
encoder.Flush();
1304+
1305+
RleBitPackedDecoder<int> decoder(buffer, sizeof(buffer), bit_width);
1306+
std::vector<int> values(60);
1307+
int64_t count = 0;
1308+
1309+
// Read first 40 values. Should be 30 1s and 10 0s.
1310+
// Count 1s.
1311+
int read = decoder.GetBatchWithCount(values.data(), 40, 1, &count);
1312+
EXPECT_EQ(read, 40);
1313+
EXPECT_EQ(count, 30);
1314+
for (int i = 0; i < 30; ++i) EXPECT_EQ(values[i], 1);
1315+
for (int i = 30; i < 40; ++i) EXPECT_EQ(values[i], 0);
1316+
1317+
// Read next 20 values. Should be 10 0s and 10 1s.
1318+
// Count 1s.
1319+
count = 0;
1320+
read = decoder.GetBatchWithCount(values.data(), 20, 1, &count);
1321+
EXPECT_EQ(read, 20);
1322+
EXPECT_EQ(count, 10);
1323+
for (int i = 0; i < 10; ++i) EXPECT_EQ(values[i], 0);
1324+
for (int i = 10; i < 20; ++i) EXPECT_EQ(values[i], 1);
1325+
}
1326+
12831327
} // namespace arrow::util

cpp/src/parquet/column_reader.cc

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,30 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) {
191191
return num_decoded;
192192
}
193193

194+
int LevelDecoder::DecodeAndCount(int batch_size, int16_t* levels, int64_t* count) {
195+
int num_decoded = 0;
196+
197+
int num_values = std::min(num_values_remaining_, batch_size);
198+
if (encoding_ == Encoding::RLE) {
199+
num_decoded =
200+
rle_decoder_->GetBatchWithCount(levels, num_values, max_level_, count);
201+
} else {
202+
num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
203+
*count += std::count(levels, levels + num_decoded, max_level_);
204+
}
205+
if (num_decoded > 0) {
206+
internal::MinMax min_max = internal::FindMinMax(levels, num_decoded);
207+
if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) {
208+
std::stringstream ss;
209+
ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max
210+
<< " out of range. Max Level: " << max_level_;
211+
throw ParquetException(ss.str());
212+
}
213+
}
214+
num_values_remaining_ -= num_decoded;
215+
return num_decoded;
216+
}
217+
194218
ReaderProperties default_reader_properties() {
195219
static ReaderProperties default_reader_properties;
196220
return default_reader_properties;
@@ -1006,14 +1030,11 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
10061030

10071031
// If the field is required and non-repeated, there are no definition levels
10081032
if (this->max_def_level_ > 0 && def_levels != nullptr) {
1009-
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
1010-
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
1033+
if (ARROW_PREDICT_FALSE(this->definition_level_decoder_.DecodeAndCount(
1034+
batch_size, def_levels, non_null_values_to_read) != batch_size)) {
10111035
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
10121036
}
1013-
// TODO(wesm): this tallying of values-to-decode can be performed with better
1014-
// cache-efficiency if fused with the level decoding.
1015-
*non_null_values_to_read +=
1016-
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
1037+
*num_def_levels = batch_size;
10171038
} else {
10181039
// Required field, read all values
10191040
if (num_def_levels != nullptr) {

cpp/src/parquet/column_reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ class PARQUET_EXPORT LevelDecoder {
9292
// Decodes a batch of levels into an array and returns the number of levels decoded
9393
int Decode(int batch_size, int16_t* levels);
9494

95+
/// Decodes a batch of levels into an array and counts the number of levels equal to
96+
/// max_level_
97+
int DecodeAndCount(int batch_size, int16_t* levels, int64_t* count);
98+
9599
private:
96100
int bit_width_;
97101
int num_values_remaining_;

0 commit comments

Comments
 (0)