diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h index f5a48d7ddb..58361855ad 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h @@ -6,6 +6,7 @@ #include #include "opentelemetry/version.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -15,6 +16,8 @@ namespace metrics class AggregationConfig { public: + AggregationConfig(size_t cardinality_limit = kAggregationCardinalityLimit) : cardinality_limit_(cardinality_limit) {} + size_t cardinality_limit_; virtual ~AggregationConfig() = default; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index bf1b0e6c37..810312d8d7 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -136,7 +136,7 @@ class Meter final : public opentelemetry::metrics::Meter // meter-context. std::unique_ptr scope_; std::weak_ptr meter_context_; - // Mapping between instrument-name and Aggregation Storage. + // Mapping between view-name and Aggregation Storage. std::unordered_map> storage_registry_; std::shared_ptr observable_registry_; MeterConfig meter_config_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 14e9a3fbfa..975357a536 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -42,8 +42,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, - cumulative_hash_map_(new AttributesHashMap()), - delta_hash_map_(new AttributesHashMap()), + aggregation_config_{aggregation_config}, + cumulative_hash_map_(new AttributesHashMap(aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)), + delta_hash_map_(new AttributesHashMap(aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), exemplar_reservoir_(exemplar_reservoir), @@ -126,7 +127,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora { std::lock_guard guard(hashmap_lock_); delta_metrics = std::move(delta_hash_map_); - delta_hash_map_.reset(new AttributesHashMap); + delta_hash_map_.reset(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit)); } auto status = @@ -138,6 +139,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora private: InstrumentDescriptor instrument_descriptor_; AggregationType aggregation_type_; + const AggregationConfig *aggregation_config_; std::unique_ptr cumulative_hash_map_; std::unique_ptr delta_hash_map_; opentelemetry::common::SpinLockMutex hashmap_lock_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index ddf2a8b206..03aaca3ef9 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -47,7 +47,13 @@ class AttributesHashMap public: AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit) : attributes_limit_(attributes_limit) - {} + { + if (attributes_limit_ > kAggregationCardinalityLimit) + { + hash_map_.reserve(attributes_limit_); + } + } + Aggregation *Get(size_t hash) const { auto it = hash_map_.find(hash); diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index 6e5b799e3f..05b97a36eb 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -63,10 +63,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage ExemplarFilterType exempler_filter_type, nostd::shared_ptr &&exemplar_reservoir, #endif - const AggregationConfig *aggregation_config, - size_t attributes_limit = kAggregationCardinalityLimit) + const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), - attributes_hashmap_(new AttributesHashMap(attributes_limit)), + aggregation_config_(aggregation_config), + attributes_hashmap_(new AttributesHashMap( + aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)), attributes_processor_(attributes_processor), #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_filter_type_(exempler_filter_type), @@ -195,6 +196,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage private: InstrumentDescriptor instrument_descriptor_; // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) + const AggregationConfig *aggregation_config_; std::unique_ptr attributes_hashmap_; std::function()> create_default_aggregation_; const AttributesProcessor *attributes_processor_; diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index f89de3b6b0..3a7d2c3be5 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -496,7 +496,7 @@ std::unique_ptr Meter::RegisterSyncMetricStorage( instrument_descriptor), #endif view.GetAggregationConfig())); - storage_registry_[instrument_descriptor.name_] = storage; + storage_registry_[view_instr_desc.name_] = storage; multi_storage->AddStorage(storage); return true; }); @@ -554,7 +554,7 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( instrument_descriptor), #endif view.GetAggregationConfig())); - storage_registry_[instrument_descriptor.name_] = storage; + storage_registry_[view_instr_desc.name_] = storage; static_cast(storages.get())->AddStorage(storage); return true; }); diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index ad6ea27821..2380bbb6c8 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -35,7 +35,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, { std::lock_guard guard(attribute_hashmap_lock_); delta_metrics = std::move(attributes_hashmap_); - attributes_hashmap_.reset(new AttributesHashMap); + attributes_hashmap_.reset(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit)); } return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 1302f9d642..90e09c0523 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -99,7 +99,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, } auto unreported_list = std::move(present->second); // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` - std::unique_ptr merged_metrics(new AttributesHashMap); + std::unique_ptr merged_metrics(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit)); for (auto &agg_hashmap : unreported_list) { agg_hashmap->GetAllEnteries( diff --git a/sdk/test/metrics/cardinality_limit_test.cc b/sdk/test/metrics/cardinality_limit_test.cc index c785eeb5d5..8e07e4ceac 100644 --- a/sdk/test/metrics/cardinality_limit_test.cc +++ b/sdk/test/metrics/cardinality_limit_test.cc @@ -121,6 +121,8 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati const size_t attributes_limit = 10; InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; + + AggregationConfig aggConfig(attributes_limit); std::unique_ptr default_attributes_processor{ new DefaultAttributesProcessor{}}; SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(), @@ -128,7 +130,7 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(), #endif - nullptr, attributes_limit); + &aggConfig); int64_t record_value = 100; // add 9 unique metric points, and 6 more above limit. diff --git a/sdk/test/metrics/sum_aggregation_test.cc b/sdk/test/metrics/sum_aggregation_test.cc index 72783f6492..7980819e0a 100644 --- a/sdk/test/metrics/sum_aggregation_test.cc +++ b/sdk/test/metrics/sum_aggregation_test.cc @@ -30,6 +30,7 @@ #include "opentelemetry/sdk/metrics/view/instrument_selector.h" #include "opentelemetry/sdk/metrics/view/meter_selector.h" #include "opentelemetry/sdk/metrics/view/view.h" +#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" #if OPENTELEMETRY_HAVE_WORKING_REGEX @@ -144,6 +145,78 @@ TEST(HistogramToSumFilterAttributes, Double) }); } +TEST(HistogramToSumFilterAttributesWithCardinaityLimit, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + std::string instrument_unit = "ms"; + std::string instrument_name = "historgram1"; + std::string instrument_desc = "histogram metrics"; + size_t cardinality_limit = 10000; + + std::unordered_map allowedattr; + allowedattr["attr1"] = true; + std::unique_ptr attrproc{ + new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)}; + + std::shared_ptr dummy_aggregation_config{ + new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)}; + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", instrument_unit, + AggregationType::kSum, dummy_aggregation_config, + std::move(attrproc))}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kHistogram, instrument_name, instrument_unit)}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleHistogram(instrument_name, instrument_desc, instrument_unit); + size_t total_metrics_times = 5; + + size_t agg_repeat_count = 5; + for (size_t repeat = 0; repeat < agg_repeat_count; repeat++) + { + + for (size_t times = 0; times < total_metrics_times; times++) + { + for (size_t i = 0; i < 2 * cardinality_limit; i++) + { + std::unordered_map attr = {{"attr1", std::to_string(i)}, {"attr2", "val2"}}; + h->Record(1, attr, opentelemetry::context::Context{}); + } + } + + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + // Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality. + EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); + EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); + for (size_t i = 0; i < md.point_data_attr_.size(); i++) + { + EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); + if (md.point_data_attr_[i].attributes.end() != md.point_data_attr_[i].attributes.find("attr1")) + { + EXPECT_EQ(total_metrics_times * (repeat + 1), opentelemetry::nostd::get(opentelemetry::nostd::get( + md.point_data_attr_[i].point_data) + .value_)); + } else { + EXPECT_NE(md.point_data_attr_[i].attributes.end(), + md.point_data_attr_[i].attributes.find(sdk::metrics::kAttributesLimitOverflowKey)); + } + } + } + } + return true; + }); + } +} + TEST(CounterToSum, Double) { MeterProvider mp; @@ -247,6 +320,79 @@ TEST(CounterToSumFilterAttributes, Double) }); } +TEST(CounterToSumFilterAttributesWithCardinalityLimit, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + std::string instrument_unit = "ms"; + std::string instrument_name = "counter1"; + std::string instrument_desc = "counter metrics"; + size_t cardinality_limit = 10000; + + std::unordered_map allowedattr; + allowedattr["attr1"] = true; + std::unique_ptr attrproc{ + new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)}; + + std::shared_ptr dummy_aggregation_config{ + new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)}; + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", instrument_unit, + AggregationType::kSum, dummy_aggregation_config, + std::move(attrproc))}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kCounter, instrument_name, instrument_unit)}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto c = m->CreateDoubleCounter(instrument_name, instrument_desc, instrument_unit); + + size_t agg_repeat_count = 5; + for (size_t repeat = 0; repeat < agg_repeat_count; repeat++) + { + size_t total_metrics_times = 5; + + for (size_t times = 0; times < total_metrics_times; times++) + { + for (size_t i = 0; i < 2 * cardinality_limit; i++) + { + std::unordered_map attr = {{"attr1", std::to_string(i)}, {"attr2", "val2"}}; + c->Add(1, attr, opentelemetry::context::Context{}); + } + } + + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + // Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality. + EXPECT_GE(cardinality_limit, md.point_data_attr_.size()); + EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size()); + for (size_t i = 0; i < md.point_data_attr_.size(); i++) + { + EXPECT_EQ(1, md.point_data_attr_[i].attributes.size()); + if (md.point_data_attr_[i].attributes.find("attr1") != md.point_data_attr_[i].attributes.end()) + { + EXPECT_EQ(total_metrics_times * (repeat + 1), opentelemetry::nostd::get(opentelemetry::nostd::get( + md.point_data_attr_[i].point_data) + .value_)); + } else { + EXPECT_NE(md.point_data_attr_[i].attributes.end(), + md.point_data_attr_[i].attributes.find(sdk::metrics::kAttributesLimitOverflowKey)); + } + } + } + } + return true; + }); + } +} + + class UpDownCounterToSumFixture : public ::testing::TestWithParam {};