Skip to content

Implementing configurable aggregation cardinality limit and collector emitting all views instead of last view #3299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include "opentelemetry/version.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand All @@ -15,6 +16,8 @@ namespace metrics
class AggregationConfig
{
public:
AggregationConfig(size_t cardinality_limit = kAggregationCardinalityLimit) : cardinality_limit_(cardinality_limit) {}
size_t cardinality_limit_;
virtual ~AggregationConfig() = default;
};

Expand Down
2 changes: 1 addition & 1 deletion sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Meter final : public opentelemetry::metrics::Meter
// meter-context.
std::unique_ptr<sdk::instrumentationscope::InstrumentationScope> scope_;
std::weak_ptr<sdk::metrics::MeterContext> meter_context_;
// Mapping between instrument-name and Aggregation Storage.
// Mapping between view-name and Aggregation Storage.
std::unordered_map<std::string, std::shared_ptr<MetricStorage>> storage_registry_;
std::shared_ptr<ObservableRegistry> observable_registry_;
MeterConfig meter_config_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
cumulative_hash_map_(new AttributesHashMap()),
delta_hash_map_(new AttributesHashMap()),
aggregation_config_{aggregation_config},
cumulative_hash_map_(new AttributesHashMap(aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)),
delta_hash_map_(new AttributesHashMap(aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_filter_type_(exempler_filter_type),
exemplar_reservoir_(exemplar_reservoir),
Expand Down Expand Up @@ -126,7 +127,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
delta_metrics = std::move(delta_hash_map_);
delta_hash_map_.reset(new AttributesHashMap);
delta_hash_map_.reset(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit));
}

auto status =
Expand All @@ -138,6 +139,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
const AggregationConfig *aggregation_config_;
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
opentelemetry::common::SpinLockMutex hashmap_lock_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ class AttributesHashMap
public:
AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit)
: attributes_limit_(attributes_limit)
{}
{
if (attributes_limit_ > kAggregationCardinalityLimit)
{
hash_map_.reserve(attributes_limit_);
}
}

Aggregation *Get(size_t hash) const
{
auto it = hash_map_.find(hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
ExemplarFilterType exempler_filter_type,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
#endif
const AggregationConfig *aggregation_config,
size_t attributes_limit = kAggregationCardinalityLimit)
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
attributes_hashmap_(new AttributesHashMap(attributes_limit)),
aggregation_config_(aggregation_config),
attributes_hashmap_(new AttributesHashMap(
aggregation_config ? aggregation_config->cardinality_limit_ : kAggregationCardinalityLimit)),
attributes_processor_(attributes_processor),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_filter_type_(exempler_filter_type),
Expand Down Expand Up @@ -195,6 +196,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
private:
InstrumentDescriptor instrument_descriptor_;
// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
const AggregationConfig *aggregation_config_;
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
const AttributesProcessor *attributes_processor_;
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
instrument_descriptor),
#endif
view.GetAggregationConfig()));
storage_registry_[instrument_descriptor.name_] = storage;
storage_registry_[view_instr_desc.name_] = storage;
multi_storage->AddStorage(storage);
return true;
});
Expand Down Expand Up @@ -554,7 +554,7 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
instrument_descriptor),
#endif
view.GetAggregationConfig()));
storage_registry_[instrument_descriptor.name_] = storage;
storage_registry_[view_instr_desc.name_] = storage;
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
return true;
});
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/metrics/state/sync_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector,
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
delta_metrics = std::move(attributes_hashmap_);
attributes_hashmap_.reset(new AttributesHashMap);
attributes_hashmap_.reset(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit));
}

return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
}
auto unreported_list = std::move(present->second);
// Iterate over the unreporter metrics for `collector` and store result in `merged_metrics`
std::unique_ptr<AttributesHashMap> merged_metrics(new AttributesHashMap);
std::unique_ptr<AttributesHashMap> merged_metrics(new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_ : kAggregationCardinalityLimit));
for (auto &agg_hashmap : unreported_list)
{
agg_hashmap->GetAllEnteries(
Expand Down
4 changes: 3 additions & 1 deletion sdk/test/metrics/cardinality_limit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,16 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati
const size_t attributes_limit = 10;
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};

AggregationConfig aggConfig(attributes_limit);
std::unique_ptr<DefaultAttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
ExemplarFilterType::kAlwaysOff,
ExemplarReservoir::GetNoExemplarReservoir(),
#endif
nullptr, attributes_limit);
&aggConfig);

int64_t record_value = 100;
// add 9 unique metric points, and 6 more above limit.
Expand Down
146 changes: 146 additions & 0 deletions sdk/test/metrics/sum_aggregation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "opentelemetry/sdk/metrics/view/instrument_selector.h"
#include "opentelemetry/sdk/metrics/view/meter_selector.h"
#include "opentelemetry/sdk/metrics/view/view.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"

#if OPENTELEMETRY_HAVE_WORKING_REGEX

Expand Down Expand Up @@ -144,6 +145,78 @@ TEST(HistogramToSumFilterAttributes, Double)
});
}

TEST(HistogramToSumFilterAttributesWithCardinaityLimit, Double)
{
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");
std::string instrument_unit = "ms";
std::string instrument_name = "historgram1";
std::string instrument_desc = "histogram metrics";
size_t cardinality_limit = 10000;

std::unordered_map<std::string, bool> allowedattr;
allowedattr["attr1"] = true;
std::unique_ptr<opentelemetry::sdk::metrics::AttributesProcessor> attrproc{
new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)};

