Skip to content

Commit a2f6f94

Browse files
committed
[C++][Dataset] Apply stripe-selected ORC scanning with conservative null/overflow handling
1 parent 237419a commit a2f6f94

File tree

4 files changed

+122
-23
lines changed

4 files changed

+122
-23
lines changed

cpp/src/arrow/adapters/orc/adapter.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,31 @@ class ORCFileReader::Impl {
642642
pool_);
643643
}
644644

645+
/// Create a record batch reader for one stripe, selecting columns by name.
///
/// Returns nullptr when `current_row_` is at or beyond NumberOfRows().
/// Otherwise a stripe is selected through SelectStripeWithRowNumber() using
/// `current_row_`, and on success `current_row_` is moved to the first row
/// after that stripe.
///
/// \param batch_size forwarded to the OrcStripeReader that is returned
/// \param include_names field names to select; an empty list leaves the
///        default row reader options untouched
Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
    int64_t batch_size, const std::vector<std::string>& include_names) {
  // Nothing left to read: hand back a null reader.
  if (!(current_row_ < NumberOfRows())) {
    return nullptr;
  }

  liborc::RowReaderOptions row_reader_options = DefaultRowReaderOptions();
  const bool has_projection = !include_names.empty();
  if (has_projection) {
    RETURN_NOT_OK(SelectNames(&row_reader_options, include_names));
  }

  StripeInformation selected_stripe{0, 0, 0, 0};
  RETURN_NOT_OK(
      SelectStripeWithRowNumber(&row_reader_options, current_row_, &selected_stripe));
  ARROW_ASSIGN_OR_RAISE(auto projected_schema, ReadSchema(row_reader_options));

  std::unique_ptr<liborc::RowReader> stripe_row_reader;
  // NOTE(review): these macros presumably translate liborc exceptions into a
  // Status return -- confirm against their definition.
  ORC_BEGIN_CATCH_NOT_OK
  stripe_row_reader = reader_->createRowReader(row_reader_options);
  stripe_row_reader->seekToRow(current_row_);
  // Advance only after the row reader was created and positioned.
  current_row_ = selected_stripe.first_row_id + selected_stripe.num_rows;
  ORC_END_CATCH_NOT_OK

  return std::make_shared<OrcStripeReader>(std::move(stripe_row_reader),
                                           projected_schema, batch_size, pool_);
}
669+
645670
Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
646671
int64_t batch_size, const std::vector<std::string>& include_names) {
647672
liborc::RowReaderOptions opts = DefaultRowReaderOptions();
@@ -746,6 +771,11 @@ Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
746771
return impl_->NextStripeReader(batch_size, include_indices);
747772
}
748773

774+
Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
775+
int64_t batch_size, const std::vector<std::string>& include_names) {
776+
// Name-based overload: forwards both arguments unchanged to the implementation object.
return impl_->NextStripeReader(batch_size, include_names);
777+
}
778+
749779
// Forwards to the implementation object's NumberOfStripes().
int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
750780

751781
// Forwards to the implementation object's NumberOfRows().
int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }

cpp/src/arrow/adapters/orc/adapter.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,22 @@ class ARROW_EXPORT ORCFileReader {
163163
Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
164164
int64_t batch_size, const std::vector<int>& include_indices);
165165

166+
/// \brief Get a stripe-level record batch iterator, selecting fields by name.
167+
///
168+
/// Each record batch will have up to `batch_size` rows.
169+
/// NextStripeReader serves as a fine-grained alternative to ReadStripe
170+
/// which may cause OOM issues by loading the whole stripe into memory.
171+
///
172+
/// Note this will only read rows for the current stripe, not the entire
173+
/// file.
174+
///
175+
/// \param[in] batch_size the maximum number of rows in each record batch
176+
/// \param[in] include_names the selected field names to read, if not empty
177+
/// (otherwise all fields are read)
178+
/// \return the stripe reader, or nullptr when the current row position is at or past the last row of the file
179+
Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
180+
int64_t batch_size, const std::vector<std::string>& include_names);
181+
166182
/// \brief Get a record batch iterator for the entire file.
167183
///
168184
/// Each record batch will have up to `batch_size` rows.

cpp/src/arrow/dataset/file_orc.cc

