Skip to content

Commit 3547e1c

Browse files
committed
Implement direct reads of Parquet RLE data into Arrow REE for string types
1 parent 57c329f commit 3547e1c

28 files changed

+796
-33
lines changed

cpp/src/arrow/compute/kernels/scalar_cast_internal.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ Status UnpackDictionary(KernelContext* ctx, const ExecSpan& batch, ExecResult* o
209209
return Status::OK();
210210
}
211211

212+
// Cast kernel for run-end-encoded input.
//
// Expands the runs of the first batch value with RunEndDecode and, when the
// decoded values are not already of the requested target type, casts them to it.
//
// ctx:   kernel context; provides the ExecContext and the CastOptions state
// batch: input batch; batch[0] is the run-end-encoded array
// out:   receives the decoded (and, if needed, cast) ArrayData
//
// Returns any error raised by RunEndDecode or by the nested Cast.
Status DecodeRunEndEncoded(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
  const CastOptions& options = checked_cast<const CastState&>(*ctx->state()).options;
  const DataType& to_type = *options.to_type.type;

  Datum input(batch[0].array.ToArrayData());
  ARROW_ASSIGN_OR_RAISE(Datum decoded, RunEndDecode(input, ctx->exec_context()));

  // This kernel is registered for every run-end-encoded input, whatever its value
  // type, so the decoded values may still have to be converted to the target type.
  if (!decoded.type()->Equals(to_type)) {
    ARROW_ASSIGN_OR_RAISE(decoded, Cast(decoded, options, ctx->exec_context()));
  }

  // Datum::array() hands back a const reference, so std::move would merely copy.
  out->value = decoded.array();
  return Status::OK();
}
218+
212219
Status OutputAllNull(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
213220
// TODO(wesm): there is no good reason to have to use ArrayData here, so we
214221
// should clean this up later. This is used in the dict<null>->null cast
@@ -286,6 +293,12 @@ static bool CanCastFromDictionary(Type::type type_id) {
286293
is_fixed_size_binary(type_id));
287294
}
288295

296+
// Whether a run-end-encoded array may be cast to the output type `type_id`:
// true for primitive, base-binary-like and fixed-size-binary targets.
static bool CanCastFromRunEndEncoded(Type::type type_id) {
  if (is_primitive(type_id)) {
    return true;
  }
  if (is_base_binary_like(type_id)) {
    return true;
  }
  return is_fixed_size_binary(type_id);
}
301+
289302
void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* func) {
290303
// From null to this type
291304
ScalarKernel kernel;
@@ -303,6 +316,13 @@ void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* fun
303316
MemAllocation::NO_PREALLOCATE));
304317
}
305318

