Skip to content

Commit fb01f1d

Browse files
authored
[Metrics Generator] Handle invalid utf8 (grafana#5980)
* drop invalid utf8 Signed-off-by: Joe Elliott <[email protected]> * validate in CloseAndBuildLabels() Signed-off-by: Joe Elliott <[email protected]> * changelog Signed-off-by: Joe Elliott <[email protected]> * removed dumbest comment ever Signed-off-by: Joe Elliott <[email protected]> --------- Signed-off-by: Joe Elliott <[email protected]>
1 parent b64c297 commit fb01f1d

File tree

12 files changed

+138
-93
lines changed

12 files changed

+138
-93
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
* [ENHANCEMENT] Add partition ownership metric to live-store [#5815](https://github.com/grafana/tempo/pull/5815) (@javiermolinar, @mapno)
3232
* [ENHANCEMENT] Update list of intrinsics returned by search tags endpoint [#5857](https://github.com/grafana/tempo/pull/5857) (@andreasgerstmayr)
3333
* [ENHANCEMENT] Add "Requests Executed" panel for querier metrics in the operational dashboard. [#5848](https://github.com/grafana/tempo/pull/5848) (@anglerfishlyy)
34+
* [ENHANCEMENT] Drop and metric invalid utf8 for all metrics in metrics generator instead of sending invalid data. [#5980](https://github.com/grafana/tempo/pull/5980) (@joe-elliott)
3435
* [ENHANCEMENT] Add support for application/protobuf in frontend endpoints [#5865](https://github.com/grafana/tempo/pull/5865) (@oleg-kozliuk-grafana)
3536
* [BUGFIX] Fix compactor to properly consider SSE-KMS information during metadata copy [#5774](https://github.com/grafana/tempo/pull/5774) (@steffsas)
3637
* [BUGFIX] Fix incorrect results in TraceQL compare() caused by potential hash collision of string array attributes [#5835](https://github.com/grafana/tempo/pull/5835) (@mdisibio)

modules/generator/instance.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var (
5959
Namespace: "tempo",
6060
Name: "metrics_generator_spans_discarded_total",
6161
Help: "The total number of discarded spans received per tenant",
62-
}, []string{"tenant", "reason"})
62+
}, []string{"tenant", "reason", "processor"})
6363
metricSkippedProcessorPushes = promauto.NewCounterVec(prometheus.CounterOpts{
6464
Namespace: "tempo",
6565
Name: "metrics_generator_metrics_generation_skipped_processor_pushes_total",
@@ -331,14 +331,15 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error
331331
var err error
332332
switch processorName {
333333
case processor.SpanMetricsName:
334-
filteredSpansCounter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonSpanMetricsFiltered)
335-
invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8)
334+
filteredSpansCounter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonSpanMetricsFiltered, processor.SpanMetricsName)
335+
invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8, processor.SpanMetricsName)
336336
newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter, invalidUTF8Counter)
337337
if err != nil {
338338
return err
339339
}
340340
case processor.ServiceGraphsName:
341-
newProcessor = servicegraphs.New(cfg.ServiceGraphs, i.instanceID, i.registry, i.logger)
341+
invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8, processor.ServiceGraphsName)
342+
newProcessor = servicegraphs.New(cfg.ServiceGraphs, i.instanceID, i.registry, i.logger, invalidUTF8Counter)
342343
case processor.LocalBlocksName:
343344
p, err := localblocks.New(cfg.LocalBlocks, i.instanceID, i.traceWAL, i.writer, i.overrides)
344345
if err != nil {
@@ -357,7 +358,8 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error
357358
}
358359
}
359360
case processor.HostInfoName:
360-
newProcessor, err = hostinfo.New(cfg.HostInfo, i.registry, i.logger)
361+
invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8, processor.HostInfoName)
362+
newProcessor, err = hostinfo.New(cfg.HostInfo, i.registry, i.logger, invalidUTF8Counter)
361363
if err != nil {
362364
return err
363365
}
@@ -581,7 +583,7 @@ func (i *instance) QueryRange(ctx context.Context, req *tempopb.QueryRangeReques
581583
func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) {
582584
metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(bytesIngested))
583585
metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount))
584-
metricSpansDiscarded.WithLabelValues(i.instanceID, reasonOutsideTimeRangeSlack).Add(float64(expiredSpanCount))
586+
metricSpansDiscarded.WithLabelValues(i.instanceID, reasonOutsideTimeRangeSlack, "all").Add(float64(expiredSpanCount))
585587
}
586588

