Skip to content

Commit 6e5186b

Browse files
committed
Support selection vector for kernel execution
1 parent fcfb33d commit 6e5186b

File tree

12 files changed

+1598
-221
lines changed

12 files changed

+1598
-221
lines changed

cpp/src/arrow/compute/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ add_arrow_compute_test(row_test
174174
EXTRA_LINK_LIBS
175175
arrow_compute_testing)
176176

177+
add_arrow_compute_benchmark(exec_benchmark)
178+
177179
add_arrow_compute_benchmark(function_benchmark)
178180

179181
add_subdirectory(kernels)

cpp/src/arrow/compute/exec.cc

Lines changed: 179 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
#include "arrow/util/logging_internal.h"
5151
#include "arrow/util/thread_pool.h"
5252
#include "arrow/util/vector.h"
53+
#include "arrow/visit_data_inline.h"
5354

5455
namespace arrow {
5556

@@ -359,6 +360,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
359360
have_all_scalars_ = CheckIfAllScalar(batch);
360361
promote_if_all_scalars_ = promote_if_all_scalars;
361362
position_ = 0;
363+
selection_position_ = 0;
362364
length_ = batch.length;
363365
chunk_indexes_.clear();
364366
chunk_indexes_.resize(args_->size(), 0);
@@ -367,6 +369,12 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
367369
value_offsets_.clear();
368370
value_offsets_.resize(args_->size(), 0);
369371
max_chunksize_ = std::min(length_, max_chunksize);
372+
selection_vector_ = batch.selection_vector.get();
373+
if (selection_vector_) {
374+
selection_length_ = selection_vector_->length();
375+
} else {
376+
selection_length_ = 0;
377+
}
370378
return Status::OK();
371379
}
372380

@@ -403,7 +411,7 @@ int64_t ExecSpanIterator::GetNextChunkSpan(int64_t iteration_size, ExecSpan* spa
403411
return iteration_size;
404412
}
405413

406-
bool ExecSpanIterator::Next(ExecSpan* span) {
414+
bool ExecSpanIterator::Next(ExecSpan* span, SelectionVectorSpan* selection_span) {
407415
if (!initialized_) {
408416
span->length = 0;
409417

@@ -442,6 +450,13 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
442450
PromoteExecSpanScalars(span);
443451
}
444452

453+
if (!have_all_scalars_ || promote_if_all_scalars_) {
454+
if (selection_vector_) {
455+
DCHECK_NE(selection_span, nullptr);
456+
*selection_span = SelectionVectorSpan(selection_vector_->indices());
457+
}
458+
}
459+
445460
initialized_ = true;
446461
} else if (position_ == length_) {
447462
// We've emitted at least one span and we're at the end so we are done
@@ -465,6 +480,23 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
465480
}
466481
}
467482

