Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Use frontend MaxExemplars config as the single source of truth for exemplar limits. Add a safety cap at the traceql engine entry points. [#6515](https://github.com/grafana/tempo/pull/6515) (@zhxiaogg)
* [CHANGE] Set default `max_result_limit` for search to 256*1024 [#6525](https://github.com/grafana/tempo/pull/6525) (@zhxiaogg)
* [CHANGE] **BREAKING CHANGE** Remove Opencensus receiver [#6523](https://github.com/grafana/tempo/pull/6523) (@javiermolinar)
* [CHANGE] Upgrade Tempo to Go 1.26.0 [#6443](https://github.com/grafana/tempo/pull/6443) (@stoewer)
Expand Down
12 changes: 12 additions & 0 deletions modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
return err
}

// Normalize exemplars before combiner creation: 0 (unspecified) or above the configured
// maximum defaults to MaxExemplars.
if req.Exemplars == 0 || req.Exemplars > cfg.Metrics.Sharder.MaxExemplars {
req.Exemplars = cfg.Metrics.Sharder.MaxExemplars
}

Check notice on line 58 in modules/frontend/metrics_query_range_handler.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 56-58 are not covered by tests

traceql.AlignRequest(req)

// the end time cutoff is applied here because it has to be done before combiner creation
Expand Down Expand Up @@ -133,6 +139,12 @@
return httpInvalidRequest(err), nil
}

// Normalize exemplars before combiner creation: 0 (unspecified) or above the configured
// maximum defaults to MaxExemplars.
if queryRangeReq.Exemplars == 0 || queryRangeReq.Exemplars > cfg.Metrics.Sharder.MaxExemplars {
queryRangeReq.Exemplars = cfg.Metrics.Sharder.MaxExemplars
}

traceql.AlignRequest(queryRangeReq)

// the end time cutoff is applied here because it has to be done before combiner creation
Expand Down
127 changes: 127 additions & 0 deletions modules/frontend/metrics_query_range_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,133 @@ func TestQueryRangeHandlerWithEndCutoff(t *testing.T) {
})
}

// TestQueryRangeHandlerExemplarNormalization verifies that exemplars from shard responses are
// kept in the final response when the client omits req.Exemplars (sends 0). Before the fix,
// the frontend combiner was created with req.Exemplars=0 which caused it to immediately discard
// all exemplars returned by backend shards.
func TestQueryRangeHandlerExemplarNormalization(t *testing.T) {
	var (
		qStart = uint64(1100 * time.Second)
		qEnd   = uint64(1200 * time.Second)
		qStep  = uint64(100 * time.Second)
		// Exemplar timestamp in the middle of the query range (milliseconds)
		exemplarMillis = int64(1150 * 1000)
	)

	// Canned shard response: a single series carrying two samples and one exemplar.
	shardResp := &tempopb.QueryRangeResponse{
		Metrics: &tempopb.SearchMetrics{InspectedTraces: 1, InspectedBytes: 1},
		Series: []*tempopb.TimeSeries{
			{
				Labels: []v1.KeyValue{
					{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
				},
				Samples: []tempopb.Sample{
					{TimestampMs: 1100_000, Value: 1},
					{TimestampMs: 1200_000, Value: 2},
				},
				Exemplars: []tempopb.Exemplar{
					{TimestampMs: exemplarMillis, Value: 1.5},
				},
			},
		},
	}

	// buildReq constructs an org-authenticated query-range request that asks for
	// the given number of exemplars (0 means "unspecified").
	buildReq := func(exemplars uint32) *http.Request {
		r := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil)
		r = api.BuildQueryRangeRequest(r, &tempopb.QueryRangeRequest{
			Query:     "{} | rate()",
			Start:     qStart,
			End:       qEnd,
			Step:      qStep,
			Exemplars: exemplars,
		}, "")
		return r.WithContext(user.InjectOrgID(r.Context(), "foo"))
	}

	// runQuery stands up a frontend configured with maxExemplars, issues a request
	// that asks for reqExemplars, and returns the decoded response.
	runQuery := func(t *testing.T, maxExemplars, reqExemplars uint32) *tempopb.QueryRangeResponse {
		f := frontendWithSettings(t, &mockRoundTripper{
			responseFn: func() proto.Message { return shardResp },
		}, nil, nil, nil, func(c *Config, _ *overrides.Config) {
			c.Metrics.Sharder.Interval = time.Hour
			c.Metrics.Sharder.MaxExemplars = maxExemplars
		})

		rec := httptest.NewRecorder()
		f.MetricsQueryRangeHandler.ServeHTTP(rec, buildReq(reqExemplars))
		require.Equal(t, 200, rec.Code)

		resp := &tempopb.QueryRangeResponse{}
		require.NoError(t, jsonpb.Unmarshal(rec.Body, resp))
		return resp
	}

	// countExemplars totals exemplars across every series in a response.
	countExemplars := func(resp *tempopb.QueryRangeResponse) int {
		n := 0
		for _, series := range resp.Series {
			n += len(series.Exemplars)
		}
		return n
	}

	t.Run("client omits exemplars defaults to cfg.MaxExemplars", func(t *testing.T) {
		resp := runQuery(t, 10, 0)
		assert.Greater(t, countExemplars(resp), 0, "exemplars should be kept when client omits exemplars and MaxExemplars > 0")
	})

	t.Run("exemplars disabled when cfg.MaxExemplars is zero", func(t *testing.T) {
		resp := runQuery(t, 0, 0)
		for _, series := range resp.Series {
			assert.Empty(t, series.Exemplars, "exemplars should be empty when MaxExemplars is disabled")
		}
	})

	t.Run("client requests exemplars but MaxExemplars is zero disables them", func(t *testing.T) {
		resp := runQuery(t, 0, 5) // client requests exemplars
		for _, series := range resp.Series {
			assert.Empty(t, series.Exemplars, "exemplars should be empty when MaxExemplars is zero even if client requests them")
		}
	})

	t.Run("client-specified exemplars capped to cfg.MaxExemplars", func(t *testing.T) {
		resp := runQuery(t, 10, 1000) // request more than cfg cap
		assert.Greater(t, countExemplars(resp), 0, "exemplars should be kept when client requests more than cfg cap")
	})
}

