Skip to content

Commit 97bf581

Browse files
committed
feat: Add ability to specify cardinality limit via Instrument advice
1 parent 2564a71 commit 97bf581

File tree

6 files changed

+161
-4
lines changed

6 files changed

+161
-4
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ also modified to suppress telemetry before invoking exporters.
1818
- This feature was previously removed in version 0.28 due to the lack of
1919
configurability but has now been reintroduced with the ability to configure
2020
the limit.
21-
- TODO/Placeholder: Add ability to configure cardinality limits via Instrument
21+
- There is ability to configure cardinality limits via Instrument
2222
advisory.
2323

2424
## 0.29.0

opentelemetry-sdk/src/metrics/meter.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl SdkMeter {
9696
builder.description,
9797
builder.unit,
9898
None,
99+
builder.cardinality_limit,
99100
)
100101
.map(|i| Counter::new(Arc::new(i)))
101102
{
@@ -138,6 +139,7 @@ impl SdkMeter {
138139
builder.description,
139140
builder.unit,
140141
None,
142+
builder.cardinality_limit,
141143
) {
142144
Ok(ms) => {
143145
if ms.is_empty() {
@@ -197,6 +199,7 @@ impl SdkMeter {
197199
builder.description,
198200
builder.unit,
199201
None,
202+
builder.cardinality_limit,
200203
) {
201204
Ok(ms) => {
202205
if ms.is_empty() {
@@ -256,6 +259,7 @@ impl SdkMeter {
256259
builder.description,
257260
builder.unit,
258261
None,
262+
builder.cardinality_limit,
259263
) {
260264
Ok(ms) => {
261265
if ms.is_empty() {
@@ -317,6 +321,7 @@ impl SdkMeter {
317321
builder.description,
318322
builder.unit,
319323
None,
324+
builder.cardinality_limit,
320325
)
321326
.map(|i| UpDownCounter::new(Arc::new(i)))
322327
{
@@ -361,6 +366,7 @@ impl SdkMeter {
361366
builder.description,
362367
builder.unit,
363368
None,
369+
builder.cardinality_limit,
364370
)
365371
.map(|i| Gauge::new(Arc::new(i)))
366372
{
@@ -422,6 +428,7 @@ impl SdkMeter {
422428
builder.description,
423429
builder.unit,
424430
builder.boundaries,
431+
builder.cardinality_limit,
425432
)
426433
.map(|i| Histogram::new(Arc::new(i)))
427434
{
@@ -654,8 +661,10 @@ where
654661
description: Option<Cow<'static, str>>,
655662
unit: Option<Cow<'static, str>>,
656663
boundaries: Option<Vec<f64>>,
664+
cardinality_limit: Option<usize>,
657665
) -> MetricResult<ResolvedMeasures<T>> {
658-
let aggregators = self.measures(kind, name, description, unit, boundaries)?;
666+
let aggregators =
667+
self.measures(kind, name, description, unit, boundaries, cardinality_limit)?;
659668
Ok(ResolvedMeasures {
660669
measures: aggregators,
661670
})
@@ -668,6 +677,7 @@ where
668677
description: Option<Cow<'static, str>>,
669678
unit: Option<Cow<'static, str>>,
670679
boundaries: Option<Vec<f64>>,
680+
cardinality_limit: Option<usize>,
671681
) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
672682
let inst = Instrument {
673683
name,
@@ -677,7 +687,7 @@ where
677687
scope: self.meter.scope.clone(),
678688
};
679689

680-
self.resolve.measures(inst, boundaries)
690+
self.resolve.measures(inst, boundaries, cardinality_limit)
681691
}
682692
}
683693

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,12 +370,14 @@ mod tests {
370370
async fn counter_aggregation_overflow_delta() {
371371
counter_aggregation_overflow_helper(Temporality::Delta);
372372
counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
373+
counter_aggregation_overflow_helper_custom_limit_via_advice(Temporality::Delta);
373374
}
374375

375376
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
376377
async fn counter_aggregation_overflow_cumulative() {
377378
counter_aggregation_overflow_helper(Temporality::Cumulative);
378379
counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
380+
counter_aggregation_overflow_helper_custom_limit_via_advice(Temporality::Cumulative);
379381
}
380382

381383
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -2568,6 +2570,105 @@ mod tests {
25682570
}
25692571
}
25702572

2573+
fn counter_aggregation_overflow_helper_custom_limit_via_advice(temporality: Temporality) {
2574+
// Arrange
2575+
let cardinality_limit = 2300;
2576+
let mut test_context = TestContext::new(temporality);
2577+
let meter = test_context.meter();
2578+
let counter = meter
2579+
.u64_counter("my_counter")
2580+
.with_cardinality_limit(cardinality_limit)
2581+
.build();
2582+
2583+
// Act
2584+
// Record measurements with A:0, A:1,.......A:cardinality_limit, which just fits in the cardinality_limit
2585+
for v in 0..cardinality_limit {
2586+
counter.add(100, &[KeyValue::new("A", v.to_string())]);
2587+
}
2588+
2589+
// Empty attributes is specially treated and does not count towards the limit.
2590+
counter.add(3, &[]);
2591+
counter.add(3, &[]);
2592+
2593+
// All of the below will now go into overflow.
2594+
counter.add(100, &[KeyValue::new("A", "foo")]);
2595+
counter.add(100, &[KeyValue::new("A", "another")]);
2596+
counter.add(100, &[KeyValue::new("A", "yet_another")]);
2597+
test_context.flush_metrics();
2598+
2599+
let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2600+
unreachable!()
2601+
};
2602+
2603+
// Expecting (cardinality_limit + 1 overflow + Empty attributes) data points.
2604+
assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2605+
2606+
let data_point =
2607+
find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
2608+
.expect("overflow point expected");
2609+
assert_eq!(data_point.value, 300);
2610+
2611+
// let empty_attrs_data_point = &sum.data_points[0];
2612+
let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2613+
.expect("Empty attributes point expected");
2614+
assert!(
2615+
empty_attrs_data_point.attributes.is_empty(),
2616+
"Non-empty attribute set"
2617+
);
2618+
assert_eq!(
2619+
empty_attrs_data_point.value, 6,
2620+
"Empty attributes value should be 3+3=6"
2621+
);
2622+
2623+
// Phase 2 - for delta temporality, after each collect, data points are cleared
2624+
// but for cumulative, they are not cleared.
2625+
test_context.reset_metrics();
2626+
// The following should be aggregated normally for Delta,
2627+
// and should go into overflow for Cumulative.
2628+
counter.add(100, &[KeyValue::new("A", "foo")]);
2629+
counter.add(100, &[KeyValue::new("A", "another")]);
2630+
counter.add(100, &[KeyValue::new("A", "yet_another")]);
2631+
test_context.flush_metrics();
2632+
2633+
let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2634+
unreachable!()
2635+
};
2636+
2637+
if temporality == Temporality::Delta {
2638+
assert_eq!(sum.data_points.len(), 3);
2639+
2640+
let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2641+
.expect("point expected");
2642+
assert_eq!(data_point.value, 100);
2643+
2644+
let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2645+
.expect("point expected");
2646+
assert_eq!(data_point.value, 100);
2647+
2648+
let data_point =
2649+
find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2650+
.expect("point expected");
2651+
assert_eq!(data_point.value, 100);
2652+
} else {
2653+
// For cumulative, overflow should still be there, and new points should not be added.
2654+
assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2655+
let data_point =
2656+
find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
2657+
.expect("overflow point expected");
2658+
assert_eq!(data_point.value, 600);
2659+
2660+
let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2661+
assert!(data_point.is_none(), "point should not be present");
2662+
2663+
let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2664+
assert!(data_point.is_none(), "point should not be present");
2665+
2666+
let data_point =
2667+
find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2668+
assert!(data_point.is_none(), "point should not be present");
2669+
}
2670+
}
2671+
25712672
fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
25722673
// Arrange
25732674
let mut test_context = TestContext::new(temporality);

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ where
251251
&self,
252252
inst: Instrument,
253253
boundaries: Option<&[f64]>,
254+
cardinality_limit: Option<usize>,
254255
) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
255256
let mut matched = false;
256257
let mut measures = vec![];
@@ -312,6 +313,10 @@ where
312313
});
313314
}
314315

316+
if let Some(cardinality_limit) = cardinality_limit {
317+
stream.cardinality_limit = Some(cardinality_limit);
318+
}
319+
315320
match self.cached_aggregator(&inst.scope, kind, stream) {
316321
Ok(agg) => {
317322
if errs.is_empty() {
@@ -723,11 +728,12 @@ where
723728
&self,
724729
id: Instrument,
725730
boundaries: Option<Vec<f64>>,
731+
cardinality_limit: Option<usize>,
726732
) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
727733
let (mut measures, mut errs) = (vec![], vec![]);
728734

729735
for inserter in &self.inserters {
730-
match inserter.instrument(id.clone(), boundaries.as_deref()) {
736+
match inserter.instrument(id.clone(), boundaries.as_deref(), cardinality_limit) {
731737
Ok(ms) => measures.extend(ms),
732738
Err(err) => errs.push(err),
733739
}

opentelemetry/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ disable telemetry generation during their internal operations, ensuring more
2424
predictable and efficient observability pipelines.
2525

2626
- re-export `tracing` for `internal-logs` feature to remove the need of adding `tracing` as a dependency
27+
- Added ability to configure cardinality limits via Instrument
28+
advisory.
2729

2830
## 0.29.1
2931

opentelemetry/src/metrics/instruments/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ pub struct HistogramBuilder<'a, T> {
4545
/// Unit of the Histogram.
4646
pub unit: Option<Cow<'static, str>>,
4747

48+
/// Cardinality limit for the Histogram.
49+
pub cardinality_limit: Option<usize>,
50+
4851
/// Bucket boundaries for the histogram.
4952
pub boundaries: Option<Vec<f64>>,
5053

@@ -60,6 +63,7 @@ impl<'a, T> HistogramBuilder<'a, T> {
6063
name,
6164
description: None,
6265
unit: None,
66+
cardinality_limit: None,
6367
boundaries: None,
6468
_marker: marker::PhantomData,
6569
}
@@ -83,6 +87,14 @@ impl<'a, T> HistogramBuilder<'a, T> {
8387
self
8488
}
8589

90+
/// Set the cardinality limit for this Histogram.
91+
/// Setting cardinality limit is optional. By default, the limit will be set
92+
/// to 2000.
93+
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
94+
self.cardinality_limit = Some(limit);
95+
self
96+
}
97+
8698
/// Set the boundaries for this histogram.
8799
///
88100
/// Setting boundaries is optional. By default, the boundaries are set to:
@@ -150,6 +162,9 @@ pub struct InstrumentBuilder<'a, T> {
150162
/// Unit of the instrument.
151163
pub unit: Option<Cow<'static, str>>,
152164

165+
/// Cardinality limit for the instrument.
166+
pub cardinality_limit: Option<usize>,
167+
153168
_marker: marker::PhantomData<T>,
154169
}
155170

@@ -161,6 +176,7 @@ impl<'a, T> InstrumentBuilder<'a, T> {
161176
name,
162177
description: None,
163178
unit: None,
179+
cardinality_limit: None,
164180
_marker: marker::PhantomData,
165181
}
166182
}
@@ -182,6 +198,14 @@ impl<'a, T> InstrumentBuilder<'a, T> {
182198
self.unit = Some(unit.into());
183199
self
184200
}
201+
202+
/// Set the cardinality limit for this instrument.
203+
/// Setting cardinality limit is optional. By default, the limit will be set
204+
/// to 2000.
205+
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
206+
self.cardinality_limit = Some(limit);
207+
self
208+
}
185209
}
186210

187211
macro_rules! build_instrument {
@@ -211,6 +235,7 @@ impl<T> fmt::Debug for InstrumentBuilder<'_, T> {
211235
.field("name", &self.name)
212236
.field("description", &self.description)
213237
.field("unit", &self.unit)
238+
.field("cardinality_limit", &self.cardinality_limit)
214239
.field("kind", &std::any::type_name::<T>())
215240
.finish()
216241
}
@@ -255,6 +280,9 @@ pub struct AsyncInstrumentBuilder<'a, I, M> {
255280
/// Unit of the instrument.
256281
pub unit: Option<Cow<'static, str>>,
257282

283+
/// Cardinality limit for the instrument.
284+
pub cardinality_limit: Option<usize>,
285+
258286
/// Callbacks to be called for this instrument.
259287
pub callbacks: Vec<Callback<M>>,
260288

@@ -269,6 +297,7 @@ impl<'a, I, M> AsyncInstrumentBuilder<'a, I, M> {
269297
name,
270298
description: None,
271299
unit: None,
300+
cardinality_limit: None,
272301
_inst: marker::PhantomData,
273302
callbacks: Vec::new(),
274303
}
@@ -292,6 +321,14 @@ impl<'a, I, M> AsyncInstrumentBuilder<'a, I, M> {
292321
self
293322
}
294323

324+
/// Set the cardinality limit for this instrument.
325+
/// Setting cardinality limit is optional. By default, the limit will be set
326+
/// to 2000.
327+
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
328+
self.cardinality_limit = Some(limit);
329+
self
330+
}
331+
295332
/// Set the callback to be called for this instrument.
296333
pub fn with_callback<F>(mut self, callback: F) -> Self
297334
where
@@ -340,6 +377,7 @@ where
340377
.field("name", &self.name)
341378
.field("description", &self.description)
342379
.field("unit", &self.unit)
380+
.field("cardinality_limit", &self.cardinality_limit)
343381
.field("kind", &std::any::type_name::<I>())
344382
.field("callbacks_len", &self.callbacks.len())
345383
.finish()

0 commit comments

Comments
 (0)