Lines changed: 74 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "arrow/util/logging.h"
3232
#include "arrow/util/string.h"
3333
#include "arrow/util/thread_pool.h"
34+
#include <limits>
3435
#include <numeric>
3536

3637
namespace arrow {
@@ -87,6 +88,7 @@ class OrcScanTask {
8788
const FileFormat& format,
8889
const ScanOptions& scan_options,
8990
const std::shared_ptr<FileFragment>& fragment) {
91+
ARROW_UNUSED(format);
9092
ARROW_ASSIGN_OR_RAISE(
9193
auto reader,
9294
OpenORCReader(source, std::make_shared<ScanOptions>(scan_options)));
@@ -123,23 +125,47 @@ class OrcScanTask {
123125
std::iota(stripe_indices.begin(), stripe_indices.end(), 0);
124126
}
125127

126-
// For this PR, we read all stripes but the infrastructure is in place
127-
// A future PR can add GetRecordBatchReader overload with stripe_indices
128-
std::shared_ptr<RecordBatchReader> record_batch_reader;
129-
ARROW_ASSIGN_OR_RAISE(
130-
record_batch_reader,
131-
reader->GetRecordBatchReader(scan_options.batch_size, included_fields));
128+
if (stripe_indices.empty()) {
129+
return MakeEmptyIterator<std::shared_ptr<RecordBatch>>();
130+
}
132131

133-
return RecordBatchIterator(Impl{std::move(record_batch_reader)});
132+
return RecordBatchIterator(Impl{std::move(reader), std::move(included_fields),
133+
std::move(stripe_indices), scan_options.batch_size,
134+
0, nullptr});
134135
}
135136

136137
/// \brief Produce the next record batch from the selected stripes.
///
/// Drains the currently open stripe reader. When that reader yields a null
/// batch, the next entry of `stripe_indices_` is opened: the file reader is
/// positioned on the stripe's first row and a new stripe reader is requested.
///
/// \return the next batch, the iteration-end sentinel once every entry of
///         `stripe_indices_` has been visited, or the first error reported by
///         ReadNext(), Seek() or NextStripeReader()
Result<std::shared_ptr<RecordBatch>> Next() {
  while (true) {
    if (current_stripe_reader_) {
      std::shared_ptr<RecordBatch> batch;
      RETURN_NOT_OK(current_stripe_reader_->ReadNext(&batch));
      if (batch) {
        return batch;
      }
      // A null batch ends this stripe; move on to the next selected one.
      current_stripe_reader_.reset();
      ++next_stripe_index_;
      continue;
    }

    if (next_stripe_index_ >= stripe_indices_.size()) {
      return IterationEnd<std::shared_ptr<RecordBatch>>();
    }

    const auto stripe = stripe_indices_[next_stripe_index_];
    const auto stripe_info = reader_->GetStripeInformation(stripe);
    RETURN_NOT_OK(reader_->Seek(stripe_info.first_row_id));
    ARROW_ASSIGN_OR_RAISE(current_stripe_reader_,
                          reader_->NextStripeReader(batch_size_, included_fields_));
    if (!current_stripe_reader_) {
      // NextStripeReader() returns nullptr when no rows remain at the current
      // position. Skip this stripe explicitly; otherwise the loop would retry
      // the same index forever.
      ++next_stripe_index_;
    }
  }
}
141162

142-
std::shared_ptr<RecordBatchReader> record_batch_reader_;
163+
std::unique_ptr<adapters::orc::ORCFileReader> reader_;
164+
std::vector<std::string> included_fields_;
165+
std::vector<int> stripe_indices_;
166+
int64_t batch_size_;
167+
size_t next_stripe_index_;
168+
std::shared_ptr<RecordBatchReader> current_stripe_reader_;
143169
};
144170

145171
return Impl::Make(fragment_->source(),
@@ -319,10 +345,13 @@ Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
319345

320346
// Open reader if not already cached
321347
if (!cached_reader_) {
322-
ARROW_ASSIGN_OR_RAISE(auto input,
323-
arrow::io::RandomAccessFile::Open(source().path()));
324-
ARROW_ASSIGN_OR_RAISE(cached_reader_,
325-
adapters::orc::ORCFileReader::Open(input, arrow::default_memory_pool()));
348+
auto lock = metadata_mutex_.Lock();
349+
if (!cached_reader_) {
350+
ARROW_ASSIGN_OR_RAISE(auto input, source().Open());
351+
ARROW_ASSIGN_OR_RAISE(
352+
cached_reader_,
353+
adapters::orc::ORCFileReader::Open(std::move(input), arrow::default_memory_pool()));
354+
}
326355
}
327356

328357
// Process each field referenced in predicate (lazy evaluation)
@@ -349,8 +378,8 @@ Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
349378
}
350379
statistics_expressions_complete_[field_index] = true;
351380