// mockRoundTripperWithCapture is a mitm helper that captures query range requests
type mockRoundTripperWithCapture struct {
rt mockRoundTripper
Expand Down
11 changes: 3 additions & 8 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
// QueryBackendAfter determines when to query backend storage vs ingesters only.
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
MaxExemplars int `yaml:"max_exemplars,omitempty"`
MaxExemplars uint32 `yaml:"max_exemplars,omitempty"`
MaxResponseSeries int `yaml:"max_response_series,omitempty"`
StreamingShards int `yaml:"streaming_shards,omitempty"`
}
Expand Down Expand Up @@ -116,15 +116,10 @@

traceql.AlignRequest(req)

var maxExemplars uint32
// Instant queries must not compute exemplars
if !s.instantMode && s.cfg.MaxExemplars > 0 {
maxExemplars = req.Exemplars
if maxExemplars == 0 || maxExemplars > uint32(s.cfg.MaxExemplars) {
maxExemplars = uint32(s.cfg.MaxExemplars) // Enforce configuration
}
if s.instantMode {
req.Exemplars = 0

Check notice on line 121 in modules/frontend/metrics_query_range_sharder.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered line

Line 121 is not covered by tests
}
req.Exemplars = maxExemplars

// if a limit is being enforced, honor the request if it is less than the limit
// else set it to max limit
Expand Down
11 changes: 11 additions & 0 deletions modules/frontend/metrics_query_range_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ func TestExemplarsCutoff(t *testing.T) {
expectedBeforeCut uint32
expectedAfterCut uint32
}{
{
// Instant queries zero exemplars before calling exemplarsCutoff; both sides must be 0.
name: "zero exemplars (instant mode) spanning cutoff",
req: tempopb.QueryRangeRequest{
Start: uint64(cutoff.Add(-20 * time.Minute).UnixNano()),
End: uint64(now.UnixNano()),
Exemplars: 0,
},
expectedBeforeCut: 0,
expectedAfterCut: 0,
},
{
// When all data is after the cutoff, all exemplars should go to the 'after' portion
name: "all data after cutoff",
Expand Down
6 changes: 3 additions & 3 deletions pkg/traceql/ast_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
}
} else {
innerAggFunc = func() RangeAggregator {
return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
return NewStepAggregator(q, innerAgg)
}
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func exemplarAttribute(a Attribute) func(Span) (float64, uint64) {
func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp, maxExemplars)
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp)
}