483+
// Then the selection span
484+
if (selection_vector_) {
485+
DCHECK_NE(selection_span, nullptr);
486+
auto indices_begin = selection_vector_->indices() + selection_position_;
487+
auto indices_end = selection_vector_->indices() + selection_vector_->length();
488+
DCHECK_LE(indices_begin, indices_end);
489+
auto chunk_row_id_end = position_ + iteration_size;
490+
int64_t num_indices = 0;
491+
while (indices_begin + num_indices < indices_end &&
492+
*(indices_begin + num_indices) < chunk_row_id_end) {
493+
++num_indices;
494+
}
495+
selection_span->SetSlice(selection_position_, num_indices,
496+
static_cast<int32_t>(position_));
497+
selection_position_ += num_indices;
498+
}
499+
468500
position_ += iteration_size;
469501
DCHECK_LE(position_, length_);
470502
return true;
@@ -694,7 +726,14 @@ std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
694726
// Skip empty chunks
695727
continue;
696728
}
697-
arrays.emplace_back(val.make_array());
729+
if (val.is_chunked_array()) {
730+
for (const auto& chunk : val.chunked_array()->chunks()) {
731+
arrays.emplace_back(chunk);
732+
}
733+
} else {
734+
DCHECK(val.is_array());
735+
arrays.emplace_back(val.make_array());
736+
}
698737
}
699738
return std::make_shared<ChunkedArray>(std::move(arrays), type.GetSharedPtr());
700739
}
@@ -781,17 +820,45 @@ class KernelExecutorImpl : public KernelExecutor {
781820
class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
782821
public:
783822
// Executes the scalar kernel over `batch`, reporting results through `listener`.
//
// Non-empty batches are routed to ExecuteSelectiveDense() when they carry a
// selection vector that the kernel cannot consume itself, and to ExecuteBatch()
// otherwise. Empty batches short-circuit to a zero-length array of the output type.
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
  if (batch.length != 0) {
    // A selection vector on a kernel without selective execution support has to
    // be handled "densely".
    const bool needs_dense = batch.selection_vector && !kernel_->selective_exec;
    if (needs_dense) {
      return ExecuteSelectiveDense(batch, listener);
    }
    return ExecuteBatch(batch, listener);
  }

  // For zero-length batches, we do nothing except return a zero-length
  // array of the correct output type
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> empty_result,
                        MakeArrayOfNull(output_type_.GetSharedPtr(), /*length=*/0,
                                        exec_context()->memory_pool()));
  // EmitResult() consults the span iterator's state, so it must be initialized
  // for this batch before emitting.
  RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
  return EmitResult(empty_result->data(), listener);
}
841+
842+
// Combines the per-chunk `outputs` of an execution into a single Datum.
//
// The result is a ChunkedArray when any input was chunked or when execution
// yielded multiple chunks (because large arrays were split based on the
// ExecContext parameters); otherwise the lone output is returned as-is.
Datum WrapResults(const std::vector<Datum>& inputs,
                  const std::vector<Datum>& outputs) override {
  const bool single_output = !HaveChunkedArray(inputs) && outputs.size() <= 1;
  if (single_output) {
    // Outputs have just one element
    return outputs[0];
  }
  return ToChunkedArray(outputs, output_type_);
}
853+
854+
protected:
855+
// Execute a single batch either non-selectively (batch doesn't contain a selection
856+
// vector) or selectively (kernel supports selective execution).
857+
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
858+
DCHECK(!batch.selection_vector || kernel_->selective_exec);
859+
860+
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
861+
795862
// If the executor is configured to produce a single large Array output for
796863
// kernels supporting preallocation, then we do so up front and then
797864
// iterate over slices of that large array. Otherwise, we preallocate prior
@@ -811,19 +878,46 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
811878
}
812879
}
813880

814-
Datum WrapResults(const std::vector<Datum>& inputs,
815-
const std::vector<Datum>& outputs) override {
816-
// If execution yielded multiple chunks (because large arrays were split
817-
// based on the ExecContext parameters, then the result is a ChunkedArray
818-
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
819-
return ToChunkedArray(outputs, output_type_);
820-
} else {
821-
// Outputs have just one element
822-
return outputs[0];
881+
// Execute a single batch with a selection vector "densely" for a kernel that doesn't
882+
// support selective execution. "Densely" here means that we first gather the rows
883+
// indicated by the selection vector into a contiguous ExecBatch, execute that, and
884+
// then scatter the result back to the original row positions.
885+
Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) {
886+
DCHECK(batch.selection_vector && !kernel_->selective_exec);
887+
888+
if (CheckIfAllScalar(batch)) {
889+
// For all-scalar batch, we can skip the gather/scatter steps as if there is no
890+
// selection vector - the result is a scalar anyway.
891+
ExecBatch input = batch;
892+
input.selection_vector = nullptr;
893+
return ExecuteBatch(input, listener);
823894
}
895+
896+
std::vector<Datum> values(batch.num_values());
897+
for (int i = 0; i < batch.num_values(); ++i) {
898+
if (batch[i].is_scalar()) {
899+
// XXX: Skip gather for scalars since it is not currently supported by Take.
900+
values[i] = batch[i];
901+
continue;
902+
}
903+
ARROW_ASSIGN_OR_RAISE(values[i],
904+
Take(batch[i], *batch.selection_vector->data(),
905+
TakeOptions{/*boundcheck=*/false}, exec_context()));
906+
}
907+
ARROW_ASSIGN_OR_RAISE(
908+
ExecBatch input,
909+
ExecBatch::Make(std::move(values), batch.selection_vector->length()));
910+
911+
DatumAccumulator dense_listener;
912+
RETURN_NOT_OK(ExecuteBatch(input, &dense_listener));
913+
Datum dense_result = WrapResults(input.values, dense_listener.values());
914+
915+
ARROW_ASSIGN_OR_RAISE(auto result,
916+
Scatter(dense_result, *batch.selection_vector->data(),
917+
ScatterOptions{/*max_index=*/batch.length - 1}));
918+
return listener->OnResult(std::move(result));
824919
}
825920

