diff --git a/CHANGELOG.md b/CHANGELOG.md index f144fb02b30..73833bce84c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## main / unreleased +* [ENHANCEMENT] Use frontend MaxExemplars config as the single source of truth for exemplar limits. Add a safety cap at the traceql engine entry points. [#6515](https://github.com/grafana/tempo/pull/6515) (@zhxiaogg) * [CHANGE] Set default `max_result_limit` for search to 256*1024 [#6525](https://github.com/grafana/tempo/pull/6525) (@zhxiaogg) * [CHANGE] **BREAKING CHANGE** Remove Opencensus receiver [#6523](https://github.com/grafana/tempo/pull/6523) (@javiermolinar) * [CHANGE] Upgrade Tempo to Go 1.26.0 [#6443](https://github.com/grafana/tempo/pull/6443) (@stoewer) diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index f86f9e1ad82..ae0f6e79954 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -51,6 +51,12 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp return err } + // Normalize exemplars before combiner creation: 0 (unspecified) or above the configured + // maximum defaults to MaxExemplars. + if req.Exemplars == 0 || req.Exemplars > cfg.Metrics.Sharder.MaxExemplars { + req.Exemplars = cfg.Metrics.Sharder.MaxExemplars + } + traceql.AlignRequest(req) // the end time cutoff is applied here because it has to be done before combiner creation @@ -133,6 +139,12 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper return httpInvalidRequest(err), nil } + // Normalize exemplars before combiner creation: 0 (unspecified) or above the configured + // maximum defaults to MaxExemplars. 
+ if queryRangeReq.Exemplars == 0 || queryRangeReq.Exemplars > cfg.Metrics.Sharder.MaxExemplars { + queryRangeReq.Exemplars = cfg.Metrics.Sharder.MaxExemplars + } + traceql.AlignRequest(queryRangeReq) // the end time cutoff is applied here because it has to be done before combiner creation diff --git a/modules/frontend/metrics_query_range_handler_test.go b/modules/frontend/metrics_query_range_handler_test.go index eb1e17e504c..3d1f8d45f74 100644 --- a/modules/frontend/metrics_query_range_handler_test.go +++ b/modules/frontend/metrics_query_range_handler_test.go @@ -508,6 +508,133 @@ func TestQueryRangeHandlerWithEndCutoff(t *testing.T) { }) } +// TestQueryRangeHandlerExemplarNormalization verifies that exemplars from shard responses are +// kept in the final response when the client omits req.Exemplars (sends 0). Before the fix, +// the frontend combiner was created with req.Exemplars=0 which caused it to immediately discard +// all exemplars returned by backend shards. +func TestQueryRangeHandlerExemplarNormalization(t *testing.T) { + start := uint64(1100 * time.Second) + end := uint64(1200 * time.Second) + step := uint64(100 * time.Second) + // Exemplar timestamp in the middle of the query range (milliseconds) + exemplarTS := int64(1150 * 1000) + + mockResp := &tempopb.QueryRangeResponse{ + Metrics: &tempopb.SearchMetrics{InspectedTraces: 1, InspectedBytes: 1}, + Series: []*tempopb.TimeSeries{ + { + Labels: []v1.KeyValue{ + {Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}}, + }, + Samples: []tempopb.Sample{ + {TimestampMs: 1100_000, Value: 1}, + {TimestampMs: 1200_000, Value: 2}, + }, + Exemplars: []tempopb.Exemplar{ + {TimestampMs: exemplarTS, Value: 1.5}, + }, + }, + }, + } + + makeRequest := func(exemplars uint32) *http.Request { + httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil) + httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{ + Query: "{} | rate()", + Start: start, + End: 
end, + Step: step, + Exemplars: exemplars, + }, "") + ctx := user.InjectOrgID(httpReq.Context(), "foo") + return httpReq.WithContext(ctx) + } + + t.Run("client omits exemplars defaults to cfg.MaxExemplars", func(t *testing.T) { + f := frontendWithSettings(t, &mockRoundTripper{ + responseFn: func() proto.Message { return mockResp }, + }, nil, nil, nil, func(c *Config, _ *overrides.Config) { + c.Metrics.Sharder.Interval = time.Hour + c.Metrics.Sharder.MaxExemplars = 10 + }) + + httpResp := httptest.NewRecorder() + f.MetricsQueryRangeHandler.ServeHTTP(httpResp, makeRequest(0)) + require.Equal(t, 200, httpResp.Code) + + actualResp := &tempopb.QueryRangeResponse{} + require.NoError(t, jsonpb.Unmarshal(httpResp.Body, actualResp)) + + var total int + for _, s := range actualResp.Series { + total += len(s.Exemplars) + } + assert.Greater(t, total, 0, "exemplars should be kept when client omits exemplars and MaxExemplars > 0") + }) + + t.Run("exemplars disabled when cfg.MaxExemplars is zero", func(t *testing.T) { + f := frontendWithSettings(t, &mockRoundTripper{ + responseFn: func() proto.Message { return mockResp }, + }, nil, nil, nil, func(c *Config, _ *overrides.Config) { + c.Metrics.Sharder.Interval = time.Hour + c.Metrics.Sharder.MaxExemplars = 0 + }) + + httpResp := httptest.NewRecorder() + f.MetricsQueryRangeHandler.ServeHTTP(httpResp, makeRequest(0)) + require.Equal(t, 200, httpResp.Code) + + actualResp := &tempopb.QueryRangeResponse{} + require.NoError(t, jsonpb.Unmarshal(httpResp.Body, actualResp)) + + for _, s := range actualResp.Series { + assert.Empty(t, s.Exemplars, "exemplars should be empty when MaxExemplars is disabled") + } + }) + + t.Run("client requests exemplars but MaxExemplars is zero disables them", func(t *testing.T) { + f := frontendWithSettings(t, &mockRoundTripper{ + responseFn: func() proto.Message { return mockResp }, + }, nil, nil, nil, func(c *Config, _ *overrides.Config) { + c.Metrics.Sharder.Interval = time.Hour + 
c.Metrics.Sharder.MaxExemplars = 0 + }) + + httpResp := httptest.NewRecorder() + f.MetricsQueryRangeHandler.ServeHTTP(httpResp, makeRequest(5)) // client requests exemplars + require.Equal(t, 200, httpResp.Code) + + actualResp := &tempopb.QueryRangeResponse{} + require.NoError(t, jsonpb.Unmarshal(httpResp.Body, actualResp)) + + for _, s := range actualResp.Series { + assert.Empty(t, s.Exemplars, "exemplars should be empty when MaxExemplars is zero even if client requests them") + } + }) + + t.Run("client-specified exemplars capped to cfg.MaxExemplars", func(t *testing.T) { + f := frontendWithSettings(t, &mockRoundTripper{ + responseFn: func() proto.Message { return mockResp }, + }, nil, nil, nil, func(c *Config, _ *overrides.Config) { + c.Metrics.Sharder.Interval = time.Hour + c.Metrics.Sharder.MaxExemplars = 10 + }) + + httpResp := httptest.NewRecorder() + f.MetricsQueryRangeHandler.ServeHTTP(httpResp, makeRequest(1000)) // request more than cfg cap + require.Equal(t, 200, httpResp.Code) + + actualResp := &tempopb.QueryRangeResponse{} + require.NoError(t, jsonpb.Unmarshal(httpResp.Body, actualResp)) + + var total int + for _, s := range actualResp.Series { + total += len(s.Exemplars) + } + assert.Greater(t, total, 0, "exemplars should be kept when client requests more than cfg cap") + }) +} + // mockRoundTripperWithCapture is a mitm helper that captures query range requests type mockRoundTripperWithCapture struct { rt mockRoundTripper diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go index 7824e72843c..157e527daf0 100644 --- a/modules/frontend/metrics_query_range_sharder.go +++ b/modules/frontend/metrics_query_range_sharder.go @@ -48,7 +48,7 @@ type QueryRangeSharderConfig struct { // QueryBackendAfter determines when to query backend storage vs ingesters only. 
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"` Interval time.Duration `yaml:"interval,omitempty"` - MaxExemplars int `yaml:"max_exemplars,omitempty"` + MaxExemplars uint32 `yaml:"max_exemplars,omitempty"` MaxResponseSeries int `yaml:"max_response_series,omitempty"` StreamingShards int `yaml:"streaming_shards,omitempty"` } @@ -116,15 +116,10 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline traceql.AlignRequest(req) - var maxExemplars uint32 // Instant queries must not compute exemplars - if !s.instantMode && s.cfg.MaxExemplars > 0 { - maxExemplars = req.Exemplars - if maxExemplars == 0 || maxExemplars > uint32(s.cfg.MaxExemplars) { - maxExemplars = uint32(s.cfg.MaxExemplars) // Enforce configuration - } + if s.instantMode { + req.Exemplars = 0 } - req.Exemplars = maxExemplars // if a limit is being enforced, honor the request if it is less than the limit // else set it to max limit diff --git a/modules/frontend/metrics_query_range_sharder_test.go b/modules/frontend/metrics_query_range_sharder_test.go index ac128dcef7e..64631015599 100644 --- a/modules/frontend/metrics_query_range_sharder_test.go +++ b/modules/frontend/metrics_query_range_sharder_test.go @@ -307,6 +307,17 @@ func TestExemplarsCutoff(t *testing.T) { expectedBeforeCut uint32 expectedAfterCut uint32 }{ + { + // Instant queries zero exemplars before calling exemplarsCutoff; both sides must be 0. 
+ name: "zero exemplars (instant mode) spanning cutoff", + req: tempopb.QueryRangeRequest{ + Start: uint64(cutoff.Add(-20 * time.Minute).UnixNano()), + End: uint64(now.UnixNano()), + Exemplars: 0, + }, + expectedBeforeCut: 0, + expectedAfterCut: 0, + }, { // When all data is after the cutoff, all exemplars should go to the 'after' portion name: "all data after cutoff", diff --git a/pkg/traceql/ast_metrics.go b/pkg/traceql/ast_metrics.go index 53154b8f4f9..c523e5623d5 100644 --- a/pkg/traceql/ast_metrics.go +++ b/pkg/traceql/ast_metrics.go @@ -159,7 +159,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode } } else { innerAggFunc = func() RangeAggregator { - return NewStepAggregator(q.Start, q.End, q.Step, innerAgg) + return NewStepAggregator(q, innerAgg) } } @@ -249,7 +249,7 @@ func exemplarAttribute(a Attribute) func(Span) (float64, uint64) { func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) { // Currently all metrics are summed by job to produce // intermediate results. 
This will change when adding min/max/topk/etc - a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp, maxExemplars) + a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp) } func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) { @@ -258,7 +258,7 @@ func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) { a.seriesAgg = NewHistogramAggregator(q, a.floats, q.Exemplars) default: // These are simple additions by series - a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp, q.Exemplars) + a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp) } } diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go index 04ce64dcfa0..2c577b11647 100644 --- a/pkg/traceql/engine_metrics.go +++ b/pkg/traceql/engine_metrics.go @@ -12,9 +12,11 @@ import ( "sync" "time" + "github.com/go-kit/log/level" "github.com/grafana/tempo/pkg/tempopb" commonv1proto "github.com/grafana/tempo/pkg/tempopb/common/v1" "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/pkg/util/log" "github.com/prometheus/prometheus/model/labels" ) @@ -22,8 +24,10 @@ const ( internalLabelMetaType = "__meta_type" internalMetaTypeCount = "__count" internalLabelBucket = "__bucket" - maxExemplars = 100 maxExemplarsPerBucket = 2 + // maxExemplars is a safety cap applied at the engine entry points to bound memory + // usage regardless of what the caller requests. + maxExemplars uint32 = 100000 // NormalNaN is a quiet NaN. This is also math.NaN(). 
normalNaN uint64 = 0x7ff8000000000001 ) @@ -535,9 +539,9 @@ type StepAggregator struct { var _ RangeAggregator = (*StepAggregator)(nil) -func NewStepAggregator(start, end, step uint64, innerAgg func() VectorAggregator) *StepAggregator { +func NewStepAggregator(q *tempopb.QueryRangeRequest, innerAgg func() VectorAggregator) *StepAggregator { const instant = false // never used for instant queries - mapper := NewIntervalMapper(start, end, step, instant) + mapper := NewIntervalMapper(q.Start, q.End, q.Step, instant) intervals := mapper.IntervalCount() vectors := make([]VectorAggregator, intervals) for i := range vectors { @@ -547,8 +551,8 @@ func NewStepAggregator(start, end, step uint64, innerAgg func() VectorAggregator return &StepAggregator{ intervalMapper: mapper, vectors: vectors, - exemplars: make([]Exemplar, 0, maxExemplars), - exemplarBuckets: newExemplarBucketSet(maxExemplars, start, end, step, instant), + exemplars: make([]Exemplar, 0, q.Exemplars), + exemplarBuckets: newExemplarBucketSet(q.Exemplars, q.Start, q.End, q.Step, instant), } } @@ -931,6 +935,11 @@ func (u *UngroupedAggregator) Series() SeriesSet { } func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest, mode AggregateMode) (*MetricsFrontendEvaluator, error) { + if req.Exemplars > maxExemplars { + level.Warn(log.Logger).Log("msg", "capping exemplars to safety limit", "requested", req.Exemplars, "cap", maxExemplars) + req.Exemplars = maxExemplars + } + if req.Start <= 0 { return nil, fmt.Errorf("start required") } @@ -974,6 +983,11 @@ func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest, // example if the datasource is replication factor=1 or only a single block then we know there // aren't duplicates, and we can make some optimizations. 
func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, timeOverlapCutoff float64, allowUnsafeQueryHints bool) (*MetricsEvaluator, error) { + if req.Exemplars > maxExemplars { + level.Warn(log.Logger).Log("msg", "capping exemplars to safety limit", "requested", req.Exemplars, "cap", maxExemplars) + req.Exemplars = maxExemplars + } + if req.Start <= 0 { return nil, fmt.Errorf("start required") } @@ -1465,7 +1479,7 @@ type SimpleAggregator struct { initWithNaN bool } -func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp, exemplars uint32) *SimpleAggregator { +func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp) *SimpleAggregator { var initWithNaN bool var f func(existingValue float64, newValue float64) float64 switch op { @@ -1488,7 +1502,7 @@ func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp, e } return &SimpleAggregator{ ss: make(SeriesSet), - exemplarBuckets: newExemplarBucketSet(exemplars, req.Start, req.End, req.Step, IsInstant(req)), + exemplarBuckets: newExemplarBucketSet(req.Exemplars, req.Start, req.End, req.Step, IsInstant(req)), intervalMapper: NewIntervalMapperFromReq(req), aggregationFunc: f, initWithNaN: initWithNaN, diff --git a/pkg/traceql/engine_metrics_average.go b/pkg/traceql/engine_metrics_average.go index b35349119e6..bc387c1b954 100644 --- a/pkg/traceql/engine_metrics_average.go +++ b/pkg/traceql/engine_metrics_average.go @@ -39,11 +39,11 @@ func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode Aggr weightedAverageSeries: make(map[SeriesMapKey]*averageSeries), len: intervalMapper.IntervalCount(), intervalMapper: intervalMapper, - exemplarBuckets: newExemplarBucketSet(maxExemplars, q.Start, q.End, q.Step, IsInstant(q)), + exemplarBuckets: newExemplarBucketSet(q.Exemplars, q.Start, q.End, q.Step, IsInstant(q)), } if mode == AggregateModeRaw { - a.agg = newAvgOverTimeSpanAggregator(a.attr, a.by, q.Start, q.End, q.Step, 
IsInstant(q)) + a.agg = newAvgOverTimeSpanAggregator(a.attr, a.by, q.Start, q.End, q.Step, IsInstant(q), q.Exemplars) } a.mode = mode @@ -202,7 +202,7 @@ type averageSeries struct { Exemplars []Exemplar } -func newAverageSeries(l int, lenExemplars int, labels Labels) averageSeries { +func newAverageSeries(l int, lenExemplars uint32, labels Labels) averageSeries { s := averageSeries{ values: make([]averageValue, l), labels: labels, @@ -301,7 +301,7 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) { countPosMapper[key] = i } else if !ok { lbls := getLabels(ts.Labels, "") - s := newAverageSeries(b.len, len(ts.Exemplars), lbls) + s := newAverageSeries(b.len, uint32(len(ts.Exemplars)), lbls) b.weightedAverageSeries[key] = &s } } @@ -406,6 +406,7 @@ type avgOverTimeSpanAggregator[F FastStatic, S StaticVals] struct { intervalMapper IntervalMapper start, end, step uint64 instant bool + exemplars uint32 // Data series map[F]avgOverTimeSeries[S] @@ -416,7 +417,7 @@ type avgOverTimeSpanAggregator[F FastStatic, S StaticVals] struct { var _ SpanAggregator = (*avgOverTimeSpanAggregator[FastStatic1, StaticVals1])(nil) -func newAvgOverTimeSpanAggregator(attr Attribute, by []Attribute, start, end, step uint64, instant bool) SpanAggregator { +func newAvgOverTimeSpanAggregator(attr Attribute, by []Attribute, start, end, step uint64, instant bool, exemplars uint32) SpanAggregator { lookups := make([][]Attribute, len(by)) for i, attr := range by { if attr.Intrinsic == IntrinsicNone && attr.Scope == AttributeScopeNone { @@ -435,19 +436,19 @@ func newAvgOverTimeSpanAggregator(attr Attribute, by []Attribute, start, end, st switch aggNum { case 2: - return newAvgAggregator[FastStatic2, StaticVals2](attr, by, lookups, start, end, step, instant) + return newAvgAggregator[FastStatic2, StaticVals2](attr, by, lookups, start, end, step, instant, exemplars) case 3: - return newAvgAggregator[FastStatic3, StaticVals3](attr, by, lookups, start, end, step, instant) + 
return newAvgAggregator[FastStatic3, StaticVals3](attr, by, lookups, start, end, step, instant, exemplars) case 4: - return newAvgAggregator[FastStatic4, StaticVals4](attr, by, lookups, start, end, step, instant) + return newAvgAggregator[FastStatic4, StaticVals4](attr, by, lookups, start, end, step, instant, exemplars) case 5: - return newAvgAggregator[FastStatic5, StaticVals5](attr, by, lookups, start, end, step, instant) + return newAvgAggregator[FastStatic5, StaticVals5](attr, by, lookups, start, end, step, instant, exemplars) default: - return newAvgAggregator[FastStatic1, StaticVals1](attr, by, lookups, start, end, step, instant) + return newAvgAggregator[FastStatic1, StaticVals1](attr, by, lookups, start, end, step, instant, exemplars) } } -func newAvgAggregator[F FastStatic, S StaticVals](attr Attribute, by []Attribute, lookups [][]Attribute, start, end, step uint64, instant bool) SpanAggregator { +func newAvgAggregator[F FastStatic, S StaticVals](attr Attribute, by []Attribute, lookups [][]Attribute, start, end, step uint64, instant bool, exemplars uint32) SpanAggregator { var fn func(s Span) float64 switch attr { @@ -475,6 +476,7 @@ func newAvgAggregator[F FastStatic, S StaticVals](attr Attribute, by []Attribute end: end, step: step, instant: instant, + exemplars: exemplars, } } @@ -581,8 +583,8 @@ func (g *avgOverTimeSpanAggregator[F, S]) getSeries(span Span) avgOverTimeSeries intervals := g.intervalMapper.IntervalCount() s = avgOverTimeSeries[S]{ vals: g.buf.vals, - average: newAverageSeries(intervals, maxExemplars, nil), - exemplarBuckets: newExemplarBucketSet(maxExemplars, g.start, g.end, g.step, g.instant), + average: newAverageSeries(intervals, g.exemplars, nil), + exemplarBuckets: newExemplarBucketSet(g.exemplars, g.start, g.end, g.step, g.instant), initialized: true, } g.series[g.buf.fast] = s diff --git a/pkg/traceql/engine_metrics_average_test.go b/pkg/traceql/engine_metrics_average_test.go index 29abfe116ed..6bf3f21ec51 100644 --- 
a/pkg/traceql/engine_metrics_average_test.go +++ b/pkg/traceql/engine_metrics_average_test.go @@ -9,6 +9,36 @@ import ( "github.com/stretchr/testify/require" ) +// TestAvgOverTimeExemplarLimit verifies that avgOverTimeSpanAggregator caps +// per-series exemplar collection at req.Exemplars (set via q.Exemplars). +func TestAvgOverTimeExemplarLimit(t *testing.T) { + const limit = uint32(5) + req := &tempopb.QueryRangeRequest{ + Start: uint64(1 * time.Second), + End: uint64(100 * time.Second), + Step: uint64(10 * time.Second), + Query: "{ } | avg_over_time(duration) by (span.service)", + Exemplars: limit, + } + + a := newAverageOverTimeMetricsAggregator(IntrinsicDurationAttribute, []Attribute{NewAttribute("service")}) + a.init(req, AggregateModeRaw) + + // Send limit*3 spans, all for service=a, with distinct timestamps spread across the range. + for i := 0; i < int(limit)*3; i++ { + ts := uint64(i+1) * uint64(time.Second) + span := newMockSpan(nil).WithStartTime(ts).WithSpanString("service", "a").WithDuration(uint64(time.Second)) + a.observe(span) + a.observeExemplar(span) + } + + result := a.result(1.0) + serviceA, ok := result[LabelsFromArgs(".service", "a").MapKey()] + require.True(t, ok, "series for service=a must exist") + require.LessOrEqual(t, len(serviceA.Exemplars), int(limit), "exemplars must be capped at req.Exemplars") + require.Greater(t, len(serviceA.Exemplars), 0, "at least one exemplar must be collected") +} + func TestAvgOverTime(t *testing.T) { req := &tempopb.QueryRangeRequest{ Start: 1, diff --git a/pkg/traceql/engine_metrics_compare.go b/pkg/traceql/engine_metrics_compare.go index e3390ab9497..bc46d82b07e 100644 --- a/pkg/traceql/engine_metrics_compare.go +++ b/pkg/traceql/engine_metrics_compare.go @@ -40,6 +40,7 @@ type MetricsCompare struct { baselineExemplars []Exemplar selectionExemplars []Exemplar seriesAgg SeriesAggregator + maxExemplars uint32 // Runtime fields to avoid allocating closures // and escaping to the heap when we call 
span.AllAttributesFunc. @@ -82,9 +83,10 @@ func (m *MetricsCompare) init(q *tempopb.QueryRangeRequest, mode AggregateMode) m.selections = make(map[Attribute]map[StaticMapKey]*staticWithCounts) m.baselineTotals = make(map[Attribute][]float64) m.selectionTotals = make(map[Attribute][]float64) + m.maxExemplars = q.Exemplars case AggregateModeSum: - m.seriesAgg = NewSimpleCombiner(q, sumAggregation, maxExemplars) + m.seriesAgg = NewSimpleCombiner(q, sumAggregation) return case AggregateModeFinal: @@ -185,7 +187,7 @@ func (m *MetricsCompare) observeExemplar(span Span) { isSelection := m.isSelection(span, st) // Exemplars - if len(m.baselineExemplars) >= maxExemplars || len(m.selectionExemplars) >= maxExemplars { + if uint32(len(m.baselineExemplars)) >= m.maxExemplars || uint32(len(m.selectionExemplars)) >= m.maxExemplars { return } diff --git a/pkg/traceql/engine_metrics_compare_test.go b/pkg/traceql/engine_metrics_compare_test.go index 98f8e401550..d9babd9ee40 100644 --- a/pkg/traceql/engine_metrics_compare_test.go +++ b/pkg/traceql/engine_metrics_compare_test.go @@ -123,6 +123,37 @@ func TestCompare(t *testing.T) { requireEqualSeriesSets(t, expected, ss) } +// TestMetricsCompareObserveExemplarLimit verifies that MetricsCompare.observeExemplar +// caps baseline and selection exemplars at m.maxExemplars (set from req.Exemplars). +func TestMetricsCompareObserveExemplarLimit(t *testing.T) { + const limit = 5 + req := &tempopb.QueryRangeRequest{ + Start: 1, + End: uint64(100 * time.Second), + Step: uint64(10 * time.Second), + Query: `{ } | compare({ .service="selected" })`, + Exemplars: limit, + } + + a := newMetricsCompare(newSpansetFilter( + newBinaryOperation(OpEqual, + NewAttribute("service"), + NewStaticString("selected"), + )), 10, 0, 0) + a.init(req, AggregateModeRaw) + + // Alternate baseline and selection spans so both slices fill up. 
+ for i := 0; i < limit*3; i++ { + ts := uint64(i+1) * uint64(time.Second) + a.observeExemplar(newMockSpan(nil).WithStartTime(ts).WithSpanString("service", "baseline")) + a.observeExemplar(newMockSpan(nil).WithStartTime(ts).WithSpanString("service", "selected")) + } + + require.LessOrEqual(t, len(a.baselineExemplars), limit, "baseline exemplars must not exceed req.Exemplars") + require.LessOrEqual(t, len(a.selectionExemplars), limit, "selection exemplars must not exceed req.Exemplars") + require.Greater(t, len(a.baselineExemplars)+len(a.selectionExemplars), 0, "some exemplars must be collected") +} + func TestCompareScalesResults(t *testing.T) { // Test that the compare function correctly multiplies results based on sampling multiplier // The multiplication happens in result() method: s.Values[i] *= multiplier diff --git a/pkg/traceql/engine_metrics_test.go b/pkg/traceql/engine_metrics_test.go index 98c53a33bd1..2e0b8a81755 100644 --- a/pkg/traceql/engine_metrics_test.go +++ b/pkg/traceql/engine_metrics_test.go @@ -522,6 +522,60 @@ func TestCompileMetricsQueryRangeExemplarsHint(t *testing.T) { } } +func TestCompileMetricsQueryRangeExemplarsSafetyCap(t *testing.T) { + tcs := []struct { + name string + exemplars uint32 + expected int + }{ + {"below cap", maxExemplars - 1, int(maxExemplars - 1)}, + {"at cap", maxExemplars, int(maxExemplars)}, + {"above cap", maxExemplars + 1, int(maxExemplars)}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + req := &tempopb.QueryRangeRequest{ + Query: "{} | rate()", + Start: 1, + End: 2, + Step: 1, + Exemplars: tc.exemplars, + } + eval, err := NewEngine().CompileMetricsQueryRange(req, 0, false) + require.NoError(t, err) + require.Equal(t, tc.expected, eval.maxExemplars) + }) + } +} + +func TestCompileMetricsQueryRangeNonRawExemplarsSafetyCap(t *testing.T) { + tcs := []struct { + name string + exemplars uint32 + expected uint32 + }{ + {"below cap", maxExemplars - 1, maxExemplars - 1}, + {"at cap", 
maxExemplars, maxExemplars}, + {"above cap", maxExemplars + 1, maxExemplars}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + req := &tempopb.QueryRangeRequest{ + Query: "{} | rate()", + Start: 1, + End: 2, + Step: 1, + Exemplars: tc.exemplars, + } + _, err := NewEngine().CompileMetricsQueryRangeNonRaw(req, AggregateModeSum) + require.NoError(t, err) + require.Equal(t, tc.expected, req.Exemplars) + }) + } +} + func TestCompileMetricsQueryRangeFetchSpansRequest(t *testing.T) { tc := map[string]struct { q string @@ -2746,12 +2800,64 @@ func TestTiesInBottomK(t *testing.T) { }) } +// TestSimpleAggregatorExemplarLimit verifies that SimpleAggregator (used in AggregateModeSum) +// respects req.Exemplars, including values above the old hardcoded limit of 100. +func TestSimpleAggregatorExemplarLimit(t *testing.T) { + tcs := []struct { + name string + exemplars uint32 + sendCount int + minExpected int // at least this many exemplars must appear in the result + }{ + {"below_old_limit", 50, 200, 1}, + {"at_old_limit", 100, 200, 1}, + // Proves the fix: before the change, this was capped at 100. + {"above_old_limit", 150, 200, 101}, + {"large", 200, 300, 1}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + req := &tempopb.QueryRangeRequest{ + Start: uint64(1 * time.Second), + End: uint64(time.Duration(tc.sendCount+1) * time.Second), + Step: uint64(time.Second), + Exemplars: tc.exemplars, + } + + agg := NewSimpleCombiner(req, sumAggregation) + + // Build exemplars spread evenly across the time range (ms timestamps). 
+ startMs := req.Start / uint64(time.Millisecond) + endMs := req.End / uint64(time.Millisecond) + exemplars := make([]tempopb.Exemplar, tc.sendCount) + for i := range exemplars { + ts := startMs + uint64(i)*(endMs-startMs)/uint64(tc.sendCount) + exemplars[i] = tempopb.Exemplar{TimestampMs: int64(ts), Value: float64(i)} //nolint: gosec // G115 + } + + agg.Combine([]*tempopb.TimeSeries{{ + Labels: []commonv1proto.KeyValue{{Key: "service", Value: &commonv1proto.AnyValue{Value: &commonv1proto.AnyValue_StringValue{StringValue: "test"}}}}, + Samples: []tempopb.Sample{{TimestampMs: int64(startMs), Value: 1.0}}, //nolint: gosec // G115 + Exemplars: exemplars, + }}) + + total := 0 + for _, ts := range agg.Results() { + total += len(ts.Exemplars) + } + require.LessOrEqual(t, total, int(tc.exemplars), "exemplar count must not exceed req.Exemplars") + require.GreaterOrEqual(t, total, tc.minExpected, "exemplar count must meet minimum expected") + }) + } +} + func TestHistogramAggregator(t *testing.T) { req := &tempopb.QueryRangeRequest{ Start: uint64(time.Now().Add(-1 * time.Hour).UnixNano()), End: uint64(time.Now().UnixNano()), Step: uint64(15 * time.Second.Nanoseconds()), - Exemplars: maxExemplars, + Exemplars: 100, } const seriesCount = 6 @@ -2963,7 +3069,7 @@ func BenchmarkHistogramAggregator_Combine(b *testing.B) { Start: uint64(time.Now().Add(-1 * time.Hour).UnixNano()), End: uint64(time.Now().UnixNano()), Step: uint64(15 * time.Second.Nanoseconds()), - Exemplars: maxExemplars, + Exemplars: 100, } const seriesCount = 6 @@ -2995,7 +3101,7 @@ func BenchmarkHistogramAggregator_Results(b *testing.B) { Start: uint64(time.Now().Add(-1 * time.Hour).UnixNano()), End: uint64(time.Now().UnixNano()), Step: uint64(15 * time.Second.Nanoseconds()), - Exemplars: maxExemplars, + Exemplars: 100, } benchmarks := []struct { diff --git a/pkg/traceql/util.go b/pkg/traceql/util.go index f418fc6dab4..ae492b13418 100644 --- a/pkg/traceql/util.go +++ b/pkg/traceql/util.go @@ -83,9 +83,6 @@ type 
limitedBucketSet struct { // newBucketSet creates a new bucket set for the given time range // start and end are in nanoseconds func newBucketSet(exemplars uint32, start, end uint64) *limitedBucketSet { - if exemplars > maxExemplars || exemplars == 0 { - exemplars = maxExemplars - } buckets := exemplars / maxExemplarsPerBucket if buckets == 0 { // edge case for few exemplars buckets = 1 @@ -96,7 +93,7 @@ func newBucketSet(exemplars uint32, start, end uint64) *limitedBucketSet { end /= uint64(time.Millisecond.Nanoseconds()) //nolint: gosec // G115 interval := end - start - bucketWidth := interval / uint64(buckets) + bucketWidth := max(interval/uint64(buckets), 1) return &limitedBucketSet{ sz: int(buckets), diff --git a/pkg/traceql/util_test.go b/pkg/traceql/util_test.go index ad118fa4e54..00a62a64093 100644 --- a/pkg/traceql/util_test.go +++ b/pkg/traceql/util_test.go @@ -68,8 +68,8 @@ func TestBucketSet_Bucket(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - bs := newBucketSet(maxExemplars, tc.start*uint64(time.Second.Nanoseconds()), tc.end*uint64(time.Second.Nanoseconds())) //nolint: gosec // G115 - actual := bs.bucket(tc.ts * uint64(time.Second.Milliseconds())) //nolint: gosec // G115 + bs := newBucketSet(100, tc.start*uint64(time.Second.Nanoseconds()), tc.end*uint64(time.Second.Nanoseconds())) //nolint: gosec // G115 + actual := bs.bucket(tc.ts * uint64(time.Second.Milliseconds())) //nolint: gosec // G115 assert.Equal(t, tc.expected, actual) }) } @@ -80,7 +80,7 @@ func TestBucketSet_Instant(t *testing.T) { end := uint64(1010) step := end - start - bs := newExemplarBucketSet(maxExemplars, start, end, step, true) + bs := newExemplarBucketSet(100, start, end, step, true) assert.True(t, bs.testTotal()) assert.True(t, bs.addAndTest(start-1)) assert.True(t, bs.addAndTest(start)) @@ -88,7 +88,7 @@ func TestBucketSet_Instant(t *testing.T) { } func TestBucketSet(t *testing.T) { - s := newBucketSet(maxExemplars, 
uint64(100*time.Second.Nanoseconds()), uint64(199*time.Second.Nanoseconds())) //nolint: gosec // G115 + s := newBucketSet(100, uint64(100*time.Second.Nanoseconds()), uint64(199*time.Second.Nanoseconds())) //nolint: gosec // G115 // Add two to each bucket for ts := uint64(100); ts <= 199; ts += 2 { // 100 in total @@ -115,3 +115,23 @@ func TestBucketSetSingleExemplar(t *testing.T) { assert.False(t, s.addAndTest(tsMilli), "ts=%d should be added to bucket", 100) assert.True(t, s.addAndTest(tsMilli), "ts=%d should not be added to bucket", 100) } + +func TestBucketSetLargeExemplarsShortRange(t *testing.T) { + // exemplars=10000 → buckets=5000, but the range is only 1 second (1000ms interval). + // Without the guard, bucketWidth=0 causes a divide-by-zero in bucket(). + s := newBucketSet(10000, 0, uint64(time.Second.Nanoseconds())) //nolint: gosec // G115 + assert.NotPanics(t, func() { + s.addAndTest(500) // 500ms into the range + }) + assert.False(t, s.testTotal(), "should not be full after one exemplar") +} + +func TestBucketSetZeroExemplars(t *testing.T) { + // exemplars=0 means collection is disabled: testTotal() should always return true + // and no exemplars should ever be accepted. + s := newBucketSet(0, uint64(100*time.Second.Nanoseconds()), uint64(200*time.Second.Nanoseconds())) //nolint: gosec // G115 + assert.True(t, s.testTotal(), "bucket set with 0 exemplars should always report full") + tsMilli := uint64(150 * time.Second.Milliseconds()) //nolint: gosec // G115 + assert.True(t, s.addAndTest(tsMilli), "adding to a 0-exemplar bucket set should be rejected") + assert.Equal(t, 0, s.len()) +}