Skip to content

Commit 1447729

Browse files
committed
Support selection vector for kernel execution
1 parent 5227fdd commit 1447729

File tree

8 files changed

+1252
-201
lines changed

8 files changed

+1252
-201
lines changed

cpp/src/arrow/compute/exec.cc

Lines changed: 133 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
#include "arrow/util/logging_internal.h"
5151
#include "arrow/util/thread_pool.h"
5252
#include "arrow/util/vector.h"
53+
#include "arrow/visit_data_inline.h"
5354

5455
namespace arrow {
5556

@@ -359,6 +360,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
359360
have_all_scalars_ = CheckIfAllScalar(batch);
360361
promote_if_all_scalars_ = promote_if_all_scalars;
361362
position_ = 0;
363+
selection_position_ = 0;
362364
length_ = batch.length;
363365
chunk_indexes_.clear();
364366
chunk_indexes_.resize(args_->size(), 0);
@@ -367,6 +369,12 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
367369
value_offsets_.clear();
368370
value_offsets_.resize(args_->size(), 0);
369371
max_chunksize_ = std::min(length_, max_chunksize);
372+
selection_vector_ = batch.selection_vector.get();
373+
if (selection_vector_) {
374+
selection_length_ = selection_vector_->length();
375+
} else {
376+
selection_length_ = 0;
377+
}
370378
return Status::OK();
371379
}
372380

@@ -403,7 +411,7 @@ int64_t ExecSpanIterator::GetNextChunkSpan(int64_t iteration_size, ExecSpan* spa
403411
return iteration_size;
404412
}
405413

406-
bool ExecSpanIterator::Next(ExecSpan* span) {
414+
bool ExecSpanIterator::Next(ExecSpan* span, SelectionVectorSpan* selection_span) {
407415
if (!initialized_) {
408416
span->length = 0;
409417

@@ -442,6 +450,13 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
442450
PromoteExecSpanScalars(span);
443451
}
444452

453+
if (!have_all_scalars_ || promote_if_all_scalars_) {
454+
if (selection_vector_) {
455+
DCHECK_NE(selection_span, nullptr);
456+
*selection_span = SelectionVectorSpan(selection_vector_->indices());
457+
}
458+
}
459+
445460
initialized_ = true;
446461
} else if (position_ == length_) {
447462
// We've emitted at least one span and we're at the end so we are done
@@ -465,6 +480,23 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
465480
}
466481
}
467482

483+
// Then the selection span
484+
if (selection_vector_) {
485+
DCHECK_NE(selection_span, nullptr);
486+
auto indices_begin = selection_vector_->indices() + selection_position_;
487+
auto indices_end = selection_vector_->indices() + selection_vector_->length();
488+
DCHECK_LE(indices_begin, indices_end);
489+
auto chunk_row_id_end = position_ + iteration_size;
490+
int64_t num_indices = 0;
491+
while (indices_begin + num_indices < indices_end &&
492+
*(indices_begin + num_indices) < chunk_row_id_end) {
493+
++num_indices;
494+
}
495+
selection_span->SetSlice(selection_position_, num_indices,
496+
static_cast<int32_t>(position_));
497+
selection_position_ += num_indices;
498+
}
499+
468500
position_ += iteration_size;
469501
DCHECK_LE(position_, length_);
470502
return true;
@@ -694,7 +726,14 @@ std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
694726
// Skip empty chunks
695727
continue;
696728
}
697-
arrays.emplace_back(val.make_array());
729+
if (val.is_chunked_array()) {
730+
for (const auto& chunk : val.chunked_array()->chunks()) {
731+
arrays.emplace_back(chunk);
732+
}
733+
} else {
734+
DCHECK(val.is_array());
735+
arrays.emplace_back(val.make_array());
736+
}
698737
}
699738
return std::make_shared<ChunkedArray>(std::move(arrays), type.GetSharedPtr());
700739
}
@@ -781,17 +820,41 @@ class KernelExecutorImpl : public KernelExecutor {
781820
class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
782821
public:
783822
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
784-
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
785-
786823
if (batch.length == 0) {
787824
// For zero-length batches, we do nothing except return a zero-length
788825
// array of the correct output type
789826
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> result,
790827
MakeArrayOfNull(output_type_.GetSharedPtr(), /*length=*/0,
791828
exec_context()->memory_pool()));
829+
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
792830
return EmitResult(result->data(), listener);
793831
}
794832

