Skip to content

Commit 0de6e41

Browse files
author
Rafał Hibner
committed
Use not null vector for centroids
1 parent 4048db2 commit 0de6e41

File tree

6 files changed

+147
-175
lines changed

6 files changed

+147
-175
lines changed

cpp/src/arrow/compute/api_aggregate.cc

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,11 @@ static auto kTDigestMapOptionsType = GetFunctionOptionsType<TDigestMapOptions>(
147147
DataMember("skip_nulls", &TDigestMapOptions::skip_nulls),
148148
DataMember("scaler", &TDigestMapOptions::scaler));
149149
static auto kTDigestReduceOptionsType = GetFunctionOptionsType<TDigestReduceOptions>(
150+
DataMember("delta", &TDigestReduceOptions::delta),
150151
DataMember("scaler", &TDigestReduceOptions::scaler));
151152
static auto kTDigestQuantileOptionsType = GetFunctionOptionsType<TDigestQuantileOptions>(
152153
DataMember("q", &TDigestQuantileOptions::q),
154+
DataMember("delta", &TDigestQuantileOptions::delta),
153155
DataMember("min_count", &TDigestQuantileOptions::min_count),
154156
DataMember("scaler", &TDigestQuantileOptions::scaler));
155157
static auto kPivotOptionsType = GetFunctionOptionsType<PivotWiderOptions>(
@@ -236,24 +238,28 @@ TDigestMapOptions::TDigestMapOptions(uint32_t delta, uint32_t buffer_size,
236238
scaler{scaler} {}
237239
constexpr char TDigestMapOptions::kTypeName[];
238240

239-
TDigestReduceOptions::TDigestReduceOptions(Scaler scaler)
240-
: FunctionOptions(internal::kTDigestReduceOptionsType), scaler{scaler} {}
241+
TDigestReduceOptions::TDigestReduceOptions(uint32_t delta, Scaler scaler)
242+
: FunctionOptions(internal::kTDigestReduceOptionsType),
243+
delta(delta),
244+
scaler{scaler} {}
241245
constexpr char TDigestReduceOptions::kTypeName[];
242246

243-
TDigestQuantileOptions::TDigestQuantileOptions(double q, uint32_t min_count,
244-
Scaler scaler)
247+
TDigestQuantileOptions::TDigestQuantileOptions(double q, uint32_t delta,
248+
uint32_t min_count, Scaler scaler)
245249
: FunctionOptions(internal::kTDigestQuantileOptionsType),
246250
q{q},
251+
delta(delta),
247252
min_count{min_count},
248253
scaler{scaler} {}
249254

250-
TDigestQuantileOptions::TDigestQuantileOptions(std::vector<double> q, uint32_t min_count,
251-
Scaler scaler)
255+
TDigestQuantileOptions::TDigestQuantileOptions(std::vector<double> q, uint32_t delta,
256+
uint32_t min_count, Scaler scaler)
252257
: FunctionOptions(internal::kTDigestQuantileOptionsType),
253258
q{std::move(q)},
259+
delta(delta),
254260
min_count{min_count},
255261
scaler{scaler} {}
256-
constexpr char TDigestReduceOptions::kTypeName[];
262+
constexpr char TDigestQuantileOptions::kTypeName[];
257263

258264
PivotWiderOptions::PivotWiderOptions(std::vector<std::string> key_names,
259265
UnexpectedKeyBehavior unexpected_key_behavior)

cpp/src/arrow/compute/api_aggregate.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,12 @@ class ARROW_EXPORT TDigestReduceOptions : public FunctionOptions {
231231
public:
232232
using Scaler = TDigestOptions::Scaler;
233233

234-
explicit TDigestReduceOptions(Scaler scaler = Scaler::K1);
234+
explicit TDigestReduceOptions(uint32_t delta = 100, Scaler scaler = Scaler::K1);
235235
static constexpr char const kTypeName[] = "TDigestReduceOptions";
236236
static TDigestReduceOptions Defaults() { return TDigestReduceOptions{}; }
237237

238+
/// compression parameter, default 100
239+
uint32_t delta;
238240
/// select scaler implementation
239241
Scaler scaler;
240242
};
@@ -246,15 +248,17 @@ class ARROW_EXPORT TDigestQuantileOptions : public FunctionOptions {
246248
public:
247249
using Scaler = TDigestOptions::Scaler;
248250

249-
explicit TDigestQuantileOptions(double q = 0.5, uint32_t min_count = 0,
250-
Scaler scaler = Scaler::K1);
251-
explicit TDigestQuantileOptions(std::vector<double> q, uint32_t min_count = 0,
252-
Scaler scaler = Scaler::K1);
251+
explicit TDigestQuantileOptions(double q = 0.5, uint32_t delta = 100,
252+
uint32_t min_count = 0, Scaler scaler = Scaler::K1);
253+
explicit TDigestQuantileOptions(std::vector<double> q, uint32_t delta = 100,
254+
uint32_t min_count = 0, Scaler scaler = Scaler::K1);
253255
static constexpr char const kTypeName[] = "TDigestQuantileOptions";
254256
static TDigestQuantileOptions Defaults() { return TDigestQuantileOptions{}; }
255257

256258
/// probability level of quantile must be between 0 and 1 inclusive
257259
std::vector<double> q;
260+
/// compression parameter, default 100
261+
uint32_t delta;
258262
/// If less than this many non-null values are observed, emit null.
259263
uint32_t min_count;
260264
/// select scaler implementation

cpp/src/arrow/compute/kernels/aggregate_tdigest.cc

Lines changed: 30 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,9 @@ using arrow::internal::VisitSetBitRunsVoid;
3535
struct TDigestBaseImpl : public ScalarAggregator {
3636
explicit TDigestBaseImpl(std::unique_ptr<TDigest::Scaler> scaler, uint32_t buffer_size)
3737
: tdigest{std::move(scaler), buffer_size}, count{0}, all_valid{true} {
38-
auto output_size = tdigest.delta();
3938
out_type = struct_({
40-
field("mean", fixed_size_list(float64(), output_size), false),
41-
field("weight", fixed_size_list(float64(), output_size), false),
39+
field("mean", list(field("item", float64(), false)), false),
40+
field("weight", list(field("item", float64(), false)), false),
4241
field("min", float64(), true),
4342
field("max", float64(), true),
4443
field("count", uint64(), false),
@@ -118,7 +117,7 @@ struct TDigestCentroidFinalizer : public TDigestBaseImpl {
118117
*out = MakeNullScalar(out_type);
119118
} else {
120119
// Float64Array
121-
const int64_t out_length = tdigest.delta();
120+
const int64_t out_length = this->tdigest.GetCentroidCount();
122121
auto mean_data = ArrayData::Make(float64(), out_length, 0);
123122
mean_data->buffers.resize(2, nullptr);
124123
ARROW_ASSIGN_OR_RAISE(mean_data->buffers[1],
@@ -130,30 +129,14 @@ struct TDigestCentroidFinalizer : public TDigestBaseImpl {
130129
ARROW_ASSIGN_OR_RAISE(weight_data->buffers[1],
131130
ctx->Allocate(out_length * sizeof(double)));
132131
double* weight_buffer = weight_data->template GetMutableValues<double>(1);
133-
134-
ARROW_ASSIGN_OR_RAISE(auto bitmap, ctx->AllocateBitmap(out_length));
135-
auto bitmap_data = bitmap->mutable_data();
136-
bit_util::SetBitsTo(bitmap_data, 0, out_length, true);
137-
mean_data->buffers[0] = bitmap;
138-
weight_data->buffers[0] = bitmap;
139-
140132
for (int64_t i = 0; i < out_length; ++i) {
141-
auto maybe_c = this->tdigest.GetCentroid(i);
142-
if (maybe_c) {
143-
std::tie(mean_buffer[i], weight_buffer[i]) = *std::move(maybe_c);
144-
} else {
145-
bit_util::SetBitsTo(bitmap_data, i, out_length, false);
146-
auto null_count = out_length - i;
147-
std::fill(mean_buffer + i, mean_buffer + out_length, 0.0);
148-
std::fill(weight_buffer + i, weight_buffer + out_length, 0.0);
149-
mean_data->SetNullCount(null_count);
150-
weight_data->SetNullCount(null_count);
151-
break;
152-
}
133+
std::tie(mean_buffer[i], weight_buffer[i]) = this->tdigest.GetCentroid(i);
153134
}
154135

155-
auto mean = std::make_shared<FixedSizeListScalar>(MakeArray(mean_data));
156-
auto weight = std::make_shared<FixedSizeListScalar>(MakeArray(weight_data));
136+
auto mean = std::make_shared<ListScalar>(MakeArray(mean_data),
137+
list(field("item", float64(), false)));
138+
auto weight = std::make_shared<ListScalar>(MakeArray(weight_data),
139+
list(field("item", float64(), false)));
157140
auto count = std::make_shared<UInt64Scalar>(this->count);
158141
std::shared_ptr<Scalar> min, max;
159142
if (this->count) {
@@ -239,32 +222,22 @@ struct TDigestCentroidConsumerImpl : public TDigestFinalizer_T {
239222
Status Consume(const Scalar* scalar) {
240223
const auto* input_struct_scalar = checked_cast<const StructScalar*>(scalar);
241224
auto mean_array =
242-
checked_cast<const FixedSizeListScalar*>(input_struct_scalar->value[0].get())
243-
->value;
225+
checked_cast<const ListScalar*>(input_struct_scalar->value[0].get())->value;
244226
auto weight_array =
245-
checked_cast<const FixedSizeListScalar*>(input_struct_scalar->value[1].get())
246-
->value;
227+
checked_cast<const ListScalar*>(input_struct_scalar->value[1].get())->value;
247228
auto min = checked_cast<const DoubleScalar*>(input_struct_scalar->value[2].get());
248229
auto max = checked_cast<const DoubleScalar*>(input_struct_scalar->value[3].get());
249230
auto count = checked_cast<const UInt64Scalar*>(input_struct_scalar->value[4].get());
250231
auto mean_double_array = checked_cast<const DoubleArray*>(mean_array.get());
251232
auto weight_double_array = checked_cast<const DoubleArray*>(weight_array.get());
252-
DCHECK_EQ(mean_double_array->length(), this->tdigest.delta());
253-
DCHECK_EQ(weight_double_array->length(), this->tdigest.delta());
254-
233+
DCHECK_EQ(mean_double_array->length(), weight_double_array->length());
255234
if (min->is_valid) {
256235
DCHECK(max->is_valid);
257236
this->tdigest.SetMinMax(min->value, max->value);
258-
259237
} else {
260238
DCHECK(!max->is_valid);
261239
}
262-
for (int64_t i = 0; i < this->tdigest.delta(); i++) {
263-
if (mean_double_array->IsNull(i)) {
264-
break;
265-
}
266-
DCHECK(weight_double_array->IsValid(i));
267-
240+
for (int64_t i = 0; i < mean_double_array->length(); i++) {
268241
this->tdigest.NanAdd(mean_double_array->Value(i), weight_double_array->Value(i));
269242
}
270243
this->count += count->value;
@@ -325,25 +298,25 @@ struct TDigestMapImpl
325298

326299
struct TDigestReduceImpl : public TDigestCentroidConsumerImpl<TDigestCentroidFinalizer> {
327300
explicit TDigestReduceImpl(const TDigestReduceOptions& options,
328-
std::unique_ptr<TDigest::Scaler> scaler, uint32_t size)
301+
std::unique_ptr<TDigest::Scaler> scaler)
329302
: TDigestCentroidConsumerImpl<TDigestCentroidFinalizer>(
330303
// TDigestCentroidConsumerImpl
331304
// TDigestCentroidFinalizer
332305
// TDigestBaseImpl
333-
std::move(scaler), size) {}
306+
std::move(scaler), options.delta) {}
334307
};
335308

336309
struct TDigestQuantileImpl
337310
: public TDigestCentroidConsumerImpl<TDigestQuantileFinalizer> {
338311
explicit TDigestQuantileImpl(const TDigestQuantileOptions& options,
339-
std::unique_ptr<TDigest::Scaler> scaler, uint32_t size)
312+
std::unique_ptr<TDigest::Scaler> scaler)
340313
: TDigestCentroidConsumerImpl<TDigestQuantileFinalizer>(
341314

342315
// TDigestCentroidConsumerImpl
343316
// TDigestQuantileFinalizer
344317
options.q, options.min_count,
345318
// TDigestBaseImpl
346-
std::move(scaler), size) {}
319+
std::move(scaler), options.delta) {}
347320
};
348321

349322
template <template <typename> typename TDigestImpl_T, typename TDigestOptions_T>
@@ -390,31 +363,28 @@ struct TDigestInitState {
390363
struct TDigestCentroidTypeMatcher : public TypeMatcher {
391364
~TDigestCentroidTypeMatcher() override = default;
392365

393-
static Result<uint32_t> getDelta(const DataType& type) {
366+
bool Matches(const DataType& type) const override {
394367
if (Type::STRUCT == type.id()) {
395368
const auto& input_struct_type = checked_cast<const StructType&>(type);
396369
if (5 == input_struct_type.num_fields()) {
397-
if (Type::FIXED_SIZE_LIST == input_struct_type.field(0)->type()->id() &&
370+
if (Type::LIST == input_struct_type.field(0)->type()->id() &&
398371
input_struct_type.field(0)->type()->Equals(
399372
input_struct_type.field(1)->type()) &&
400373
Type::DOUBLE == input_struct_type.field(2)->type()->id() &&
401374
Type::DOUBLE == input_struct_type.field(3)->type()->id() &&
402375
Type::UINT64 == input_struct_type.field(4)->type()->id()) {
403-
auto fsl = checked_cast<const FixedSizeListType*>(
404-
input_struct_type.field(0)->type().get());
405-
return fsl->list_size();
376+
return true;
406377
}
407378
}
408379
}
409-
return Status::Invalid("Type ", type.ToString(), " does not match ",
410-
ToStringStatic());
380+
return false;
411381
}
412382

413-
bool Matches(const DataType& type) const override { return getDelta(type).ok(); }
414-
415383
static std::string ToStringStatic() {
416-
return "struct{mean:fixed_size_list<item: double>[N], weight:fixed_size_list<item: "
417-
"double>[N], min:float64, max:float64, count:int64}";
384+
return "struct{mean:list<item: double not null> not null, "
385+
"weight:list<item: "
386+
"double not null> not null, min:float64, max:float64, count:uint64 not "
387+
"null}";
418388
}
419389
std::string ToString() const override { return ToStringStatic(); }
420390

@@ -448,20 +418,18 @@ Result<std::unique_ptr<KernelState>> TDigestMapInit(KernelContext* ctx,
448418

449419
Result<std::unique_ptr<KernelState>> TDigestReduceInit(KernelContext* ctx,
450420
const KernelInitArgs& args) {
451-
ARROW_ASSIGN_OR_RAISE(uint32_t delta,
452-
TDigestCentroidTypeMatcher::getDelta(*args.inputs[0].type));
453421
auto options = static_cast<const TDigestReduceOptions&>(*args.options);
454-
ARROW_ASSIGN_OR_RAISE(auto scaler, TDigestBaseImpl::MakeScaler(options.scaler, delta));
455-
return std::make_unique<TDigestReduceImpl>(options, std::move(scaler), delta);
422+
ARROW_ASSIGN_OR_RAISE(auto scaler,
423+
TDigestBaseImpl::MakeScaler(options.scaler, options.delta));
424+
return std::make_unique<TDigestReduceImpl>(options, std::move(scaler));
456425
}
457426

458427
Result<std::unique_ptr<KernelState>> TDigestQuantileInit(KernelContext* ctx,
459428
const KernelInitArgs& args) {
460-
ARROW_ASSIGN_OR_RAISE(uint32_t delta,
461-
TDigestCentroidTypeMatcher::getDelta(*args.inputs[0].type));
462429
auto options = static_cast<const TDigestQuantileOptions&>(*args.options);
463-
ARROW_ASSIGN_OR_RAISE(auto scaler, TDigestBaseImpl::MakeScaler(options.scaler, delta));
464-
return std::make_unique<TDigestQuantileImpl>(options, std::move(scaler), delta);
430+
ARROW_ASSIGN_OR_RAISE(auto scaler,
431+
TDigestBaseImpl::MakeScaler(options.scaler, options.delta));
432+
return std::make_unique<TDigestQuantileImpl>(options, std::move(scaler));
465433
}
466434

467435
void AddTDigestKernels(KernelInit init,

0 commit comments

Comments
 (0)