587589
// shutdown stops the instance and flushes any remaining data. After shutdown

modules/generator/processor/hostinfo/processor.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55

66
"github.com/go-kit/log"
7+
"github.com/prometheus/client_golang/prometheus"
8+
79
"github.com/grafana/tempo/modules/generator/processor"
810
"github.com/grafana/tempo/modules/generator/registry"
911
"github.com/grafana/tempo/pkg/tempopb"
@@ -20,9 +22,10 @@ type Processor struct {
2022
Cfg Config
2123
logger log.Logger
2224

23-
gauge registry.Gauge
24-
registry registry.Registry
25-
metricName string
25+
gauge registry.Gauge
26+
registry registry.Registry
27+
metricName string
28+
invalidUTF8Counter prometheus.Counter
2629
}
2730

2831
func (p *Processor) Name() string {
@@ -53,20 +56,26 @@ func (p *Processor) PushSpans(_ context.Context, req *tempopb.PushSpansRequest)
5356
builder := p.registry.NewLabelBuilder()
5457
builder.Add(hostIdentifierAttr, hostID)
5558
builder.Add(hostSourceAttr, hostSource)
56-
p.gauge.Set(builder.CloseAndBuildLabels(), 1)
59+
labels, validUTF8 := builder.CloseAndBuildLabels()
60+
if !validUTF8 {
61+
p.invalidUTF8Counter.Inc()
62+
continue
63+
}
64+
p.gauge.Set(labels, 1)
5765
}
5866
}
5967
}
6068

6169
func (p *Processor) Shutdown(_ context.Context) {}
6270