352-
// PR4 limitation: only support INT64
353-
if (field->type()->id() != Type::INT64) {
381+
// Support INT32 and INT64 types
382+
if (field->type()->id() != Type::INT32 && field->type()->id() != Type::INT64) {
354383
continue; // Unsupported type
355384
}
356385

@@ -370,6 +399,16 @@ Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
370399
continue; // No statistics
371400
}
372401

402+
auto field_expr = compute::field_ref(field_ref);
403+
const bool has_null = col_stats->hasNull();
404+
const bool is_all_null = has_null && col_stats->getNumberOfValues() == 0;
405+
406+
if (is_all_null) {
407+
// All values are null in this stripe for this column.
408+
FoldingAnd(&statistics_expressions_[stripe_idx], compute::is_null(field_expr));
409+
continue;
410+
}
411+
373412
const auto* int_stats =
374413
dynamic_cast<const liborc::IntegerColumnStatistics*>(col_stats);
375414
if (!int_stats || !int_stats->hasMinimum() || !int_stats->hasMaximum()) {
@@ -378,16 +417,28 @@ Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
378417

379418
int64_t min_value = int_stats->getMinimum();
380419
int64_t max_value = int_stats->getMaximum();
381-
bool has_null = col_stats->hasNull();
382420

383421
if (min_value > max_value) {
384422
continue; // Invalid statistics
385423
}
386424

387-
// Build guarantee expression (from PR2 logic)
388-
auto field_expr = compute::field_ref(field_ref);
389-
auto min_scalar = std::make_shared<Int64Scalar>(min_value);
390-
auto max_scalar = std::make_shared<Int64Scalar>(max_value);
425+
// Build guarantee expression
426+
std::shared_ptr<Scalar> min_scalar, max_scalar;
427+
428+
// Handle INT32 with overflow protection
429+
if (field->type()->id() == Type::INT32) {
430+
// Check for INT32 overflow
431+
if (min_value < std::numeric_limits<int32_t>::min() ||
432+
max_value > std::numeric_limits<int32_t>::max()) {
433+
// Statistics overflow - skip predicate pushdown for safety
434+
continue;
435+
}
436+
min_scalar = std::make_shared<Int32Scalar>(static_cast<int32_t>(min_value));
437+
max_scalar = std::make_shared<Int32Scalar>(static_cast<int32_t>(max_value));
438+
} else {
439+
min_scalar = std::make_shared<Int64Scalar>(min_value);
440+
max_scalar = std::make_shared<Int64Scalar>(max_value);
441+
}
391442

392443
auto min_expr = compute::greater_equal(field_expr, compute::literal(*min_scalar));
393444
auto max_expr = compute::less_equal(field_expr, compute::literal(*max_scalar));
@@ -421,6 +472,7 @@ Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
421472

422473
Result<std::vector<int>> OrcFileFragment::FilterStripes(
423474
const compute::Expression& predicate) {
475+
RETURN_NOT_OK(EnsureMetadataCached());
424476

425477
// Feature flag for disabling predicate pushdown
426478
if (auto env_var = arrow::internal::GetEnvVar("ARROW_ORC_DISABLE_PREDICATE_PUSHDOWN")) {

cpp/src/arrow/dataset/file_orc.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ class ARROW_DS_EXPORT OrcFileFragment : public FileFragment {
8282
/// \brief Filter stripes based on predicate using stripe statistics
8383
///
8484
/// Returns indices of stripes where the predicate may be satisfied.
85-
/// Currently supports INT64 columns with greater-than operator only.
85+
/// Supports INT32 and INT64 columns, and handles missing or unsupported
86+
/// statistics conservatively.
8687
///
8788
/// \param predicate Arrow compute expression to evaluate
8889
/// \return Vector of stripe indices to read (0-based)

0 commit comments

Comments
 (0)