From f3c38a3063b7d79a26652d650f85f45d770fab87 Mon Sep 17 00:00:00 2001 From: PradSenn Date: Tue, 11 Mar 2025 17:45:41 -0700 Subject: [PATCH 1/3] Implementing configurable aggregation cardinality limit and collector emitting all views instead of last view Issues Cardinality of the Aggregation limit (2000) is hardcoded and no ability to configure as part of the meter/aggregation Meter stores meter-name to storage in storage_registry_. When multiple views are added, the last view overrides the previous view, and collector can only emit the last view's metrics that are added. Fixes Introducing cardinality as configurable parameter in AggregationConfig, and implementing the limit from AggregationConfig in Storage Changing the storage_registry_ to keep track of view_name to storage, instead of metric_name to storage. So when multiple views are added, different view-names will be collected by collector. Please provide a brief description of the changes here. --- .../metrics/aggregation/aggregation_config.h | 3 + sdk/include/opentelemetry/sdk/metrics/meter.h | 2 +- .../sdk/metrics/state/async_metric_storage.h | 8 +- .../sdk/metrics/state/attributes_hashmap.h | 8 +- .../sdk/metrics/state/sync_metric_storage.h | 8 +- sdk/src/metrics/meter.cc | 4 +- sdk/src/metrics/state/sync_metric_storage.cc | 2 +- .../metrics/state/temporal_metric_storage.cc | 2 +- sdk/test/metrics/cardinality_limit_test.cc | 4 +- sdk/test/metrics/sum_aggregation_test.cc | 146 ++++++++++++++++++ 10 files changed, 174 insertions(+), 13 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h index f5a48d7ddb..58361855ad 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h @@ -6,6 +6,7 @@ #include #include "opentelemetry/version.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -15,6 +16,8 @@ namespace metrics class AggregationConfig { public: + AggregationConfig(size_t cardinality_limit = kAggregationCardinalityLimit) : cardinality_limit_(cardinality_limit) {} + size_t cardinality_limit_; virtual ~AggregationConfig() = default; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index bf1b0e6c37..810312d8d7 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -136,7 +136,7 @@ class Meter final : public opentelemetry::metrics::Meter // meter-context. std::unique_ptr scope_; std::weak_ptr meter_context_; - // Mapping between instrument-name and Aggregation Storage. + // Mapping between view-name and Aggregation Storage. std::unordered_map> storage_registry_; std::shared_ptr observable_registry_; MeterConfig meter_config_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 14e9a3fbfa..975357a536 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -42,8 +42,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, - cumulative_hash_map_(new AttributesHashMap()), - delta_hash_map_(new AttributesHashMap()), + aggregation_config_{aggregation_config}, + cumulative_hash_map_(new AttributesHashMap(aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)), + delta_hash_map_(new AttributesHashMap(aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), exemplar_reservoir_(exemplar_reservoir), @@ -126,7 +127,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora { std::lock_guard guard(hashmap_lock_); delta_metrics = std::move(delta_hash_map_); - delta_hash_map_.reset(new AttributesHashMap); + delta_hash_map_.reset(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit)); } auto status = @@ -138,6 +139,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora private: InstrumentDescriptor instrument_descriptor_; AggregationType aggregation_type_; + const AggregationConfig *aggregation_config_; std::unique_ptr cumulative_hash_map_; std::unique_ptr delta_hash_map_; opentelemetry::common::SpinLockMutex hashmap_lock_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index ddf2a8b206..03aaca3ef9 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -47,7 +47,13 @@ class AttributesHashMap public: AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit) : attributes_limit_(attributes_limit) - {} + { + if (attributes_limit_ > kAggregationCardinalityLimit) + { + hash_map_.reserve(attributes_limit_); + } + } + Aggregation *Get(size_t hash) const { auto it = hash_map_.find(hash); diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index 6e5b799e3f..b142b18730 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -63,10 +63,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage ExemplarFilterType exempler_filter_type, nostd::shared_ptr &&exemplar_reservoir, #endif - const AggregationConfig *aggregation_config, - size_t attributes_limit = kAggregationCardinalityLimit) + const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), - attributes_hashmap_(new AttributesHashMap(attributes_limit)), + aggregation_config_(aggregation_config), + attributes_hashmap_(new AttributesHashMap( + aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)), attributes_processor_(attributes_processor), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), @@ -202,6 +203,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage ExemplarFilterType exemplar_filter_type_; nostd::shared_ptr exemplar_reservoir_; #endif + const AggregationConfig *aggregation_config_; TemporalMetricStorage temporal_metric_storage_; opentelemetry::common::SpinLockMutex attribute_hashmap_lock_; }; diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index f89de3b6b0..3a7d2c3be5 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -496,7 +496,7 @@ std::unique_ptr Meter::RegisterSyncMetricStorage( instrument_descriptor), #endif view.GetAggregationConfig())); - storage_registry_[instrument_descriptor.name_] = storage; + storage_registry_[view_instr_desc.name_] = storage; multi_storage->AddStorage(storage); return true; }); @@ -554,7 +554,7 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( instrument_descriptor), #endif view.GetAggregationConfig())); - storage_registry_[instrument_descriptor.name_] = storage; + storage_registry_[view_instr_desc.name_] = storage; static_cast(storages.get())->AddStorage(storage); return true; }); diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index ad6ea27821..2380bbb6c8 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -35,7 +35,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, { std::lock_guard guard(attribute_hashmap_lock_); delta_metrics = std::move(attributes_hashmap_); - attributes_hashmap_.reset(new AttributesHashMap); + attributes_hashmap_.reset(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit)); } return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 1302f9d642..90e09c0523 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -99,7 +99,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, } auto unreported_list = std::move(present->second); // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` - std::unique_ptr merged_metrics(new AttributesHashMap); + std::unique_ptr merged_metrics(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit)); for (auto &agg_hashmap : unreported_list) { agg_hashmap->GetAllEnteries( diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index c785eeb5d5..8e07e4ceac 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -121,6 +121,8 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati const size_t attributes_limit = 10; InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; + + AggregationConfig aggConfig(attributes_limit); std::unique_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(), @@ -128,7 +130,7 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif - nullptr, attributes_limit); + &aggConfig); int64_t record_value = 100; // add 9 unique metric points, and 6 more above limit. diff --git a/sdk/test/metrics/sum_aggregation_test.cc b/sdk/test/metrics/sum_aggregation_test.cc index 72783f6492..7bd46947dd 100644 --- a/sdk/test/metrics/sum_aggregation_test.cc +++ b/sdk/test/metrics/sum_aggregation_test.cc @@ -30,6 +30,7 @@ #include "opentelemetry/sdk/metrics/view/instrument_selector.h" #include "opentelemetry/sdk/metrics/view/meter_selector.h" #include "opentelemetry/sdk/metrics/view/view.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" #if OPENTELEMETRY_HAVE_WORKING_REGEX @@ -144,6 +145,78 @@ TEST(HistogramToSumFilterAttributes, Double) }); } +TEST(HistogramToSumFilterAttributesWithCardinaityLimit, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + std::string instrument_unit = "ms"; + std::string instrument_name = "historgram1"; + std::string instrument_desc = "histogram metrics"; + size_t cardinality_limit = 10000; + + std::unordered_map allowedattr; + allowedattr["attr1"] = true; + std::unique_ptr attrproc{ + new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)}; + + std::shared_ptr dummy_aggregation_config{ + new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)}; + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", instrument_unit, + AggregationType::kSum, dummy_aggregation_config, + std::move(attrproc))}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kHistogram, instrument_name, instrument_unit)}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleHistogram(instrument_name, instrument_desc, instrument_unit); + size_t total_metrics_times = 5; + + size_t agg_repeat_count = 5; + for (size_t repeat = 0; repeat < agg_repeat_count; repeat++) + { + + for (size_t times = 0; times < total_metrics_times; times++) + { + for (size_t i = 0; i < 2 * cardinality_limit; i++) + { + std::unordered_map attr = {{"attr1", std::to_string(i)}, {"attr2", "val2"}}; + h->Record(1, attr, opentelemetry::context::Context{}); + } + } + + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + // Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality. + EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); + EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); + for (size_t i = 0; i < md.point_data_attr_.size(); i++) + { + EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); + if (md.point_data_attr_[i].attributes.end() != md.point_data_attr_[i].attributes.find("attr1")) + { + EXPECT_EQ(total_metrics_times * (repeat + 1), opentelemetry::nostd::get(opentelemetry::nostd::get( + md.point_data_attr_[i].point_data) + .value_)); + } else { + EXPECT_NE(md.point_data_attr_[i].attributes.end(), + md.point_data_attr_[i].attributes.find(sdk::metrics::kAttributesLimitOverflowKey)); + } + } + } + } + return true; + }); + } +} + TEST(CounterToSum, Double) { MeterProvider mp; @@ -247,6 +320,79 @@ TEST(CounterToSumFilterAttributes, Double) }); } +TEST(CounterToSumFilterAttributesWithCardinalityLimit, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + std::string instrument_unit = "ms"; + std::string instrument_name = "counter1"; + std::string instrument_desc = "counter metrics"; + size_t cardinality_limit = 10000; + + std::unordered_map allowedattr; + allowedattr["attr1"] = true; + std::unique_ptr attrproc{ + new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)}; + + std::shared_ptr dummy_aggregation_config{ + new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)}; + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", instrument_unit, + AggregationType::kSum, dummy_aggregation_config, + std::move(attrproc))}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kCounter, instrument_name, instrument_unit)}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto c = m->CreateDoubleCounter(instrument_name, instrument_desc, instrument_unit); + + size_t agg_repeat_count = 5; + for (size_t repeat = 0; repeat < agg_repeat_count; repeat++) + { + size_t total_metrics_times = 5; + + for (size_t times = 0; times < total_metrics_times; times++) + { + for (size_t i = 0; i < 2 * cardinality_limit; i++) + { + std::unordered_map attr = {{"attr1", std::to_string(i)}, {"attr2", "val2"}}; + c->Add(1, attr, opentelemetry::context::Context{}); + } + } + + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + // Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality. + EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); + EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); + for (int i = 0; i < md.point_data_attr_.size(); i++) + { + EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); + if (md.point_data_attr_[i].attributes.find("attr1") != md.point_data_attr_[i].attributes.end()) + { + EXPECT_EQ(total_metrics_times * (repeat + 1), opentelemetry::nostd::get(opentelemetry::nostd::get( + md.point_data_attr_[i].point_data) + .value_)); + } else { + EXPECT_NE(md.point_data_attr_[i].attributes.end(), + md.point_data_attr_[i].attributes.find(sdk::metrics::kAttributesLimitOverflowKey)); + } + } + } + } + return true; + }); + } +} + + class UpDownCounterToSumFixture : public ::testing::TestWithParam {}; From 0d80b39f413f401cea8f07d707cb6cd515d7deeb Mon Sep 17 00:00:00 2001 From: Prad Senniappan Date: Wed, 12 Mar 2025 16:37:34 +0000 Subject: [PATCH 2/3] Fixing initialization order issue --- .../opentelemetry/sdk/metrics/state/sync_metric_storage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index b142b18730..05b97a36eb 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -196,6 +196,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage private: InstrumentDescriptor instrument_descriptor_; // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) + const AggregationConfig *aggregation_config_; std::unique_ptr attributes_hashmap_; std::function()> create_default_aggregation_; const AttributesProcessor *attributes_processor_; @@ -203,7 +204,6 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage ExemplarFilterType exemplar_filter_type_; nostd::shared_ptr exemplar_reservoir_; #endif - const AggregationConfig *aggregation_config_; TemporalMetricStorage temporal_metric_storage_; opentelemetry::common::SpinLockMutex attribute_hashmap_lock_; }; From 671e4caccfe6fd425d5211cb16e8af492ce1fc78 Mon Sep 17 00:00:00 2001 From: Prad Senniappan Date: Fri, 14 Mar 2025 03:52:45 +0000 Subject: [PATCH 3/3] Fixing integer size mismatch error --- sdk/test/metrics/sum_aggregation_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/test/metrics/sum_aggregation_test.cc b/sdk/test/metrics/sum_aggregation_test.cc index 7bd46947dd..7980819e0a 100644 --- a/sdk/test/metrics/sum_aggregation_test.cc +++ b/sdk/test/metrics/sum_aggregation_test.cc @@ -372,7 +372,7 @@ TEST(CounterToSumFilterAttributesWithCardinalityLimit, Double) // Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality. EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); - for (int i = 0; i < md.point_data_attr_.size(); i++) + for (size_t i = 0; i < md.point_data_attr_.size(); i++) { EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); if (md.point_data_attr_[i].attributes.find("attr1") != md.point_data_attr_[i].attributes.end())