833+
if (batch.selection_vector && !kernel_->selective_exec) {
834+
return ExecuteSelectiveDense(batch, listener);
835+
}
836+
837+
return ExecuteBatch(batch, listener);
838+
}
839+
840+
Datum WrapResults(const std::vector<Datum>& inputs,
841+
const std::vector<Datum>& outputs) override {
842+
// If execution yielded multiple chunks (because large arrays were split
843+
// based on the ExecContext parameters), then the result is a ChunkedArray
844+
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
845+
return ToChunkedArray(outputs, output_type_);
846+
} else {
847+
// Outputs have just one element
848+
return outputs[0];
849+
}
850+
}
851+
852+
protected:
853+
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
854+
DCHECK(!batch.selection_vector || kernel_->selective_exec);
855+
856+
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
857+
795858
// If the executor is configured to produce a single large Array output for
796859
// kernels supporting preallocation, then we do so up front and then
797860
// iterate over slices of that large array. Otherwise, we preallocate prior
@@ -811,19 +874,40 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
811874
}
812875
}
813876

814-
Datum WrapResults(const std::vector<Datum>& inputs,
815-
const std::vector<Datum>& outputs) override {
816-
// If execution yielded multiple chunks (because large arrays were split
817-
// based on the ExecContext parameters, then the result is a ChunkedArray
818-
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
819-
return ToChunkedArray(outputs, output_type_);
820-
} else {
821-
// Outputs have just one element
822-
return outputs[0];
877+
Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) {
878+
DCHECK(batch.selection_vector && !kernel_->selective_exec);
879+
880+
if (CheckIfAllScalar(batch)) {
881+
ExecBatch input = batch;
882+
input.selection_vector = nullptr;
883+
return ExecuteBatch(input, listener);
823884
}
885+
886+
std::vector<Datum> values(batch.num_values());
887+
for (int i = 0; i < batch.num_values(); ++i) {
888+
if (batch[i].is_scalar()) {
889+
// Skip Take for scalars since it is not currently supported.
890+
values[i] = batch[i];
891+
continue;
892+
}
893+
ARROW_ASSIGN_OR_RAISE(values[i],
894+
Take(batch[i], *batch.selection_vector->data(),
895+
TakeOptions{/*boundcheck=*/false}, exec_context()));
896+
}
897+
ARROW_ASSIGN_OR_RAISE(
898+
ExecBatch input,
899+
ExecBatch::Make(std::move(values), batch.selection_vector->length()));
900+
901+
DatumAccumulator dense_listener;
902+
RETURN_NOT_OK(ExecuteBatch(input, &dense_listener));
903+
Datum dense_result = WrapResults(input.values, dense_listener.values());
904+
905+
ARROW_ASSIGN_OR_RAISE(auto result,
906+
Scatter(dense_result, *batch.selection_vector->data(),
907+
ScatterOptions{/*max_index=*/batch.length - 1}));
908+
return listener->OnResult(std::move(result));
824909
}
825910