826-
protected:
827921
Status EmitResult(std::shared_ptr<ArrayData> out, ExecListener* listener) {
828922
if (span_iterator_.have_all_scalars()) {
829923
// ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to
@@ -842,6 +936,11 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
842936
// eventually skip the creation of ArrayData altogether
843937
std::shared_ptr<ArrayData> preallocation;
844938
ExecSpan input;
939+
SelectionVectorSpan selection;
940+
SelectionVectorSpan* selection_ptr = nullptr;
941+
if (span_iterator_.have_selection_vector()) {
942+
selection_ptr = &selection;
943+
}
845944
ExecResult output;
846945
ArraySpan* output_span = output.array_span_mutable();
847946

@@ -853,10 +952,10 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
853952
output_span->SetMembers(*preallocation);
854953
output_span->offset = 0;
855954
int64_t result_offset = 0;
856-
while (span_iterator_.Next(&input)) {
955+
while (span_iterator_.Next(&input, selection_ptr)) {
857956
// Set absolute output span position and length
858957
output_span->SetSlice(result_offset, input.length);
859-
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
958+
RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output));
860959
result_offset = span_iterator_.position();
861960
}
862961

@@ -866,18 +965,19 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
866965
// Fully preallocating, but not contiguously
867966
// We preallocate (maybe) only for the output of processing the current
868967
// chunk
869-
while (span_iterator_.Next(&input)) {
968+
while (span_iterator_.Next(&input, selection_ptr)) {
870969
ARROW_ASSIGN_OR_RAISE(preallocation, PrepareOutput(input.length));
871970
output_span->SetMembers(*preallocation);
872-
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
971+
RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output));
873972
// Emit the result for this chunk
874973
RETURN_NOT_OK(EmitResult(std::move(preallocation), listener));
875974
}
876975
return Status::OK();
877976
}
878977
}
879978

880-
Status ExecuteSingleSpan(const ExecSpan& input, ExecResult* out) {
979+
Status ExecuteSingleSpan(const ExecSpan& input, const SelectionVectorSpan* selection,
980+
ExecResult* out) {
881981
ArraySpan* result_span = out->array_span_mutable();
882982
if (output_type_.type->id() == Type::NA) {
883983
result_span->null_count = result_span->length;
@@ -888,7 +988,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
888988
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
889989
result_span->null_count = 0;
890990
}
891-
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, out));
991+
RETURN_NOT_OK(ExecuteKernel(input, selection, out));
892992
// Output type didn't change
893993
DCHECK(out->is_array_span());
894994
return Status::OK();
@@ -903,8 +1003,13 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
9031003
// We will eventually delete the Scalar output path per
9041004
// ARROW-16757.
9051005
ExecSpan input;
1006+
SelectionVectorSpan selection;
1007+
SelectionVectorSpan* selection_ptr = nullptr;
1008+
if (span_iterator_.have_selection_vector()) {
1009+
selection_ptr = &selection;
1010+
}
9061011
ExecResult output;
907-
while (span_iterator_.Next(&input)) {
1012+
while (span_iterator_.Next(&input, selection_ptr)) {
9081013
ARROW_ASSIGN_OR_RAISE(output.value, PrepareOutput(input.length));
9091014
DCHECK(output.is_array_data());
9101015

@@ -917,7 +1022,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
9171022
out_arr->null_count = 0;
9181023
}
9191024

920-
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, &output));
1025+
RETURN_NOT_OK(ExecuteKernel(input, selection_ptr, &output));
9211026

9221027
// Output type didn't change
9231028
DCHECK(output.is_array_data());
@@ -983,6 +1088,17 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
9831088
return Status::OK();
9841089
}
9851090

