Skip to content

Commit 88504b7

Browse files
jiangtianjiangtian
authored andcommitted
Destroy the previous Accumulator for all aggregates
1 parent 5fcc416 commit 88504b7

23 files changed

+108
-0
lines changed

velox/exec/SimpleAggregateAdapter.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,14 @@ class SimpleAggregateAdapter : public Aggregate {
366366
folly::Range<const vector_size_t*> indices) override {
367367
setAllNulls(groups, indices);
368368
for (auto i : indices) {
369+
if (isInitialized(groups[i])) {
370+
if constexpr (accumulator_custom_destroy_) {
371+
auto accumulator = value<typename FUNC::AccumulatorType>(groups[i]);
372+
accumulator->destroy(allocator_);
373+
} else {
374+
destroyAccumulator<typename FUNC::AccumulatorType>(groups[i]);
375+
}
376+
}
369377
new (groups[i] + offset_)
370378
typename FUNC::AccumulatorType(allocator_, fn_.get());
371379
}

velox/exec/tests/utils/SumNonPODAggregate.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ class SumNonPODAggregate : public Aggregate {
123123
char** groups,
124124
folly::Range<const velox::vector_size_t*> indices) override {
125125
for (auto i : indices) {
126+
if (isInitialized(groups[i])) {
127+
value<NonPODInt64>(groups[i])->~NonPODInt64();
128+
}
126129
char* group = value<char>(groups[i]);
127130
VELOX_CHECK_EQ(reinterpret_cast<uintptr_t>(group) % alignment_, 0);
128131
new (group) NonPODInt64(0);

velox/functions/lib/aggregates/MinMaxAggregateBase.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,9 @@ class MinMaxAggregateBase : public exec::Aggregate {
467467
folly::Range<const vector_size_t*> indices) override {
468468
exec::Aggregate::setAllNulls(groups, indices);
469469
for (auto i : indices) {
470+
if (isInitialized(groups[i])) {
471+
value<SingleValueAccumulator>(groups[i])->destroy(allocator_);
472+
}
470473
new (groups[i] + offset_) SingleValueAccumulator();
471474
}
472475
}

velox/functions/lib/aggregates/MinMaxByAggregatesBase.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,9 @@ class MinMaxByAggregateBase : public exec::Aggregate {
497497
if constexpr (std::is_same_v<
498498
ValueAccumulatorType,
499499
SingleValueAccumulator>) {
500+
if (isInitialized(group)) {
501+
value(group)->destroy(allocator_);
502+
}
500503
new (group + offset_) SingleValueAccumulator();
501504
} else {
502505
*value(group) = ValueAccumulatorType();
@@ -507,6 +510,9 @@ class MinMaxByAggregateBase : public exec::Aggregate {
507510
SingleValueAccumulator>) {
508511
*comparisonValue(group) = ComparisonAccumulatorType();
509512
} else {
513+
if (isInitialized(group)) {
514+
comparisonValue(group)->destroy(allocator_);
515+
}
510516
new (group + offset_ + sizeof(ValueAccumulatorType))
511517
SingleValueAccumulator();
512518
}

velox/functions/lib/aggregates/SetBaseAggregate.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ class SetBaseAggregate : public exec::Aggregate {
201201
const auto& type = resultType()->childAt(0);
202202
exec::Aggregate::setAllNulls(groups, indices);
203203
for (auto i : indices) {
204+
if (isInitialized(groups[i])) {
205+
value(groups[i])->free(*allocator_);
206+
}
204207
new (groups[i] + offset_) AccumulatorType(type, allocator_);
205208
}
206209
}

velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,9 @@ class ApproxPercentileAggregate : public exec::Aggregate {
489489
exec::Aggregate::setAllNulls(groups, indices);
490490
for (auto i : indices) {
491491
auto group = groups[i];
492+
if (isInitialized(group)) {
493+
value<KllSketchAccumulator<T>>(group)->~KllSketchAccumulator<T>();
494+
}
492495
new (group + offset_)
493496
KllSketchAccumulator<T>(allocator_, fixedRandomSeed_);
494497
}

velox/functions/prestosql/aggregates/ArbitraryAggregate.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,15 @@ class NonNumericArbitrary : public exec::Aggregate {
375375
char** groups,
376376
folly::Range<const vector_size_t*> indices) override {
377377
for (auto i : indices) {
378+
if (isInitialized(groups[i])) {
379+
if (clusteredInput_) {
380+
auto* accumulator = value<ClusteredNonNumericAccumulator>(groups[i]);
381+
std::destroy_at(accumulator);
382+
} else {
383+
auto* accumulator = value<SingleValueAccumulator>(groups[i]);
384+
accumulator->destroy(allocator_);
385+
}
386+
}
378387
if (clusteredInput_) {
379388
new (groups[i] + offset_) ClusteredNonNumericAccumulator();
380389
} else {

velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,14 @@ class ArrayAggAggregate : public exec::Aggregate {
331331
char** groups,
332332
folly::Range<const vector_size_t*> indices) override {
333333
for (auto index : indices) {
334+
if (isInitialized(groups[index])) {
335+
if (clusteredInput_) {
336+
auto* accumulator = value<ClusteredAccumulator>(groups[index]);
337+
std::destroy_at(accumulator);
338+
} else {
339+
value<ArrayAccumulator>(groups[index])->elements.free(allocator_);
340+
}
341+
}
334342
if (clusteredInput_) {
335343
new (groups[index] + offset_) ClusteredAccumulator();
336344
} else {

velox/functions/prestosql/aggregates/ClassificationAggregation.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,9 @@ class ClassificationAggregation : public exec::Aggregate {
582582
exec::Aggregate::setAllNulls(groups, indices);
583583
for (auto i : indices) {
584584
auto group = groups[i];
585+
if (isInitialized(group)) {
586+
destroyAccumulator<Accumulator<type>>(group);
587+
}
585588
new (group + offset_) Accumulator<type>(allocator_);
586589
}
587590
}

velox/functions/prestosql/aggregates/HistogramAggregate.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ class HistogramAggregate : public exec::Aggregate {
528528
folly::Range<const vector_size_t*> indices) override {
529529
const auto& type = resultType()->childAt(0);
530530
for (auto index : indices) {
531+
if (isInitialized(groups[index])) {
532+
value<AccumulatorType>(groups[index])->free(*allocator_);
533+
}
531534
new (groups[index] + offset_) AccumulatorType{type, allocator_};
532535
}
533536
}

0 commit comments

Comments
 (0)