Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## main / unreleased

* [BUGFIX] Apply exemplars hint end-to-end and fix safety cap bypass in metrics queries. [#6559](https://github.com/grafana/tempo/pull/6559) (@zhxiaogg)
* [ENHANCEMENT] Use the frontend `MaxExemplars` config as the single source of truth for exemplar limits. Add a safety cap at the TraceQL engine entry points. [#6515](https://github.com/grafana/tempo/pull/6515) (@zhxiaogg)
* [CHANGE] Set default `max_result_limit` for search to 256*1024 [#6525](https://github.com/grafana/tempo/pull/6525) (@zhxiaogg)

# v2.10.1
Expand Down
32 changes: 31 additions & 1 deletion modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
return err
}

if err := normalizeRequestExemplars(req, cfg.Metrics.Sharder.MaxExemplars); err != nil {
return err
}

traceql.AlignRequest(req)

// the end time cutoff is applied here because it has to be done before combiner creation
Expand Down Expand Up @@ -132,7 +136,11 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
if err := validateQueryRangeReq(cfg, queryRangeReq); err != nil {
return httpInvalidRequest(err), nil
}
req = api.BuildQueryRangeRequest(req, queryRangeReq, "")

if err := normalizeRequestExemplars(queryRangeReq, cfg.Metrics.Sharder.MaxExemplars); err != nil {
return httpInvalidRequest(err), nil
}

traceql.AlignRequest(queryRangeReq)

// the end time cutoff is applied here because it has to be done before combiner creation
Expand Down Expand Up @@ -173,6 +181,28 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
})
}

// normalizeRequestExemplars resolves the final exemplar limit for a query range
// request and stores it in req.Exemplars.
//
// Precedence, highest first:
//  1. An integer exemplars hint in the TraceQL query (e.g. `with(exemplars=7)`)
//     replaces the value sent by the client. Values <= 0 disable exemplars.
//  2. A boolean exemplars hint set to false disables exemplars.
//  3. Otherwise (no hint, or `exemplars=true`), a client value of 0 is treated
//     as "unspecified" and defaults to maxExemplars.
//
// Whatever the source, the result never exceeds maxExemplars, so a
// maxExemplars of 0 always yields req.Exemplars == 0.
//
// If req.Query fails to parse, the parse error is returned as-is and req is
// left unmodified.
func normalizeRequestExemplars(req *tempopb.QueryRangeRequest, maxExemplars uint32) error {
	expr, err := traceql.Parse(req.Query)
	if err != nil {
		return err
	}

	if v, ok := expr.Hints.GetInt(traceql.HintExemplars, false); ok {
		switch {
		case v <= 0:
			req.Exemplars = 0
		case int64(v) > int64(maxExemplars):
			// Clamp BEFORE narrowing to uint32. Converting first would wrap
			// values above math.MaxUint32 (e.g. 1<<32 becomes 0), turning an
			// oversized hint into "disabled" or a tiny limit instead of the cap.
			req.Exemplars = maxExemplars
		default:
			req.Exemplars = uint32(v) //nolint: gosec // G115: v is in [1, maxExemplars] here
		}
	} else if v, ok := expr.Hints.GetBool(traceql.HintExemplars, false); ok && !v {
		req.Exemplars = 0
	} else if req.Exemplars == 0 {
		req.Exemplars = maxExemplars
	}

	// Final safety cap; also covers a client-supplied value above the limit.
	if req.Exemplars > maxExemplars {
		req.Exemplars = maxExemplars
	}
	return nil
}

