From 2bb1d5ed233d2d4cdc72baa031b6f7a45a85783a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 21 Jul 2025 12:27:41 -0700 Subject: [PATCH 1/2] Add Bandpass Aggregation Proof-of-concept to allow views to filter measurements. --- sdk/metric/aggregation.go | 30 ++++++++++++++++++++++++++++++ sdk/metric/pipeline.go | 25 +++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/sdk/metric/aggregation.go b/sdk/metric/aggregation.go index e6f5cfb2ad9..23c21242afe 100644 --- a/sdk/metric/aggregation.go +++ b/sdk/metric/aggregation.go @@ -187,3 +187,33 @@ func (e AggregationBase2ExponentialHistogram) err() error { } return nil } + +// AggregationBandpass is an [Aggregation] that only records measurements to +// its Downstream Aggregation if they fall within the specified High and Low +// bounds. Measurements outside of these bounds are dropped. +type AggregationBandpass struct { + High, Low float64 + Downstream Aggregation +} + +var _ Aggregation = AggregationBandpass{} + +// copy returns a deep copy of d. +func (d AggregationBandpass) copy() Aggregation { + return AggregationBandpass{ + High: d.High, + Low: d.Low, + Downstream: d.Downstream.copy(), + } +} + +// err returns an error for any misconfiguration. +func (d AggregationBandpass) err() error { + if d.High <= d.Low { + return fmt.Errorf("%w: high bound %f is less than or equal to low bound %f", errAgg, d.High, d.Low) + } + if d.Downstream == nil { + return fmt.Errorf("%w: downstream aggregation is nil", errAgg) + } + return nil +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 7bdb699cae0..c31b254d514 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -11,6 +11,7 @@ import ( "sync" "sync/atomic" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -524,6 +525,20 @@ func (i *inserter[N]) aggregateFunc( noSum = true } meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum) + case AggregationBandpass: + d := a.Downstream + if d == nil { + // This is an error if the downstream is nil, this is a + // misconfiguration. However, "fail gracefully" and return nil in + // and out to signify the drop aggregator. + err = fmt.Errorf( + "%w: bandpass aggregation requires a downstream aggregator", + errAgg, + ) + } else { + meas, comp, err = i.aggregateFunc(b, d, kind) + meas = bandpass(meas, a.High, a.Low) + } default: err = errUnknownAggregation @@ -532,6 +547,16 @@ func (i *inserter[N]) aggregateFunc( return meas, comp, err } +func bandpass[N int64 | float64](m aggregate.Measure[N], high, low float64) aggregate.Measure[N] { + return func(ctx context.Context, n N, s attribute.Set) { + if n > N(high) || n < N(low) { + // Drop the measurement if it is outside of the bandpass. + return + } + m(ctx, n, s) + } +} + // isAggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: // From 078a1ff3fc03e47fecb2947b4b975198c70b4a09 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 21 Jul 2025 12:33:51 -0700 Subject: [PATCH 2/2] Add example --- sdk/metric/example_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index 234bfd65945..006673e8b90 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -219,6 +219,30 @@ func ExampleNewView_attributeFilter() { ) } +func ExampleNewView_filterMeasurements() { + // Create a view that restricts values recorded by the "slo" instrument in + // the "http" instrumentation library to be within the range of 0 to 100. + view := metric.NewView( + metric.Instrument{ + Name: "slo", + Scope: instrumentation.Scope{Name: "http"}, + }, + metric.Stream{ + Aggregation: metric.AggregationBandpass{ + High: 100, + Low: 0, + Downstream: metric.AggregationDefault{}, + }, + }, + ) + + // The created view can then be registered with the OpenTelemetry metric + // SDK using the WithView option. + _ = metric.NewMeterProvider( + metric.WithView(view), + ) +} + func ExampleNewView_exponentialHistogram() { // Create a view that makes the "latency" instrument from the "http" // instrumentation library to be reported as an exponential histogram.