From 2faa400782adedb1681d6244a56ec209421a5dbd Mon Sep 17 00:00:00 2001 From: Doug Barker Date: Tue, 20 May 2025 00:52:56 -0600 Subject: [PATCH] [SDK] Fix Base2ExponentialHistogramAggregation Merge with empty buckets (#3425) --- ...base2_exponential_histogram_aggregation.cc | 59 +++++++----- sdk/test/metrics/aggregation_test.cc | 94 +++++++++++++++++++ 2 files changed, 130 insertions(+), 23 deletions(-) diff --git a/sdk/src/metrics/aggregation/base2_exponential_histogram_aggregation.cc b/sdk/src/metrics/aggregation/base2_exponential_histogram_aggregation.cc index ea98c88a0b..42fd73d89d 100644 --- a/sdk/src/metrics/aggregation/base2_exponential_histogram_aggregation.cc +++ b/sdk/src/metrics/aggregation/base2_exponential_histogram_aggregation.cc @@ -260,6 +260,16 @@ std::unique_ptr Base2ExponentialHistogramAggregation::Merge( auto right = nostd::get( (static_cast(delta).ToPoint())); + if (left.count_ == 0) + { + return std::make_unique(std::move(right)); + } + + if (right.count_ == 0) + { + return std::make_unique(std::move(left)); + } + auto &low_res = left.scale_ < right.scale_ ? left : right; auto &high_res = left.scale_ < right.scale_ ? right : left; @@ -289,30 +299,33 @@ std::unique_ptr Base2ExponentialHistogramAggregation::Merge( } } - auto pos_min_index = - (std::min)(low_res.positive_buckets_->StartIndex(), high_res.positive_buckets_->StartIndex()); - auto pos_max_index = - (std::max)(low_res.positive_buckets_->EndIndex(), high_res.positive_buckets_->EndIndex()); - auto neg_min_index = - (std::min)(low_res.negative_buckets_->StartIndex(), high_res.negative_buckets_->StartIndex()); - auto neg_max_index = - (std::max)(low_res.negative_buckets_->EndIndex(), high_res.negative_buckets_->EndIndex()); - - if (static_cast(pos_max_index) > - static_cast(pos_min_index) + result_value.max_buckets_ || - static_cast(neg_max_index) > - static_cast(neg_min_index) + result_value.max_buckets_) + if (!low_res.positive_buckets_->Empty() && !high_res.positive_buckets_->Empty()) { - // We need to downscale the buckets to fit into the new max_buckets_. - const uint32_t scale_reduction = - GetScaleReduction(pos_min_index, pos_max_index, result_value.max_buckets_); - DownscaleBuckets(low_res.positive_buckets_, scale_reduction); - DownscaleBuckets(high_res.positive_buckets_, scale_reduction); - DownscaleBuckets(low_res.negative_buckets_, scale_reduction); - DownscaleBuckets(high_res.negative_buckets_, scale_reduction); - low_res.scale_ -= scale_reduction; - high_res.scale_ -= scale_reduction; - result_value.scale_ -= scale_reduction; + auto pos_min_index = (std::min)(low_res.positive_buckets_->StartIndex(), + high_res.positive_buckets_->StartIndex()); + auto pos_max_index = + (std::max)(low_res.positive_buckets_->EndIndex(), high_res.positive_buckets_->EndIndex()); + auto neg_min_index = (std::min)(low_res.negative_buckets_->StartIndex(), + high_res.negative_buckets_->StartIndex()); + auto neg_max_index = + (std::max)(low_res.negative_buckets_->EndIndex(), high_res.negative_buckets_->EndIndex()); + + if (static_cast(pos_max_index) > + static_cast(pos_min_index) + result_value.max_buckets_ || + static_cast(neg_max_index) > + static_cast(neg_min_index) + result_value.max_buckets_) + { + // We need to downscale the buckets to fit into the new max_buckets_. + const uint32_t scale_reduction = + GetScaleReduction(pos_min_index, pos_max_index, result_value.max_buckets_); + DownscaleBuckets(low_res.positive_buckets_, scale_reduction); + DownscaleBuckets(high_res.positive_buckets_, scale_reduction); + DownscaleBuckets(low_res.negative_buckets_, scale_reduction); + DownscaleBuckets(high_res.negative_buckets_, scale_reduction); + low_res.scale_ -= scale_reduction; + high_res.scale_ -= scale_reduction; + result_value.scale_ -= scale_reduction; + } } result_value.positive_buckets_ = std::make_unique(MergeBuckets( diff --git a/sdk/test/metrics/aggregation_test.cc b/sdk/test/metrics/aggregation_test.cc index 85a01af589..1588ac319c 100644 --- a/sdk/test/metrics/aggregation_test.cc +++ b/sdk/test/metrics/aggregation_test.cc @@ -10,11 +10,13 @@ #include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" #include "opentelemetry/sdk/metrics/aggregation/base2_exponential_histogram_aggregation.h" +#include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" #include "opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h" #include "opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h" #include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" #include "opentelemetry/sdk/metrics/data/circular_buffer.h" #include "opentelemetry/sdk/metrics/data/point_data.h" +#include "opentelemetry/sdk/metrics/instruments.h" using namespace opentelemetry::sdk::metrics; namespace nostd = opentelemetry::nostd; @@ -357,3 +359,95 @@ TEST(Aggregation, Base2ExponentialHistogramAggregation) ASSERT_TRUE(diffd_point.negative_buckets_ != nullptr); EXPECT_EQ(diffd_point.positive_buckets_->Get(2), 1); } + +TEST(Aggregation, Base2ExponentialHistogramAggregationMerge) +{ + Base2ExponentialHistogramAggregationConfig config; + config.max_scale_ = 10; + config.max_buckets_ = 100; + config.record_min_max_ = true; + + Base2ExponentialHistogramAggregation aggr(&config); + + int expected_count = 0; + double expected_sum = 0.0; + + // Aggregate some small values + for (int i = 1; i < 10; ++i) + { + expected_count++; + const double value = i * 1e-12; + expected_sum += value; + aggr.Aggregate(value); + } + + const auto aggr_point = nostd::get(aggr.ToPoint()); + + ASSERT_EQ(aggr_point.count_, expected_count); + ASSERT_DOUBLE_EQ(aggr_point.sum_, expected_sum); + ASSERT_EQ(aggr_point.zero_count_, 0); + ASSERT_GT(aggr_point.scale_, -10); + ASSERT_EQ(aggr_point.max_buckets_, config.max_buckets_); + + auto test_merge = [](const std::unique_ptr &merged_aggr, int expected_count, + double expected_sum, int expected_zero_count, int expected_scale, + int expected_max_buckets) { + auto merged_point = nostd::get(merged_aggr->ToPoint()); + EXPECT_EQ(merged_point.count_, expected_count); + EXPECT_DOUBLE_EQ(merged_point.sum_, expected_sum); + EXPECT_EQ(merged_point.zero_count_, expected_zero_count); + EXPECT_EQ(merged_point.scale_, expected_scale); + EXPECT_EQ(merged_point.max_buckets_, expected_max_buckets); + }; + + // default aggregation merge + { + InstrumentDescriptor descriptor; + descriptor.type_ = InstrumentType::kHistogram; + descriptor.unit_ = "unit"; + descriptor.name_ = "histogram"; + descriptor.description_ = "a histogram"; + descriptor.value_type_ = InstrumentValueType::kDouble; + + auto default_aggr = DefaultAggregation::CreateAggregation( + AggregationType::kBase2ExponentialHistogram, descriptor); + auto default_point = nostd::get(default_aggr->ToPoint()); + + const int expected_scale = + aggr_point.scale_ < default_point.scale_ ? aggr_point.scale_ : default_point.scale_; + const int expected_max_buckets = aggr_point.max_buckets_ < default_point.max_buckets_ + ? aggr_point.max_buckets_ + : default_point.max_buckets_; + const int expected_zero_count = 0; + + auto merged_from_default = aggr.Merge(*default_aggr); + test_merge(merged_from_default, expected_count, expected_sum, expected_zero_count, + expected_scale, expected_max_buckets); + + auto merged_to_default = default_aggr->Merge(aggr); + test_merge(merged_to_default, expected_count, expected_sum, expected_zero_count, expected_scale, + expected_max_buckets); + } + + // zero count aggregation merge (Zero is a special case and does not increment the buckets) + { + Base2ExponentialHistogramAggregation zero_aggr(&config); + zero_aggr.Aggregate(0.0); + + const auto zero_point = nostd::get(zero_aggr.ToPoint()); + const int expected_scale = + aggr_point.scale_ < zero_point.scale_ ? aggr_point.scale_ : zero_point.scale_; + const int expected_max_buckets = aggr_point.max_buckets_ < zero_point.max_buckets_ + ? aggr_point.max_buckets_ + : zero_point.max_buckets_; + const int expected_zero_count = 1; + + auto merged_from_zero = aggr.Merge(zero_aggr); + test_merge(merged_from_zero, expected_count + 1, expected_sum, expected_zero_count, + expected_scale, expected_max_buckets); + + auto merged_to_zero = zero_aggr.Merge(aggr); + test_merge(merged_to_zero, expected_count + 1, expected_sum, expected_zero_count, + expected_scale, expected_max_buckets); + } +}