func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds float64, req *tempopb.QueryRangeRequest, resp *tempopb.QueryRangeResponse, err error) {
if resp == nil {
level.Info(logger).Log(
Expand Down
247 changes: 247 additions & 0 deletions modules/frontend/metrics_query_range_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,253 @@ func TestQueryRangeHandlerWithEndCutoff(t *testing.T) {
})
}

// TestQueryRangeHandlerExemplarNormalization exercises the HTTP query range handler with a
// mock round tripper that always answers with a response carrying a single exemplar, and
// checks whether exemplars reach the final response for different combinations of the
// client-supplied exemplars value and cfg.Metrics.Sharder.MaxExemplars.
//
// The motivating regression: when the client omitted req.Exemplars (sent 0), the frontend
// combiner was created with req.Exemplars=0 and immediately discarded all exemplars returned
// by backend shards.
func TestQueryRangeHandlerExemplarNormalization(t *testing.T) {
	var (
		start = uint64(1100 * time.Second)
		end   = uint64(1200 * time.Second)
		step  = uint64(100 * time.Second)
		// Exemplar timestamp in the middle of the query range (milliseconds)
		exemplarTS = int64(1150 * 1000)
	)

	series := &tempopb.TimeSeries{
		Labels: []v1.KeyValue{
			{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
		},
		Samples: []tempopb.Sample{
			{TimestampMs: 1100_000, Value: 1},
			{TimestampMs: 1200_000, Value: 2},
		},
		Exemplars: []tempopb.Exemplar{
			{TimestampMs: exemplarTS, Value: 1.5},
		},
	}
	mockResp := &tempopb.QueryRangeResponse{
		Metrics: &tempopb.SearchMetrics{InspectedTraces: 1, InspectedBytes: 1},
		Series:  []*tempopb.TimeSeries{series},
	}

	// serve builds a frontend limited to maxExemplars, sends one query range request
	// for the given query and client exemplars value, and returns the recorded response.
	serve := func(t *testing.T, maxExemplars uint32, query string, exemplars uint32) *httptest.ResponseRecorder {
		t.Helper()

		f := frontendWithSettings(t, &mockRoundTripper{
			responseFn: func() proto.Message { return mockResp },
		}, nil, nil, nil, func(c *Config, _ *overrides.Config) {
			c.Metrics.Sharder.Interval = time.Hour
			c.Metrics.Sharder.MaxExemplars = maxExemplars
		})

		rec := httptest.NewRecorder()

		httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil)
		httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{
			Query:     query,
			Start:     start,
			End:       end,
			Step:      step,
			Exemplars: exemplars,
		}, "")
		httpReq = httpReq.WithContext(user.InjectOrgID(httpReq.Context(), "foo"))

		f.MetricsQueryRangeHandler.ServeHTTP(rec, httpReq)
		return rec
	}

	scenarios := []struct {
		name          string
		maxExemplars  uint32
		reqExemplars  uint32
		wantExemplars bool // true: at least one exemplar overall; false: none on any series
		msg           string
	}{
		{
			name:          "client omits exemplars defaults to cfg.MaxExemplars",
			maxExemplars:  10,
			reqExemplars:  0,
			wantExemplars: true,
			msg:           "exemplars should be kept when client omits exemplars and MaxExemplars > 0",
		},
		{
			name:          "exemplars disabled when cfg.MaxExemplars is zero",
			maxExemplars:  0,
			reqExemplars:  0,
			wantExemplars: false,
			msg:           "exemplars should be empty when MaxExemplars is disabled",
		},
		{
			name:          "client requests exemplars but MaxExemplars is zero disables them",
			maxExemplars:  0,
			reqExemplars:  5, // client requests exemplars
			wantExemplars: false,
			msg:           "exemplars should be empty when MaxExemplars is zero even if client requests them",
		},
		{
			name:          "client-specified exemplars capped to cfg.MaxExemplars",
			maxExemplars:  10,
			reqExemplars:  1000, // request more than cfg cap
			wantExemplars: true,
			msg:           "exemplars should be kept when client requests more than cfg cap",
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			rec := serve(t, sc.maxExemplars, "{} | rate()", sc.reqExemplars)
			require.Equal(t, 200, rec.Code)

			got := &tempopb.QueryRangeResponse{}
			require.NoError(t, jsonpb.Unmarshal(rec.Body, got))

			if !sc.wantExemplars {
				for _, s := range got.Series {
					assert.Empty(t, s.Exemplars, sc.msg)
				}
				return
			}

			total := 0
			for _, s := range got.Series {
				total += len(s.Exemplars)
			}
			assert.Greater(t, total, 0, sc.msg)
		})
	}

	t.Run("invalid query returns 400", func(t *testing.T) {
		rec := serve(t, 10, "this is not valid traceql", 0)
		require.Equal(t, http.StatusBadRequest, rec.Code)
	})
}

