Skip to content

Commit 4193b21

Browse files
authored
Implementing configurable aggregation cardinality limit (open-telemetry#3624)
1 parent a025766 commit 4193b21

File tree

11 files changed

+222
-16
lines changed

11 files changed

+222
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ Increment the:
3333
* [BUILD] Use -dev versions in main branch
3434
[#3609](https://github.com/open-telemetry/opentelemetry-cpp/pull/3609)
3535

36+
* [SDK] Implementing configurable aggregation cardinality limit
37+
[#3624](https://github.com/open-telemetry/opentelemetry-cpp/pull/3624)
38+
3639
Important changes:
3740

3841
* [CMAKE] Upgrade CMake minimum version to 3.16

ci/do_ci.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ elif [[ "$1" == "bazel.tsan" ]]; then
558558
exit 0
559559
elif [[ "$1" == "bazel.valgrind" ]]; then
560560
bazel $BAZEL_STARTUP_OPTIONS build $BAZEL_OPTIONS_ASYNC //...
561-
bazel $BAZEL_STARTUP_OPTIONS test --run_under="/usr/bin/valgrind --leak-check=full --error-exitcode=1 --errors-for-leak-kinds=definite --suppressions=\"${SRC_DIR}/ci/valgrind-suppressions\"" $BAZEL_TEST_OPTIONS_ASYNC //...
561+
bazel $BAZEL_STARTUP_OPTIONS test --test_timeout=600 --run_under="/usr/bin/valgrind --leak-check=full --error-exitcode=1 --errors-for-leak-kinds=definite --suppressions=\"${SRC_DIR}/ci/valgrind-suppressions\"" $BAZEL_TEST_OPTIONS_ASYNC //...
562562
exit 0
563563
elif [[ "$1" == "bazel.e2e" ]]; then
564564
cd examples/e2e

sdk/include/opentelemetry/sdk/metrics/aggregation/aggregation_config.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <vector>
77

8+
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
89
#include "opentelemetry/version.h"
910

1011
OPENTELEMETRY_BEGIN_NAMESPACE
@@ -15,6 +16,21 @@ namespace metrics
1516
class AggregationConfig
1617
{
1718
public:
19+
AggregationConfig(size_t cardinality_limit = kAggregationCardinalityLimit)
20+
: cardinality_limit_(cardinality_limit)
21+
{}
22+
23+
static const AggregationConfig *GetOrDefault(const AggregationConfig *config)
24+
{
25+
if (config)
26+
{
27+
return config;
28+
}
29+
static const AggregationConfig default_config{};
30+
return &default_config;
31+
}
32+
33+
size_t cardinality_limit_;
1834
virtual ~AggregationConfig() = default;
1935
};
2036

sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include "opentelemetry/nostd/shared_ptr.h"
1111
#include "opentelemetry/sdk/common/attributemap_hash.h"
12+
#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h"
1213
#include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
1314

1415
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
@@ -42,8 +43,11 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
4243
const AggregationConfig *aggregation_config)
4344
: instrument_descriptor_(instrument_descriptor),
4445
aggregation_type_{aggregation_type},
45-
cumulative_hash_map_(new AttributesHashMap()),
46-
delta_hash_map_(new AttributesHashMap()),
46+
aggregation_config_{AggregationConfig::GetOrDefault(aggregation_config)},
47+
cumulative_hash_map_(
48+
std::make_unique<AttributesHashMap>(aggregation_config_->cardinality_limit_)),
49+
delta_hash_map_(
50+
std::make_unique<AttributesHashMap>(aggregation_config_->cardinality_limit_)),
4751
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
4852
exemplar_filter_type_(exempler_filter_type),
4953
exemplar_reservoir_(exemplar_reservoir),
@@ -124,7 +128,8 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
124128
{
125129
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
126130
delta_metrics = std::move(delta_hash_map_);
127-
delta_hash_map_.reset(new AttributesHashMap);
131+
delta_hash_map_ =
132+
std::make_unique<AttributesHashMap>(aggregation_config_->cardinality_limit_);
128133
}
129134

130135
auto status =
@@ -136,6 +141,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
136141
private:
137142
InstrumentDescriptor instrument_descriptor_;
138143
AggregationType aggregation_type_;
144+
const AggregationConfig *aggregation_config_;
139145
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
140146
std::unique_ptr<AttributesHashMap> delta_hash_map_;
141147
opentelemetry::common::SpinLockMutex hashmap_lock_;

sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,13 @@ class AttributesHashMapWithCustomHash
4949
public:
5050
AttributesHashMapWithCustomHash(size_t attributes_limit = kAggregationCardinalityLimit)
5151
: attributes_limit_(attributes_limit)
52-
{}
52+
{
53+
if (attributes_limit_ > kAggregationCardinalityLimit)
54+
{
55+
hash_map_.reserve(attributes_limit_);
56+
}
57+
}
58+
5359
Aggregation *Get(const MetricAttributes &attributes) const
5460
{
5561
auto it = hash_map_.find(attributes);

sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "opentelemetry/nostd/string_view.h"
1919
#include "opentelemetry/sdk/common/attributemap_hash.h"
2020
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
21+
#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h"
2122
#include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
2223
#include "opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h"
2324
#include "opentelemetry/sdk/metrics/data/metric_data.h"
@@ -63,10 +64,11 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
6364
ExemplarFilterType exempler_filter_type,
6465
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
6566
#endif
66-
const AggregationConfig *aggregation_config,
67-
size_t attributes_limit = kAggregationCardinalityLimit)
67+
const AggregationConfig *aggregation_config)
6868
: instrument_descriptor_(instrument_descriptor),
69-
attributes_hashmap_(new AttributesHashMap(attributes_limit)),
69+
aggregation_config_(AggregationConfig::GetOrDefault(aggregation_config)),
70+
attributes_hashmap_(
71+
std::make_unique<AttributesHashMap>(aggregation_config_->cardinality_limit_)),
7072
attributes_processor_(std::move(attributes_processor)),
7173
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
7274
exemplar_filter_type_(exempler_filter_type),
@@ -173,6 +175,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
173175
private:
174176
InstrumentDescriptor instrument_descriptor_;
175177
// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
178+
const AggregationConfig *aggregation_config_;
176179
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
177180
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
178181
std::shared_ptr<const AttributesProcessor> attributes_processor_;

sdk/src/metrics/state/sync_metric_storage.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "opentelemetry/common/timestamp.h"
1010
#include "opentelemetry/nostd/function_ref.h"
1111
#include "opentelemetry/nostd/span.h"
12+
#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h"
1213
#include "opentelemetry/sdk/metrics/data/metric_data.h"
1314
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
1415
#include "opentelemetry/sdk/metrics/state/metric_collector.h"
@@ -35,7 +36,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector,
3536
{
3637
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
3738
delta_metrics = std::move(attributes_hashmap_);
38-
attributes_hashmap_.reset(new AttributesHashMap);
39+
attributes_hashmap_.reset(new AttributesHashMap(aggregation_config_->cardinality_limit_));
3940
}
4041

4142
return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,

sdk/src/metrics/state/temporal_metric_storage.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
9797
}
9898
auto unreported_list = std::move(present->second);
9999
// Iterate over the unreporter metrics for `collector` and store result in `merged_metrics`
100-
std::unique_ptr<AttributesHashMap> merged_metrics(new AttributesHashMap);
100+
std::unique_ptr<AttributesHashMap> merged_metrics(
101+
new AttributesHashMap(aggregation_config_ ? aggregation_config_->cardinality_limit_
102+
: kAggregationCardinalityLimit));
101103
for (auto &agg_hashmap : unreported_list)
102104
{
103105
agg_hashmap->GetAllEnteries(

sdk/test/metrics/async_metric_storage_test.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
2626
#include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h"
2727
#include "opentelemetry/sdk/metrics/state/metric_collector.h"
28-
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"
2928

3029
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
30+
# include "opentelemetry/sdk/metrics/data/exemplar_data.h"
3131
# include "opentelemetry/sdk/metrics/exemplar/filter_type.h"
3232
# include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
33+
#else
34+
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"
3335
#endif
3436

3537
using namespace opentelemetry::sdk::metrics;
@@ -192,8 +194,10 @@ TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation)
192194
}
193195
return true;
194196
});
195-
// subsequent recording after collection shouldn't fail
196-
// monotonic increasing values;
197+
// Note: When the cardinality limit is set to n, the attributes hashmap emits n-1 distinct
198+
// attribute sets, plus an overflow bucket for additional attributes. The test logic below is made
199+
// generic to succeed for either n or n-1 total cardinality. If this behavior is unexpected,
200+
// please investigate and file an issue.
197201
int64_t get_count2 = -50;
198202
int64_t put_count2 = -70;
199203

sdk/test/metrics/cardinality_limit_test.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "opentelemetry/nostd/span.h"
2121
#include "opentelemetry/nostd/variant.h"
2222
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
23+
#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h"
2324
#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h"
2425
#include "opentelemetry/sdk/metrics/data/metric_data.h"
2526
#include "opentelemetry/sdk/metrics/data/point_data.h"
@@ -110,14 +111,15 @@ TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregati
110111
const size_t attributes_limit = 10;
111112
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
112113
InstrumentValueType::kLong};
114+
AggregationConfig aggConfig(attributes_limit);
113115
std::shared_ptr<DefaultAttributesProcessor> default_attributes_processor{
114116
new DefaultAttributesProcessor{}};
115117
SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor,
116118
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
117119
ExemplarFilterType::kAlwaysOff,
118120
ExemplarReservoir::GetNoExemplarReservoir(),
119121
#endif
120-
nullptr, attributes_limit);
122+
&aggConfig);
121123

122124
int64_t record_value = 100;
123125
// add 9 unique metric points, and 6 more above limit.

0 commit comments

Comments
 (0)