826-
protected:
827911
Status EmitResult(std::shared_ptr<ArrayData> out, ExecListener* listener) {
828912
if (span_iterator_.have_all_scalars()) {
829913
// ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to
@@ -842,6 +926,11 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
842926
// eventually skip the creation of ArrayData altogether
843927
std::shared_ptr<ArrayData> preallocation;
844928
ExecSpan input;
929+
SelectionVectorSpan selection;
930+
SelectionVectorSpan* selection_ptr = nullptr;
931+
if (span_iterator_.have_selection_vector()) {
932+
selection_ptr = &selection;
933+
}
845934
ExecResult output;
846935
ArraySpan* output_span = output.array_span_mutable();
847936

@@ -853,10 +942,10 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
853942
output_span->SetMembers(*preallocation);
854943
output_span->offset = 0;
855944
int64_t result_offset = 0;
856-
while (span_iterator_.Next(&input)) {
945+
while (span_iterator_.Next(&input, selection_ptr)) {
857946
// Set absolute output span position and length
858947
output_span->SetSlice(result_offset, input.length);
859-
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
948+
RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output));
860949
result_offset = span_iterator_.position();
861950
}
862951

@@ -866,18 +955,19 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
866955
// Fully preallocating, but not contiguously
867956
// We preallocate (maybe) only for the output of processing the current
868957
// chunk
869-
while (span_iterator_.Next(&input)) {
958+
while (span_iterator_.Next(&input, selection_ptr)) {
870959
ARROW_ASSIGN_OR_RAISE(preallocation, PrepareOutput(input.length));
871960
output_span->SetMembers(*preallocation);
872-
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
961+
RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output));
873962
// Emit the result for this chunk
874963
RETURN_NOT_OK(EmitResult(std::move(preallocation), listener));
875964
}
876965
return Status::OK();
877966
}
878967
}
879968

880-
Status ExecuteSingleSpan(const ExecSpan& input, ExecResult* out) {
969+
Status ExecuteSingleSpan(const ExecSpan& input, const SelectionVectorSpan* selection,
970+
ExecResult* out) {
881971
ArraySpan* result_span = out->array_span_mutable();
882972
if (output_type_.type->id() == Type::NA) {
883973
result_span->null_count = result_span->length;
@@ -888,7 +978,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
888978
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
889979
result_span->null_count = 0;
890980
}
891-
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, out));
981+
RETURN_NOT_OK(InvokeKernel(input, selection, out));
892982
// Output type didn't change
893983
DCHECK(out->is_array_span());
894984
return Status::OK();
@@ -903,8 +993,13 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
903993
// We will eventually delete the Scalar output path per
904994
// ARROW-16757.
905995
ExecSpan input;
996+
SelectionVectorSpan selection;
997+
SelectionVectorSpan* selection_ptr = nullptr;
998+
if (span_iterator_.have_selection_vector()) {
999+
selection_ptr = &selection;
1000+
}
9061001
ExecResult output;
907-
while (span_iterator_.Next(&input)) {
1002+
while (span_iterator_.Next(&input, selection_ptr)) {
9081003
ARROW_ASSIGN_OR_RAISE(output.value, PrepareOutput(input.length));
9091004
DCHECK(output.is_array_data());
9101005

@@ -917,7 +1012,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
9171012
out_arr->null_count = 0;
9181013
}
9191014

920-
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, &output));
1015+
RETURN_NOT_OK(InvokeKernel(input, selection_ptr, &output));
9211016

9221017
// Output type didn't change
9231018
DCHECK(output.is_array_data());
@@ -983,6 +1078,15 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
9831078
return Status::OK();
9841079
}
9851080

1081+
Status InvokeKernel(const ExecSpan& input, const SelectionVectorSpan* selection,
1082+
ExecResult* out) {
1083+
if (selection) {
1084+
DCHECK_NE(kernel_->selective_exec, nullptr);
1085+
return kernel_->selective_exec(kernel_ctx_, input, *selection, out);
1086+
}
1087+
return kernel_->exec(kernel_ctx_, input, out);
1088+
}
1089+
9861090
// Used to account for the case where we do not preallocate a
9871091
// validity bitmap because the inputs are all non-null and we're
9881092
// using NullHandling::INTERSECTION to compute the validity bitmap
@@ -1352,11 +1456,14 @@ SelectionVector::SelectionVector(std::shared_ptr<ArrayData> data)
13521456

13531457
SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {}
13541458

1355-
int32_t SelectionVector::length() const { return static_cast<int32_t>(data_->length); }
1459+
int64_t SelectionVector::length() const { return data_->length; }
13561460