// TestNormalizeRequestExemplars is a table test for normalizeRequestExemplars. Each case
// supplies a TraceQL query (with or without an exemplars hint) and the exemplars value sent
// by the client, then checks the value left in req.Exemplars against a fixed limit of 20.
func TestNormalizeRequestExemplars(t *testing.T) {
	const limit = uint32(20)

	type scenario struct {
		name      string
		query     string
		clientVal uint32 // req.Exemplars before normalization
		want      uint32 // req.Exemplars after normalization
		expectErr bool
	}

	scenarios := []scenario{
		// Without a hint, 0 means "unspecified" and becomes the limit.
		{name: "no hint, unspecified defaults to max", query: "{} | rate()", clientVal: 0, want: limit},
		// Without a hint, an explicit in-range value is left alone.
		{name: "no hint, explicit value preserved", query: "{} | rate()", clientVal: 10, want: 10},
		// Without a hint, a value over the limit is capped.
		{name: "no hint, above max capped", query: "{} | rate()", clientVal: limit + 5, want: limit},
		// An integer hint wins over the client value.
		{name: "hint int overrides req value", query: "{} | rate() with(exemplars=7)", clientVal: 10, want: 7},
		// An integer hint of 0 is honored as-is; the default is not applied.
		{name: "hint int=0 disables exemplars", query: "{} | rate() with(exemplars=0)", clientVal: 0, want: 0},
		// An integer hint over the limit is capped.
		{name: "hint int above max is capped", query: "{} | rate() with(exemplars=9999)", clientVal: 0, want: limit},
		// A false hint disables exemplars whatever the client sent.
		{name: "hint false disables exemplars", query: "{} | rate() with(exemplars=false)", clientVal: 10, want: 0},
		// A true hint changes nothing; the client value / default path applies.
		{name: "hint true is no-op, falls through to default", query: "{} | rate() with(exemplars=true)", clientVal: 0, want: limit},
		// An unparseable query surfaces an error.
		{name: "invalid query returns error", query: "this is not valid traceql", expectErr: true},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			req := &tempopb.QueryRangeRequest{
				Query:     sc.query,
				Exemplars: sc.clientVal,
			}

			err := normalizeRequestExemplars(req, limit)
			if !sc.expectErr {
				require.NoError(t, err)
				require.Equal(t, sc.want, req.Exemplars)
				return
			}
			require.Error(t, err)
		})
	}
}

func TestQueryRangeGRPCHandlerInvalidQueryReturnsError(t *testing.T) {
f := frontendWithSettings(t, &mockRoundTripper{}, nil, nil, nil)

srv := newMockStreamingServer[*tempopb.QueryRangeResponse]("foo", nil)
err := f.MetricsQueryRange(&tempopb.QueryRangeRequest{
Query: "this is not valid traceql",
Start: 1,
End: 2,
Step: 1,
}, srv)
require.Error(t, err)
}

// mockRoundTripperWithCapture is a mitm helper that captures query range requests
type mockRoundTripperWithCapture struct {
rt mockRoundTripper
Expand Down
11 changes: 3 additions & 8 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
// QueryBackendAfter determines when to query backend storage vs ingesters only.
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
MaxExemplars int `yaml:"max_exemplars,omitempty"`
MaxExemplars uint32 `yaml:"max_exemplars,omitempty"`
MaxResponseSeries int `yaml:"max_response_series,omitempty"`
StreamingShards int `yaml:"streaming_shards,omitempty"`
}
Expand Down Expand Up @@ -113,15 +113,10 @@

traceql.AlignRequest(req)

var maxExemplars uint32
// Instant queries must not compute exemplars
if !s.instantMode && s.cfg.MaxExemplars > 0 {
maxExemplars = req.Exemplars
if maxExemplars == 0 || maxExemplars > uint32(s.cfg.MaxExemplars) {
maxExemplars = uint32(s.cfg.MaxExemplars) // Enforce configuration
}
if s.instantMode {
req.Exemplars = 0

Check notice on line 118 in modules/frontend/metrics_query_range_sharder.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered line

Line 118 is not covered by tests
}
req.Exemplars = maxExemplars

// if a limit is being enforced, honor the request if it is less than the limit
// else set it to max limit
Expand Down
11 changes: 11 additions & 0 deletions modules/frontend/metrics_query_range_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ func TestExemplarsCutoff(t *testing.T) {
expectedBeforeCut uint32
expectedAfterCut uint32
}{
{
// Instant queries zero exemplars before calling exemplarsCutoff; both sides must be 0.
name: "zero exemplars (instant mode) spanning cutoff",
req: tempopb.QueryRangeRequest{
Start: uint64(cutoff.Add(-20 * time.Minute).UnixNano()),
End: uint64(now.UnixNano()),
Exemplars: 0,
},
expectedBeforeCut: 0,
expectedAfterCut: 0,
},
{
// When all data is after the cutoff, all exemplars should go to the 'after' portion
name: "all data after cutoff",
Expand Down
2 changes: 1 addition & 1 deletion modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@
// Compile the raw version of the query for head and wal blocks
// These aren't cached and we put them all into the same evaluator
// for efficiency.
rawEval, err := e.CompileMetricsQueryRange(req, int(req.Exemplars), timeOverlapCutoff, unsafe)
rawEval, err := e.CompileMetricsQueryRange(req, timeOverlapCutoff, unsafe)

Check notice on line 551 in modules/generator/instance.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered line

Line 551 is not covered by tests
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestProcessorDoesNotRace(t *testing.T) {
End: uint64(time.Now().UnixNano()),
Step: uint64(30 * time.Second),
}
me, err := e.CompileMetricsQueryRange(qr, 0, 0, false)
me, err := e.CompileMetricsQueryRange(qr, 0, false)
require.NoError(t, err)

je, err := e.CompileMetricsQueryRangeNonRaw(qr, traceql.AggregateModeSum)
Expand Down
Loading
Loading