func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
Expand All @@ -258,7 +258,7 @@ func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
a.seriesAgg = NewHistogramAggregator(q, a.floats, q.Exemplars)
default:
// These are simple additions by series
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp, q.Exemplars)
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp)
}
}

Expand Down
28 changes: 21 additions & 7 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ import (
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/tempopb"
commonv1proto "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/log"
"github.com/prometheus/prometheus/model/labels"
)

const (
internalLabelMetaType = "__meta_type"
internalMetaTypeCount = "__count"
internalLabelBucket = "__bucket"
maxExemplars = 100
maxExemplarsPerBucket = 2
// maxExemplars is a safety cap applied at the engine entry points to bound memory
// usage regardless of what the caller requests.
maxExemplars uint32 = 100000
// NormalNaN is a quiet NaN. This is also math.NaN().
normalNaN uint64 = 0x7ff8000000000001
)
Expand Down Expand Up @@ -535,9 +539,9 @@ type StepAggregator struct {

var _ RangeAggregator = (*StepAggregator)(nil)

func NewStepAggregator(start, end, step uint64, innerAgg func() VectorAggregator) *StepAggregator {
func NewStepAggregator(q *tempopb.QueryRangeRequest, innerAgg func() VectorAggregator) *StepAggregator {
const instant = false // never used for instant queries
mapper := NewIntervalMapper(start, end, step, instant)
mapper := NewIntervalMapper(q.Start, q.End, q.Step, instant)
intervals := mapper.IntervalCount()
vectors := make([]VectorAggregator, intervals)
for i := range vectors {
Expand All @@ -547,8 +551,8 @@ func NewStepAggregator(start, end, step uint64, innerAgg func() VectorAggregator
return &StepAggregator{
intervalMapper: mapper,
vectors: vectors,
exemplars: make([]Exemplar, 0, maxExemplars),
exemplarBuckets: newExemplarBucketSet(maxExemplars, start, end, step, instant),
exemplars: make([]Exemplar, 0, q.Exemplars),
exemplarBuckets: newExemplarBucketSet(q.Exemplars, q.Start, q.End, q.Step, instant),
}
}

Expand Down Expand Up @@ -931,6 +935,11 @@ func (u *UngroupedAggregator) Series() SeriesSet {
}

func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest, mode AggregateMode) (*MetricsFrontendEvaluator, error) {
if req.Exemplars > maxExemplars {
level.Warn(log.Logger).Log("msg", "capping exemplars to safety limit", "requested", req.Exemplars, "cap", maxExemplars)
req.Exemplars = maxExemplars
}

if req.Start <= 0 {
return nil, fmt.Errorf("start required")
}
Expand Down Expand Up @@ -974,6 +983,11 @@ func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest,
// example if the datasource is replication factor=1 or only a single block then we know there
// aren't duplicates, and we can make some optimizations.
func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, timeOverlapCutoff float64, allowUnsafeQueryHints bool) (*MetricsEvaluator, error) {
if req.Exemplars > maxExemplars {
level.Warn(log.Logger).Log("msg", "capping exemplars to safety limit", "requested", req.Exemplars, "cap", maxExemplars)
req.Exemplars = maxExemplars
}

if req.Start <= 0 {
return nil, fmt.Errorf("start required")
}
Expand Down Expand Up @@ -1465,7 +1479,7 @@ type SimpleAggregator struct {
initWithNaN bool
}

func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp, exemplars uint32) *SimpleAggregator {
func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp) *SimpleAggregator {
var initWithNaN bool
var f func(existingValue float64, newValue float64) float64
switch op {
Expand All @@ -1488,7 +1502,7 @@ func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp, e
}
return &SimpleAggregator{
ss: make(SeriesSet),
exemplarBuckets: newExemplarBucketSet(exemplars, req.Start, req.End, req.Step, IsInstant(req)),
exemplarBuckets: newExemplarBucketSet(req.Exemplars, req.Start, req.End, req.Step, IsInstant(req)),
intervalMapper: NewIntervalMapperFromReq(req),
aggregationFunc: f,
initWithNaN: initWithNaN,
Expand Down
Loading
Loading