63-
func New(cfg Config, reg registry.Registry, logger log.Logger) (*Processor, error) {
71+
func New(cfg Config, reg registry.Registry, logger log.Logger, invalidUTF8Counter prometheus.Counter) (*Processor, error) {
6472
p := &Processor{
65-
Cfg: cfg,
66-
logger: logger,
67-
registry: reg,
68-
metricName: cfg.MetricName,
69-
gauge: reg.NewGauge(cfg.MetricName),
73+
Cfg: cfg,
74+
logger: logger,
75+
registry: reg,
76+
metricName: cfg.MetricName,
77+
gauge: reg.NewGauge(cfg.MetricName),
78+
invalidUTF8Counter: invalidUTF8Counter,
7079
}
7180
return p, nil
7281
}

modules/generator/processor/hostinfo/processor_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,25 @@ import (
55
"strconv"
66
"testing"
77

8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/prometheus/model/labels"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
813
"github.com/grafana/tempo/modules/generator/processor"
914
"github.com/grafana/tempo/modules/generator/registry"
1015
"github.com/grafana/tempo/pkg/tempopb"
1116
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
1217
trace_v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
1318
"github.com/grafana/tempo/pkg/util/test"
14-
"github.com/prometheus/prometheus/model/labels"
15-
"github.com/stretchr/testify/assert"
16-
"github.com/stretchr/testify/require"
1719
)
1820

1921
func TestHostInfo(t *testing.T) {
2022
testRegistry := registry.NewTestRegistry()
2123
cfg := Config{}
2224

2325
cfg.RegisterFlagsAndApplyDefaults("", nil)
24-
p, err := New(cfg, testRegistry, nil)
26+
p, err := New(cfg, testRegistry, nil, prometheus.NewCounter(prometheus.CounterOpts{}))
2527
require.NoError(t, err)
2628
require.Equal(t, p.Name(), processor.HostInfoName)
2729
defer p.Shutdown(context.TODO())
@@ -59,7 +61,7 @@ func TestHostInfoHostSource(t *testing.T) {
5961

6062
cfg := Config{}
6163
cfg.RegisterFlagsAndApplyDefaults("", nil)
62-
p, err := New(cfg, testRegistry, nil)
64+
p, err := New(cfg, testRegistry, nil, prometheus.NewCounter(prometheus.CounterOpts{}))
6365
require.NoError(t, err)
6466
require.Equal(t, p.Name(), processor.HostInfoName)
6567
defer p.Shutdown(context.TODO())

modules/generator/processor/servicegraphs/servicegraphs.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,11 @@ type Processor struct {
8484
metricDroppedSpans prometheus.Counter
8585
metricTotalEdges prometheus.Counter
8686
metricExpiredEdges prometheus.Counter
87+
invalidUTF8Counter prometheus.Counter
8788
logger log.Logger
8889
}
8990

90-
func New(cfg Config, tenant string, reg registry.Registry, logger log.Logger) gen.Processor {
91+
func New(cfg Config, tenant string, reg registry.Registry, logger log.Logger, invalidUTF8Counter prometheus.Counter) gen.Processor {
9192
if cfg.EnableVirtualNodeLabel {
9293
cfg.Dimensions = append(cfg.Dimensions, virtualNodeLabel)
9394
}
@@ -106,6 +107,7 @@ func New(cfg Config, tenant string, reg registry.Registry, logger log.Logger) ge
106107
metricDroppedSpans: metricDroppedSpans.WithLabelValues(tenant),
107108
metricTotalEdges: metricTotalEdges.WithLabelValues(tenant),
108109
metricExpiredEdges: metricExpiredEdges.WithLabelValues(tenant),
110+
invalidUTF8Counter: invalidUTF8Counter,
109111
logger: log.With(logger, "component", "service-graphs"),
110112
}
111113

@@ -348,7 +350,11 @@ func (p *Processor) onComplete(e *store.Edge) {
348350
}
349351
}
350352

351-
registryLabelValues := builder.CloseAndBuildLabels()
353+
registryLabelValues, validUTF8 := builder.CloseAndBuildLabels()
354+
if !validUTF8 {
355+
p.invalidUTF8Counter.Inc()
356+
return
357+
}
352358

353359
p.serviceGraphRequestTotal.Inc(registryLabelValues, 1*e.SpanMultiplier)
354360
if e.Failed {

modules/generator/processor/servicegraphs/servicegraphs_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/go-kit/log"
1313
"github.com/gogo/protobuf/jsonpb"
14+
"github.com/prometheus/client_golang/prometheus"
1415
"github.com/prometheus/prometheus/model/labels"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
@@ -46,7 +47,7 @@ func TestServiceGraphs(t *testing.T) {
4647
cfg.Dimensions = []string{"beast", "god"}
4748
cfg.EnableMessagingSystemLatencyHistogram = true
4849

49-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
50+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
5051
defer p.Shutdown(context.Background())
5152

5253
request, err := loadTestData("testdata/trace-with-queue-database.json")
@@ -128,7 +129,7 @@ func TestServiceGraphs_prefixDimensions(t *testing.T) {
128129
cfg.Dimensions = []string{"beast", "god"}
129130
cfg.EnableClientServerPrefix = true
130131

131-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
132+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
132133
defer p.Shutdown(context.Background())
133134

134135
request, err := loadTestData("testdata/trace-with-queue-database.json")
@@ -159,7 +160,7 @@ func TestServiceGraphs_MessagingSystemLatencyHistogram(t *testing.T) {
159160
cfg.Dimensions = []string{"beast", "god"}
160161
cfg.EnableMessagingSystemLatencyHistogram = true
161162

162-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
163+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
163164
defer p.Shutdown(context.Background())
164165

165166
request, err := loadTestData("testdata/trace-with-queue-database.json")
@@ -183,7 +184,7 @@ func TestServiceGraphs_failedRequests(t *testing.T) {
183184
cfg := Config{}
184185
cfg.RegisterFlagsAndApplyDefaults("", nil)
185186

186-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
187+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
187188
defer p.Shutdown(context.Background())
188189

189190
request, err := loadTestData("testdata/trace-with-failed-requests.json")
@@ -215,7 +216,7 @@ func TestServiceGraphs_tooManySpansErr(t *testing.T) {
215216
cfg := Config{}
216217
cfg.RegisterFlagsAndApplyDefaults("", nil)
217218
cfg.MaxItems = 1
218-
p := New(cfg, "test", &testRegistry, log.NewNopLogger())
219+
p := New(cfg, "test", &testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
219220
defer p.Shutdown(context.Background())
220221

221222
request, err := loadTestData("testdata/trace-with-queue-database.json")
@@ -235,7 +236,7 @@ func TestServiceGraphs_virtualNodes(t *testing.T) {
235236
cfg.HistogramBuckets = []float64{0.04}
236237
cfg.Wait = time.Nanosecond
237238

238-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
239+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
239240
defer p.Shutdown(context.Background())
240241

241242
request, err := loadTestData("testdata/trace-with-virtual-nodes.json")
@@ -283,7 +284,7 @@ func TestServiceGraphs_virtualNodesExtraLabelsForUninstrumentedServices(t *testi
283284
cfg.EnableVirtualNodeLabel = true
284285
cfg.Wait = time.Nanosecond
285286

286-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
287+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
287288
defer p.Shutdown(context.Background())
288289

289290
request, err := loadTestData("testdata/trace-with-virtual-nodes.json")
@@ -326,7 +327,7 @@ func TestServiceGraphs_expiredEdges(t *testing.T) {
326327

327328
const tenant = "expired-edge-test"
328329

329-
p := New(cfg, tenant, testRegistry, log.NewNopLogger())
330+
p := New(cfg, tenant, testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
330331
defer p.Shutdown(context.Background())
331332

332333
/*
@@ -474,7 +475,7 @@ func TestServiceGraphs_databaseVirtualNodes(t *testing.T) {
474475
cfg.HistogramBuckets = []float64{0.04}
475476
cfg.EnableMessagingSystemLatencyHistogram = true
476477

477-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
478+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
478479
defer p.Shutdown(context.Background())
479480

480481
request, err := loadTestData(tc.fixturePath)
@@ -511,7 +512,7 @@ func TestServiceGraphs_prefixDimensionsAndEnableExtraLabels(t *testing.T) {
511512
cfg.EnableClientServerPrefix = true
512513
cfg.EnableVirtualNodeLabel = true
513514

514-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
515+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
515516
defer p.Shutdown(context.Background())
516517

517518
request, err := loadTestData("testdata/trace-with-queue-database.json")
@@ -551,7 +552,7 @@ func BenchmarkServiceGraphs(b *testing.B) {
551552
cfg.HistogramBuckets = []float64{0.04}
552553
cfg.Dimensions = []string{"beast", "god"}
553554

554-
p := New(cfg, "test", testRegistry, log.NewNopLogger())
555+
p := New(cfg, "test", testRegistry, log.NewNopLogger(), prometheus.NewCounter(prometheus.CounterOpts{}))
555556
defer p.Shutdown(context.Background())
556557

557558
request, err := loadTestData("testdata/trace-with-queue-database.json")

modules/generator/processor/spanmetrics/spanmetrics.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/grafana/tempo/modules/generator/validation"
99
"github.com/prometheus/client_golang/prometheus"
10-
"github.com/prometheus/common/model"
1110

1211
gen "github.com/grafana/tempo/modules/generator/processor"
1312
processor_util "github.com/grafana/tempo/modules/generator/processor/util"
@@ -197,8 +196,8 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, jobName string, inst
197196

198197
spanMultiplier := processor_util.GetSpanMultiplier(p.Cfg.SpanMultiplierKey, span, rs)
199198

200-
registryLabelValues := builder.CloseAndBuildLabels()
201-
if !registryLabelValues.IsValid(model.UTF8Validation) {
199+
registryLabelValues, validUTF8 := builder.CloseAndBuildLabels()
200+
if !validUTF8 {
202201
p.invalidUTF8Counter.Inc()
203202
return
204203
}
@@ -231,7 +230,11 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, jobName string, inst
231230
targetInfoBuilder.Add(gen.DimInstance, instanceID)
232231
}
233232

234-
targetInfoRegistryLabelValues := targetInfoBuilder.CloseAndBuildLabels()
233+
targetInfoRegistryLabelValues, validUTF8 := targetInfoBuilder.CloseAndBuildLabels()
234+
if !validUTF8 {
235+
p.invalidUTF8Counter.Inc()
236+
return
237+
}
235238

236239
// only register target info if at least (job or instance) AND one other attribute are present
237240
// TODO - We can move this check to the top

0 commit comments

Comments
 (0)