diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b79f94cba..a0f9e4b0dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,9 @@ Increment the: * [BUILD] Use -dev versions in main branch [#3609](https://github.com/open-telemetry/opentelemetry-cpp/pull/3609) +* [SDK] Implement configurable aggregation cardinality limit + [#3624](https://github.com/open-telemetry/opentelemetry-cpp/pull/3624) + Important changes: * [CMAKE] Upgrade CMake minimum version to 3.16 diff --git a/ci/do_ci.sh b/ci/do_ci.sh index 2eff9cd200..71a9a82cbd 100755 --- a/ci/do_ci.sh +++ b/ci/do_ci.sh @@ -558,7 +558,7 @@ elif [[ "$1" == "bazel.tsan" ]]; then exit 0 elif [[ "$1" == "bazel.valgrind" ]]; then bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC //... - bazel $BAZEL_STARTUP_OPTIONS test --run_under="/usr/bin/valgrind --leak-check=full --error-exitcode=1 --errors-for-leak-kinds=definite --suppressions=\"${SRC_DIR}/ci/valgrind-suppressions\"" $BAZEL_TEST_OPTIONS_ASYNC //... + bazel $BAZEL_STARTUP_OPTIONS test --test_timeout=600 --run_under="/usr/bin/valgrind --leak-check=full --error-exitcode=1 --errors-for-leak-kinds=definite --suppressions=\"${SRC_DIR}/ci/valgrind-suppressions\"" $BAZEL_TEST_OPTIONS_ASYNC //... 
exit 0 elif [[ "$1" == "bazel.e2e" ]]; then cd examples/e2e diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h index 0cae37f39e..2effa4933c 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h @@ -5,6 +5,7 @@ #include +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -15,6 +16,21 @@ namespace metrics class AggregationConfig { public: + AggregationConfig(size_t cardinality_limit = kAggregationCardinalityLimit) + : cardinality_limit_(cardinality_limit) + {} + + static const AggregationConfig *GetOrDefault(const AggregationConfig *config) + { + if (config) + { + return config; + } + static const AggregationConfig default_config{}; + return &default_config; + } + + size_t cardinality_limit_; virtual ~AggregationConfig() = default; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 93ffccc48c..968de7aa82 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -9,6 +9,7 @@ #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/sdk/common/attributemap_hash.h" +#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" #include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW @@ -42,8 +43,11 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, - cumulative_hash_map_(new AttributesHashMap()), - delta_hash_map_(new AttributesHashMap()), + 
aggregation_config_{AggregationConfig::GetOrDefault(aggregation_config)}, + cumulative_hash_map_( + std::make_unique(aggregation_config_->cardinality_limit_)), + delta_hash_map_( + std::make_unique(aggregation_config_->cardinality_limit_)), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), exemplar_reservoir_(exemplar_reservoir), @@ -124,7 +128,8 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora { std::lock_guard guard(hashmap_lock_); delta_metrics = std::move(delta_hash_map_); - delta_hash_map_.reset(new AttributesHashMap); + delta_hash_map_ = + std::make_unique(aggregation_config_->cardinality_limit_); } auto status = @@ -136,6 +141,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora private: InstrumentDescriptor instrument_descriptor_; AggregationType aggregation_type_; + const AggregationConfig *aggregation_config_; std::unique_ptr cumulative_hash_map_; std::unique_ptr delta_hash_map_; opentelemetry::common::SpinLockMutex hashmap_lock_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index f1ab55ed61..b2b8863e35 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -49,7 +49,13 @@ class AttributesHashMapWithCustomHash public: AttributesHashMapWithCustomHash(size_t attributes_limit = kAggregationCardinalityLimit) : attributes_limit_(attributes_limit) - {} + { + if (attributes_limit_ > kAggregationCardinalityLimit) + { + hash_map_.reserve(attributes_limit_); + } + } + Aggregation *Get(const MetricAttributes &attributes) const { auto it = hash_map_.find(attributes); diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index ded2be1820..f517375fd8 100644 --- 
a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -18,6 +18,7 @@ #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/sdk/common/attributemap_hash.h" #include "opentelemetry/sdk/metrics/aggregation/aggregation.h" +#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" #include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" #include "opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" @@ -63,10 +64,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage ExemplarFilterType exempler_filter_type, nostd::shared_ptr &&exemplar_reservoir, #endif - const AggregationConfig *aggregation_config, - size_t attributes_limit = kAggregationCardinalityLimit) + const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), - attributes_hashmap_(new AttributesHashMap(attributes_limit)), + aggregation_config_(AggregationConfig::GetOrDefault(aggregation_config)), + attributes_hashmap_( + std::make_unique(aggregation_config_->cardinality_limit_)), attributes_processor_(std::move(attributes_processor)), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), @@ -173,6 +175,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage private: InstrumentDescriptor instrument_descriptor_; // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) + const AggregationConfig *aggregation_config_; std::unique_ptr attributes_hashmap_; std::function()> create_default_aggregation_; std::shared_ptr attributes_processor_; diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index ad6ea27821..3844d2b05d 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ 
b/sdk/src/metrics/state/sync_metric_storage.cc @@ -9,6 +9,7 @@ #include "opentelemetry/common/timestamp.h" #include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/nostd/span.h" +#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" #include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" #include "opentelemetry/sdk/metrics/state/metric_collector.h" @@ -35,7 +36,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, { std::lock_guard guard(attribute_hashmap_lock_); delta_metrics = std::move(attributes_hashmap_); - attributes_hashmap_.reset(new AttributesHashMap); + attributes_hashmap_.reset(new AttributesHashMap(aggregation_config_->cardinality_limit_)); } return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index fb839ffd7c..9e991f73f9 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -97,7 +97,9 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, } auto unreported_list = std::move(present->second); // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` - std::unique_ptr merged_metrics(new AttributesHashMap); + std::unique_ptr merged_metrics( + new AttributesHashMap(aggregation_config_ ? 
aggregation_config_->cardinality_limit_ + : kAggregationCardinalityLimit)); for (auto &agg_hashmap : unreported_list) { agg_hashmap->GetAllEnteries( diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index f621379384..380ddd0564 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -25,11 +25,13 @@ #include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" #include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h" #include "opentelemetry/sdk/metrics/state/metric_collector.h" -#include "opentelemetry/sdk/metrics/view/attributes_processor.h" #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW +# include "opentelemetry/sdk/metrics/data/exemplar_data.h" # include "opentelemetry/sdk/metrics/exemplar/filter_type.h" # include "opentelemetry/sdk/metrics/exemplar/reservoir.h" +#else +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" #endif using namespace opentelemetry::sdk::metrics; @@ -192,8 +194,10 @@ TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation) } return true; }); - // subsequent recording after collection shouldn't fail - // monotonic increasing values; + // Note: When the cardinality limit is set to n, the attributes hashmap emits n-1 distinct + // attribute sets, plus an overflow bucket for additional attributes. The test logic below is made + // generic to succeed for either n or n-1 total cardinality. If this behavior is unexpected, + // please investigate and file an issue. 
int64_t get_count2 = -50; int64_t put_count2 = -70; diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index 45e15200c3..61a47c5983 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -20,6 +20,7 @@ #include "opentelemetry/nostd/span.h" #include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/metrics/aggregation/aggregation.h" +#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" #include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" #include "opentelemetry/sdk/metrics/data/point_data.h" @@ -110,6 +111,7 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati const size_t attributes_limit = 10; InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; + AggregationConfig aggConfig(attributes_limit); std::shared_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor, @@ -117,7 +119,7 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif - nullptr, attributes_limit); + &aggConfig); int64_t record_value = 100; // add 9 unique metric points, and 6 more above limit. 
diff --git a/sdk/test/metrics/sum_aggregation_test.cc b/sdk/test/metrics/sum_aggregation_test.cc index 40d3bbf89a..7722db1afa 100644 --- a/sdk/test/metrics/sum_aggregation_test.cc +++ b/sdk/test/metrics/sum_aggregation_test.cc @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 #include +#include #include +#include #include #include #include @@ -26,6 +28,7 @@ #include "opentelemetry/sdk/metrics/meter_provider.h" #include "opentelemetry/sdk/metrics/metric_reader.h" #include "opentelemetry/sdk/metrics/push_metric_exporter.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" #include "opentelemetry/sdk/metrics/view/attributes_processor.h" #include "opentelemetry/sdk/metrics/view/instrument_selector.h" #include "opentelemetry/sdk/metrics/view/meter_selector.h" @@ -42,7 +45,7 @@ TEST(HistogramToSum, Double) MeterProvider mp; auto m = mp.GetMeter("meter1", "version1", "schema1"); std::string instrument_unit = "ms"; - std::string instrument_name = "historgram1"; + std::string instrument_name = "histogram1"; std::string instrument_desc = "histogram metrics"; std::unique_ptr exporter(new MockMetricExporter()); @@ -94,7 +97,7 @@ TEST(HistogramToSumFilterAttributes, Double) MeterProvider mp; auto m = mp.GetMeter("meter1", "version1", "schema1"); std::string instrument_unit = "ms"; - std::string instrument_name = "historgram1"; + std::string instrument_name = "histogram1"; std::string instrument_desc = "histogram metrics"; std::unordered_map allowedattr; @@ -142,6 +145,85 @@ TEST(HistogramToSumFilterAttributes, Double) }); } +TEST(HistogramToSumFilterAttributesWithCardinalityLimit, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + std::string instrument_unit = "ms"; + std::string instrument_name = "histogram1"; + std::string instrument_desc = "histogram metrics"; + size_t cardinality_limit = 10000; + + std::unordered_map allowedattr; + allowedattr["attr1"] = true; + std::unique_ptr attrproc{ + new 
opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)}; + + std::shared_ptr dummy_aggregation_config{ + new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)}; + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", AggregationType::kSum, + dummy_aggregation_config, std::move(attrproc))}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kHistogram, instrument_name, instrument_unit)}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleHistogram(instrument_name, instrument_desc, instrument_unit); + size_t total_metrics_times = 5; + + size_t agg_repeat_count = 5; + for (size_t repeat = 0; repeat < agg_repeat_count; repeat++) + { + + for (size_t times = 0; times < total_metrics_times; times++) + { + for (size_t i = 0; i < 2 * cardinality_limit; i++) + { + std::unordered_map attr = {{"attr1", std::to_string(i)}, + {"attr2", "val2"}}; + h->Record(1, attr, opentelemetry::context::Context{}); + } + } + + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + // Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 + // including overflow. Just making the logic generic here to succeed for n or n-1 total + // cardinality. 
+ EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); + EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); + for (size_t i = 0; i < md.point_data_attr_.size(); i++) + { + EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); + if (md.point_data_attr_[i].attributes.end() != + md.point_data_attr_[i].attributes.find("attr1")) + { + EXPECT_EQ(total_metrics_times * (repeat + 1), + opentelemetry::nostd::get(opentelemetry::nostd::get( + md.point_data_attr_[i].point_data) + .value_)); + } + else + { + EXPECT_NE(md.point_data_attr_[i].attributes.end(), + md.point_data_attr_[i].attributes.find( + sdk::metrics::kAttributesLimitOverflowKey)); + } + } + } + } + return true; + }); + } +} + TEST(CounterToSum, Double) { MeterProvider mp; @@ -244,6 +326,87 @@ TEST(CounterToSumFilterAttributes, Double) }); } +TEST(CounterToSumFilterAttributesWithCardinalityLimit, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + std::string instrument_unit = "ms"; + std::string instrument_name = "counter1"; + std::string instrument_desc = "counter metrics"; + size_t cardinality_limit = 10000; + + std::unordered_map allowedattr; + allowedattr["attr1"] = true; + std::unique_ptr attrproc{ + new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)}; + + std::shared_ptr dummy_aggregation_config{ + new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)}; + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", AggregationType::kSum, + dummy_aggregation_config, std::move(attrproc))}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kCounter, instrument_name, instrument_unit)}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), 
std::move(meter_selector), std::move(view)); + + auto c = m->CreateDoubleCounter(instrument_name, instrument_desc, instrument_unit); + + size_t agg_repeat_count = 5; + for (size_t repeat = 0; repeat < agg_repeat_count; repeat++) + { + size_t total_metrics_times = 5; + + for (size_t times = 0; times < total_metrics_times; times++) + { + for (size_t i = 0; i < 2 * cardinality_limit; i++) + { + std::unordered_map attr = {{"attr1", std::to_string(i)}, + {"attr2", "val2"}}; + c->Add(1, attr, opentelemetry::context::Context{}); + } + } + + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + // When the number of unique attribute sets exceeds the cardinality limit, the + // implementation emits up to (cardinality_limit - 1) unique sets and one overflow set, + // resulting in a total of cardinality_limit sets. This test checks that the number of + // emitted attribute sets is within the expected range, accounting for the overflow + // behavior. + EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); + EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); + for (size_t i = 0; i < md.point_data_attr_.size(); i++) + { + EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); + if (md.point_data_attr_[i].attributes.find("attr1") != + md.point_data_attr_[i].attributes.end()) + { + EXPECT_EQ(total_metrics_times * (repeat + 1), + opentelemetry::nostd::get(opentelemetry::nostd::get( + md.point_data_attr_[i].point_data) + .value_)); + } + else + { + EXPECT_NE(md.point_data_attr_[i].attributes.end(), + md.point_data_attr_[i].attributes.find( + sdk::metrics::kAttributesLimitOverflowKey)); + } + } + } + } + return true; + }); + } +} + class UpDownCounterToSumFixture : public ::testing::TestWithParam {};