std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig> dummy_aggregation_config{
new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)};
std::unique_ptr<MockMetricExporter> exporter(new MockMetricExporter());
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
mp.AddMetricReader(reader);

std::unique_ptr<View> view{new View("view1", "view1_description", instrument_unit,
AggregationType::kSum, dummy_aggregation_config,
std::move(attrproc))};
std::unique_ptr<InstrumentSelector> instrument_selector{
new InstrumentSelector(InstrumentType::kHistogram, instrument_name, instrument_unit)};
std::unique_ptr<MeterSelector> meter_selector{new MeterSelector("meter1", "version1", "schema1")};
mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view));

auto h = m->CreateDoubleHistogram(instrument_name, instrument_desc, instrument_unit);
size_t total_metrics_times = 5;

size_t agg_repeat_count = 5;
for (size_t repeat = 0; repeat < agg_repeat_count; repeat++)
{

for (size_t times = 0; times < total_metrics_times; times++)
{
for (size_t i = 0; i < 2 * cardinality_limit; i++)
{
std::unordered_map<std::string, std::string> attr = {{"attr1", std::to_string(i)}, {"attr2", "val2"}};
h->Record(1, attr, opentelemetry::context::Context{});
}
}

reader->Collect([&](ResourceMetrics &rm) {
for (const ScopeMetrics &smd : rm.scope_metric_data_)
{
for (const MetricData &md : smd.metric_data_)
{
// Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality.
EXPECT_GE(cardinality_limit, md.point_data_attr_.size());
EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size());
for (size_t i = 0; i < md.point_data_attr_.size(); i++)
{
EXPECT_EQ(1, md.point_data_attr_[i].attributes.size());
if (md.point_data_attr_[i].attributes.end() != md.point_data_attr_[i].attributes.find("attr1"))
{
EXPECT_EQ(total_metrics_times * (repeat + 1), opentelemetry::nostd::get<double>(opentelemetry::nostd::get<SumPointData>(
md.point_data_attr_[i].point_data)
.value_));
} else {
EXPECT_NE(md.point_data_attr_[i].attributes.end(),
md.point_data_attr_[i].attributes.find(sdk::metrics::kAttributesLimitOverflowKey));
}
}
}
}
return true;
});
}
}

TEST(CounterToSum, Double)
{
MeterProvider mp;
Expand Down Expand Up @@ -247,6 +320,79 @@ TEST(CounterToSumFilterAttributes, Double)
});
}

TEST(CounterToSumFilterAttributesWithCardinalityLimit, Double)
{
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");
std::string instrument_unit = "ms";
std::string instrument_name = "counter1";
std::string instrument_desc = "counter metrics";
size_t cardinality_limit = 10000;

std::unordered_map<std::string, bool> allowedattr;
allowedattr["attr1"] = true;
std::unique_ptr<opentelemetry::sdk::metrics::AttributesProcessor> attrproc{
new opentelemetry::sdk::metrics::FilteringAttributesProcessor(allowedattr)};

std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig> dummy_aggregation_config{
new opentelemetry::sdk::metrics::AggregationConfig(cardinality_limit)};
std::unique_ptr<MockMetricExporter> exporter(new MockMetricExporter());
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
mp.AddMetricReader(reader);

std::unique_ptr<View> view{new View("view1", "view1_description", instrument_unit,
AggregationType::kSum, dummy_aggregation_config,
std::move(attrproc))};
std::unique_ptr<InstrumentSelector> instrument_selector{
new InstrumentSelector(InstrumentType::kCounter, instrument_name, instrument_unit)};
std::unique_ptr<MeterSelector> meter_selector{new MeterSelector("meter1", "version1", "schema1")};
mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view));

auto c = m->CreateDoubleCounter(instrument_name, instrument_desc, instrument_unit);

size_t agg_repeat_count = 5;
for (size_t repeat = 0; repeat < agg_repeat_count; repeat++)
{
size_t total_metrics_times = 5;

for (size_t times = 0; times < total_metrics_times; times++)
{
for (size_t i = 0; i < 2 * cardinality_limit; i++)
{
std::unordered_map<std::string, std::string> attr = {{"attr1", std::to_string(i)}, {"attr2", "val2"}};
c->Add(1, attr, opentelemetry::context::Context{});
}
}

reader->Collect([&](ResourceMetrics &rm) {
for (const ScopeMetrics &smd : rm.scope_metric_data_)
{
for (const MetricData &md : smd.metric_data_)
{
// Something weird about attributes hashmap. If cardinality is setup to n, it emits n-1 including overflow. Just making the logic generic here to succeed for n or n-1 total cardinality.
EXPECT_GE(cardinality_limit, md.point_data_attr_.size());
EXPECT_LT(cardinality_limit / 2, md.point_data_attr_.size());
for (size_t i = 0; i < md.point_data_attr_.size(); i++)
{
EXPECT_EQ(1, md.point_data_attr_[i].attributes.size());
if (md.point_data_attr_[i].attributes.find("attr1") != md.point_data_attr_[i].attributes.end())
{
EXPECT_EQ(total_metrics_times * (repeat + 1), opentelemetry::nostd::get<double>(opentelemetry::nostd::get<SumPointData>(
md.point_data_attr_[i].point_data)
.value_));
} else {
EXPECT_NE(md.point_data_attr_[i].attributes.end(),
md.point_data_attr_[i].attributes.find(sdk::metrics::kAttributesLimitOverflowKey));
}
}
}
}
return true;
});
}
}


class UpDownCounterToSumFixture : public ::testing::TestWithParam<bool>
{};

Expand Down
Loading