Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions sdk/metric/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,33 @@ func (e AggregationBase2ExponentialHistogram) err() error {
}
return nil
}

// AggregationBandpass is an [Aggregation] that only records measurements to
// its Downstream Aggregation if they fall within the specified High and Low
// bounds. Measurements outside of these bounds are dropped.
type AggregationBandpass struct {
High, Low float64
Downstream Aggregation
}

var _ Aggregation = AggregationBandpass{}

// copy returns a deep copy of d.
func (d AggregationBandpass) copy() Aggregation {
return AggregationBandpass{
High: d.High,
Low: d.Low,
Downstream: d.Downstream.copy(),
}
}

// err returns an error for any misconfiguration.
func (d AggregationBandpass) err() error {
if d.High <= d.Low {
return fmt.Errorf("%w: high bound %f is less than or equal to low bound %f", errAgg, d.High, d.Low)
}
if d.Downstream == nil {
return fmt.Errorf("%w: downstream aggregation is nil", errAgg)
}
return nil
}
24 changes: 24 additions & 0 deletions sdk/metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,30 @@ func ExampleNewView_attributeFilter() {
)
}

func ExampleNewView_filterMeasurements() {
// Create a view that restricts values recorded by the "slo" instrument in
// the "http" instrumentation library to be within the range of 0 to 100.
view := metric.NewView(
metric.Instrument{
Name: "slo",
Scope: instrumentation.Scope{Name: "http"},
},
metric.Stream{
Aggregation: metric.AggregationBandpass{
High: 100,
Low: 0,
Downstream: metric.AggregationDefault{},
},
},
)

// The created view can then be registered with the OpenTelemetry metric
// SDK using the WithView option.
_ = metric.NewMeterProvider(
metric.WithView(view),
)
}

func ExampleNewView_exponentialHistogram() {
// Create a view that makes the "latency" instrument from the "http"
// instrumentation library to be reported as an exponential histogram.
Expand Down
25 changes: 25 additions & 0 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
Expand Down Expand Up @@ -524,6 +525,20 @@ func (i *inserter[N]) aggregateFunc(
noSum = true
}
meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum)
case AggregationBandpass:
d := a.Downstream
if d == nil {
// This is an error if the downstream is nil, this is a
// misconfiguration. However, "fail gracefully" and return nil in
// and out to signify the drop aggregator.
err = fmt.Errorf(
"%w: bandpass aggregation requires a downstream aggregator",
errAgg,
)
} else {
meas, comp, err = i.aggregateFunc(b, d, kind)
meas = bandpass(meas, a.High, a.Low)
}

default:
err = errUnknownAggregation
Expand All @@ -532,6 +547,16 @@ func (i *inserter[N]) aggregateFunc(
return meas, comp, err
}

func bandpass[N int64 | float64](m aggregate.Measure[N], high, low float64) aggregate.Measure[N] {
return func(ctx context.Context, n N, s attribute.Set) {
if n > N(high) || n < N(low) {
// Drop the measurement if it is outside of the bandpass.
return
}
m(ctx, n, s)
}
}

// isAggregatorCompatible checks if the aggregation can be used by the instrument.
// Current compatibility:
//
Expand Down
Loading