1357-
Result<std::shared_ptr<SelectionVector>> SelectionVector::FromMask(
1358-
const BooleanArray& arr) {
1359-
return Status::NotImplemented("FromMask");
1461+
void SelectionVectorSpan::SetSlice(int64_t offset, int64_t length,
1462+
int32_t index_back_shift) {
1463+
DCHECK_NE(indices_, nullptr);
1464+
offset_ = offset;
1465+
length_ = length;
1466+
index_back_shift_ = index_back_shift;
13601467
}
13611468

13621469
Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,

cpp/src/arrow/compute/exec.h

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,39 @@ class ARROW_EXPORT SelectionVector {
140140

141141
explicit SelectionVector(const Array& arr);
142142

143-
/// \brief Create SelectionVector from boolean mask
144-
static Result<std::shared_ptr<SelectionVector>> FromMask(const BooleanArray& arr);
145-
143+
std::shared_ptr<ArrayData> data() const { return data_; }
146144
const int32_t* indices() const { return indices_; }
147-
int32_t length() const;
145+
int64_t length() const;
148146

149147
private:
150148
std::shared_ptr<ArrayData> data_;
151149
const int32_t* indices_;
152150
};
153151

152+
class ARROW_EXPORT SelectionVectorSpan {
153+
public:
154+
explicit SelectionVectorSpan(const int32_t* indices = NULLPTR, int64_t length = 0,
155+
int64_t offset = 0, int32_t index_back_shift = 0)
156+
: indices_(indices),
157+
length_(length),
158+
offset_(offset),
159+
index_back_shift_(index_back_shift) {}
160+
161+
void SetSlice(int64_t offset, int64_t length, int32_t index_back_shift = 0);
162+
163+
int32_t operator[](int64_t i) const {
164+
return indices_[i + offset_] - index_back_shift_;
165+
}
166+
167+
int64_t length() const { return length_; }
168+
169+
private:
170+
const int32_t* indices_;
171+
int64_t length_;
172+
int64_t offset_;
173+
int32_t index_back_shift_;
174+
};
175+
154176
/// An index to represent that a batch does not belong to an ordered stream
155177
constexpr int64_t kUnsequencedIndex = -1;
156178

@@ -173,8 +195,11 @@ constexpr int64_t kUnsequencedIndex = -1;
173195

174196
struct ARROW_EXPORT ExecBatch {
175197
ExecBatch() = default;
176-
ExecBatch(std::vector<Datum> values, int64_t length)
177-
: values(std::move(values)), length(length) {}
198+
ExecBatch(std::vector<Datum> values, int64_t length,
199+
std::shared_ptr<SelectionVector> selection_vector = NULLPTR)
200+
: values(std::move(values)),
201+
length(length),
202+
selection_vector(std::move(selection_vector)) {}
178203

179204
explicit ExecBatch(const RecordBatch& batch);
180205

@@ -196,13 +221,6 @@ struct ARROW_EXPORT ExecBatch {
196221
/// exec function for processing.
197222
std::vector<Datum> values;
198223

199-
/// A deferred filter represented as an array of indices into the values.
200-
///
201-
/// For example, the filter [true, true, false, true] would be represented as
202-
/// the selection vector [0, 1, 3]. When the selection vector is set,
203-
/// ExecBatch::length is equal to the length of this array.
204-
std::shared_ptr<SelectionVector> selection_vector;
205-
206224
/// A predicate Expression guaranteed to evaluate to true for all rows in this batch.
207225
Expression guarantee = literal(true);
208226

@@ -218,6 +236,13 @@ struct ARROW_EXPORT ExecBatch {
218236
/// whether any values are Scalar.
219237
int64_t length = 0;
220238

239+
/// A deferred filter represented as an array of indices into the values.
240+
///
241+
/// For example, the filter [true, true, false, true] would be represented as
242+
/// the selection vector [0, 1, 3]. When the selection vector is set,
243+
/// ExecBatch::length is equal to the length of this array.
244+
std::shared_ptr<SelectionVector> selection_vector;
245+
221246
/// \brief index of this batch in a sorted stream of batches
222247
///
223248
/// This index must be strictly monotonic starting at 0 without gaps or

0 commit comments

Comments
 (0)