319+
// From run-end-encoded to this type
320+
if (CanCastFromRunEndEncoded(out_type_id)) {
321+
DCHECK_OK(func->AddKernel(Type::RUN_END_ENCODED, {InputType(Type::RUN_END_ENCODED)}, out_ty,
322+
DecodeRunEndEncoded, NullHandling::COMPUTED_NO_PREALLOCATE,
323+
MemAllocation::NO_PREALLOCATE));
324+
}
325+
306326
// From extension type to this type
307327
DCHECK_OK(func->AddKernel(Type::EXTENSION, {InputType(Type::EXTENSION)}, out_ty,
308328
CastFromExtension, NullHandling::COMPUTED_NO_PREALLOCATE,

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
115115
auto column_index = metadata.schema()->ColumnIndex(name);
116116
properties.set_read_dictionary(column_index, true);
117117
}
118+
for (const std::string& name : format.reader_options.ree_columns) {
119+
auto column_index = metadata.schema()->ColumnIndex(name);
120+
properties.set_read_ree(column_index, true);
121+
}
118122
properties.set_coerce_int96_timestamp_unit(
119123
format.reader_options.coerce_int96_timestamp_unit);
120124
properties.set_binary_type(format.reader_options.binary_type);
@@ -445,6 +449,7 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {
445449

446450
// FIXME implement comparison for decryption options
447451
return (reader_options.dict_columns == other_reader_options.dict_columns &&
452+
reader_options.ree_columns == other_reader_options.ree_columns &&
448453
reader_options.coerce_int96_timestamp_unit ==
449454
other_reader_options.coerce_int96_timestamp_unit &&
450455
reader_options.binary_type == other_reader_options.binary_type &&

cpp/src/arrow/dataset/file_parquet.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
8989
///
9090
/// @{
9191
std::unordered_set<std::string> dict_columns;
92+
std::unordered_set<std::string> ree_columns;
9293
arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
9394
Type::type binary_type = Type::BINARY;
9495
Type::type list_type = Type::LIST;

cpp/src/arrow/dataset/file_parquet_test.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,17 @@ TEST_F(TestParquetFileFormat, InspectDictEncoded) {
271271
AssertSchemaEqual(*actual, expected_schema, /* check_metadata = */ false);
272272
}
273273

274+
// With "utf8" listed in ree_columns, Inspect() should report that column as
// run_end_encoded(int32, utf8).
TEST_F(TestParquetFileFormat, InspectReeEncoded) {
  auto batch_reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
  auto file_source = GetFileSource(batch_reader.get());
  Schema ree_schema({field("utf8", run_end_encoded(int32(), utf8()))});

  format_->reader_options.ree_columns = {"utf8"};
  ASSERT_OK_AND_ASSIGN(auto inspected, format_->Inspect(*file_source));

  AssertSchemaEqual(*inspected, ree_schema, /* check_metadata = */ false);
}
284+
274285
TEST_F(TestParquetFileFormat, IsSupported) { TestIsSupported(); }
275286

276287
TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { TestWrite(); }
@@ -617,6 +628,25 @@ TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderDictEncoded) {
617628
}
618629
ASSERT_EQ(row_count, expected_rows());
619630
}
631+
// With "utf8" listed in ree_columns, every scanned batch should carry the schema
// run_end_encoded(int32, utf8) and the row counts should add up to expected_rows().
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderReeEncoded) {
  auto batch_reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
  auto file_source = GetFileSource(batch_reader.get());

  SetSchema(batch_reader->schema()->fields());
  SetFilter(literal(true));
  format_->reader_options.ree_columns = {"utf8"};
  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*file_source));

  Schema ree_schema({field("utf8", run_end_encoded(int32(), utf8()))});
  int64_t total_rows = 0;

  for (auto maybe_batch : PhysicalBatches(fragment)) {
    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
    AssertSchemaEqual(*batch->schema(), ree_schema, /* check_metadata = */ false);
    total_rows += batch->num_rows();
  }
  ASSERT_EQ(total_rows, expected_rows());
}
620650
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderPreBuffer) {
621651
auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
622652
auto source = GetFileSource(reader.get());

cpp/src/arrow/util/rle_encoding_internal.h

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,14 @@ class RleBitPackedDecoder {
426426
/// values.
427427
[[nodiscard]] bool Get(value_type* val);
428428

429+
/// Get the next logical value and num_repeats within the specified batch_size.
///
/// `*num_repeats` is 1 when the value comes from a bit-packed run; for an RLE run
/// it is the smaller of the values left in the run and `batch_size`.
/// Returns whether a value could be read into `*val`.
430+
[[nodiscard]] bool GetNextValueAndNumRepeats(value_type* val, int* num_repeats, int batch_size);
431+
432+
/// Like GetNextValueAndNumRepeats but add spacing for null entries.
///
/// Only the first non-empty run of the validity bitmap is considered
/// (`batch_size` bits of `valid_bits`, starting at bit `valid_bits_offset`).
/// For a null run, `*is_null` is set to true, `*num_repeats` to the length of
/// that run, no value is decoded and true is returned. For a valid run the call
/// is forwarded to GetNextValueAndNumRepeats with the run length as batch size.
433+
[[nodiscard]] bool GetNextValueAndNumRepeatsSpaced(value_type* val, bool* is_null,
434+
int* num_repeats, int batch_size,
435+
const uint8_t* valid_bits, int64_t valid_bits_offset);
436+
429437
/// Get a batch of values return the number of decoded elements.
430438
/// May write fewer elements to the output than requested if there are not enough values
431439
/// left or if an error occurred.
@@ -722,7 +730,6 @@ void RleBitPackedDecoder<T>::ParseWithCallable(Callable&& func) {
722730
auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); }
723731
auto OnRleRun(RleRun run) { return func(std::move(run)); }
724732
} handler{std::move(func)};
725-
726733
parser_.Parse(std::move(handler));
727734
}
728735