1091+
// Actually invoke the kernel on the given input span, either selectively if there is a
1092+
// selection or non-selectively otherwise.
1093+
Status ExecuteKernel(const ExecSpan& input, const SelectionVectorSpan* selection,
1094+
ExecResult* out) {
1095+
if (selection) {
1096+
DCHECK_NE(kernel_->selective_exec, nullptr);
1097+
return kernel_->selective_exec(kernel_ctx_, input, *selection, out);
1098+
}
1099+
return kernel_->exec(kernel_ctx_, input, out);
1100+
}
1101+
9861102
// Used to account for the case where we do not preallocate a
9871103
// validity bitmap because the inputs are all non-null and we're
9881104
// using NullHandling::INTERSECTION to compute the validity bitmap
@@ -1345,18 +1461,53 @@ const CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); }
13451461

13461462
SelectionVector::SelectionVector(std::shared_ptr<ArrayData> data)
13471463
: data_(std::move(data)) {
1348-
DCHECK_EQ(Type::INT32, data_->type->id());
1349-
DCHECK_EQ(0, data_->GetNullCount());
1464+
DCHECK_NE(data_, nullptr);
1465+
DCHECK_EQ(data_->type->id(), Type::INT32);
13501466
indices_ = data_->GetValues<int32_t>(1);
13511467
}
13521468

13531469
SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {}
13541470

1355-
int32_t SelectionVector::length() const { return static_cast<int32_t>(data_->length); }
1471+
int64_t SelectionVector::length() const { return data_->length; }
1472+
1473+
// Checks that this selection vector is well-formed and, optionally, in bounds.
//
// The vector must wrap int32 data without nulls, and its indices must be sorted
// in non-decreasing order and non-negative. When `values_length` is non-negative,
// every index must additionally be less than `values_length`; a negative
// `values_length` skips the bounds check.
//
// Returns Status::Invalid describing the first violated requirement, or
// Status::OK() if all checks pass.
Status SelectionVector::Validate(int64_t values_length) const {
  if (data_ == nullptr) {
    return Status::Invalid("SelectionVector not initialized");
  }
  // Report a missing indices buffer as a validation failure rather than aborting
  // the process: this function communicates every other problem through Status.
  if (indices_ == nullptr) {
    return Status::Invalid("SelectionVector indices buffer is null");
  }
  if (data_->type->id() != Type::INT32) {
    return Status::Invalid("SelectionVector must be of type int32");
  }
  if (data_->GetNullCount() != 0) {
    return Status::Invalid("SelectionVector cannot contain nulls");
  }

  const int64_t num_indices = length();
  for (int64_t i = 1; i < num_indices; ++i) {
    if (indices_[i - 1] > indices_[i]) {
      return Status::Invalid("SelectionVector indices must be sorted");
    }
  }

  // The indices are known to be sorted from here on, so the smallest one is the
  // first and the largest one is the last; no further full passes are needed.
  if (num_indices > 0 && indices_[0] < 0) {
    return Status::Invalid("SelectionVector indices must be non-negative");
  }
  if (values_length >= 0 && num_indices > 0 &&
      indices_[num_indices - 1] >= values_length) {
    // Only on failure, locate the first out-of-bounds index for the message.
    // The scan terminates because the last index is out of bounds.
    int64_t first_bad = 0;
    while (indices_[first_bad] < values_length) {
      ++first_bad;
    }
    return Status::Invalid("SelectionVector index ", indices_[first_bad],
                           " >= values length ", values_length);
  }
  return Status::OK();
}
13561504

1357-
Result<std::shared_ptr<SelectionVector>> SelectionVector::FromMask(
1358-
const BooleanArray& arr) {
1359-
return Status::NotImplemented("FromMask");
1505+
void SelectionVectorSpan::SetSlice(int64_t offset, int64_t length,
1506+
int32_t index_back_shift) {
1507+
DCHECK_NE(indices_, nullptr);
1508+
offset_ = offset;
1509+
length_ = length;
1510+
index_back_shift_ = index_back_shift;
13601511
}
13611512

13621513
Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,

0 commit comments

Comments
 (0)