@@ -731,6 +738,69 @@ bool RleBitPackedDecoder<T>::Get(value_type* val) {
731738
return GetBatch(val, 1) == 1;
732739
}
733740

741+
template <typename T>
742+
bool RleBitPackedDecoder<T>::GetNextValueAndNumRepeats(value_type* val, int* num_repeats, int batch_size) {
743+
using ControlFlow = RleBitPackedParser::ControlFlow;
744+
745+
if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
746+
if (std::holds_alternative<BitPackedRunDecoder<value_type>>(decoder_)) {
747+
auto& decoder = std::get<BitPackedRunDecoder<value_type>>(decoder_);
748+
*num_repeats = 1;
749+
return decoder.Get(val, value_bit_width_);
750+
} else {
751+
auto& decoder = std::get<RleRunDecoder<value_type>>(decoder_);
752+
*num_repeats = std::min(decoder.remaining(), batch_size);
753+
ARROW_DCHECK_EQ(decoder.Advance(*num_repeats, value_bit_width_), *num_repeats);
754+
return decoder.Get(val, value_bit_width_);
755+
}
756+
}
757+
758+
bool read_new_value = false;
759+
760+
ParseWithCallable([&](auto run) {
761+
if constexpr(std::is_same_v<decltype(run), BitPackedRun>) {
762+
BitPackedRunDecoder<T> decoder(run, value_bit_width_);
763+
read_new_value = decoder.Get(val, value_bit_width_);
764+
*num_repeats = 1;
765+
decoder_ = std::move(decoder);
766+
return ControlFlow::Break;
767+
}
768+
else {
769+
RleRunDecoder<T> decoder(run, value_bit_width_);
770+
*num_repeats = std::min(decoder.remaining(), batch_size);
771+
read_new_value = decoder.Get(val, value_bit_width_);
772+
ARROW_DCHECK_EQ(decoder.Advance(*num_repeats, value_bit_width_), *num_repeats);
773+
decoder_ = std::move(decoder);
774+
return ControlFlow::Break;
775+
}
776+
});
777+
778+
return read_new_value;
779+
}
780+
781+
template <typename T>
782+
bool RleBitPackedDecoder<T>::GetNextValueAndNumRepeatsSpaced(value_type* val, bool* is_null,
783+
int* num_repeats, int batch_size,
784+
const uint8_t* valid_bits, int64_t valid_bits_offset) {
785+
arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
786+
/*length=*/batch_size);
787+
arrow::internal::BitRun valid_run = bit_reader.NextRun();
788+
while (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
789+
valid_run = bit_reader.NextRun();
790+
}
791+
ARROW_DCHECK_GT(batch_size, 0);
792+
ARROW_DCHECK_GT(valid_run.length, 0);
793+
if (valid_run.set) {
794+
return GetNextValueAndNumRepeats(
795+
val, num_repeats,
796+
static_cast<int>(std::min(valid_run.length, static_cast<int64_t>(batch_size))));
797+
} else {
798+
*is_null = true;
799+
*num_repeats = static_cast<int>(valid_run.length);
800+
}
801+
return true;
802+
}
803+
734804
template <typename T>
735805
auto RleBitPackedDecoder<T>::GetBatch(value_type* out, rle_size_t batch_size)
736806
-> rle_size_t {

cpp/src/parquet/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,10 @@ add_parquet_test(arrow-reader-writer-test
406406
SOURCES
407407
arrow/arrow_reader_writer_test.cc
408408
arrow/arrow_statistics_test.cc
409-
arrow/variant_test.cc)
409+
arrow/variant_test.cc
410+
EXTRA_LINK_LIBS
411+
arrow_compute_core_testing
412+
arrow_acero_testing)
410413

411414
add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
412415
arrow/reconstruct_internal_test.cc)

0 commit comments

Comments
 (0)