Skip to content

Commit 15fa5b4

Browse files
distributor: log truncated oversized attributes with rate limited logger (#6467)
* distributor: log truncated oversized attributes with rate limited logger Add warn-level logging when attribute keys or values exceed the configured max_attribute_bytes limit. * remove benchmark, no longer useful
1 parent c0a5a3f commit 15fa5b4

File tree

4 files changed

+161
-26
lines changed

4 files changed

+161
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* [ENHANCEMENT] Add new alerts and runbooks entries [#6276](https://github.com/grafana/tempo/pull/6276) (@javiermolinar)
2020
* [ENHANCEMENT] Double the maximum number of dedicated string columns in vParquet5 and update tempo-cli to determine the optimum number for the data [#6282](https://github.com/grafana/tempo/pull/6282) (@mdisibio)
2121
* [ENHANCEMENT] Improve attribute truncating observability [#6400](https://github.com/grafana/tempo/pull/6400) (@javiermolinar)
22+
* [ENHANCEMENT] Log truncated oversized attributes [#6467](https://github.com/grafana/tempo/pull/6467) (@carles-grafana)
2223
* [ENHANCEMENT] Remove live-store partition owner from ring on shutdown to prevent stale owner entries [#6409](https://github.com/grafana/tempo/pull/6409) (@oleg-kozlyuk-grafana)
2324
* [ENHANCEMENT] Improved live store readiness check and added `readiness_target_lag` and `readiness_max_wait` config parameters. Live store will now - if `readiness_target_lag` is set - not report `/ready` until Kafka lag is brought under the specified value [#6238](https://github.com/grafana/tempo/pull/6238) [#6405](https://github.com/grafana/tempo/pull/6405) (@oleg-kozlyuk-grafana, @ruslan-mikhailov)
2425
* [ENHANCEMENT] Expose a new histogram metric to track the jobs per query distribution [#6343](https://github.com/grafana/tempo/pull/6343) (@javiermolinar)

modules/distributor/distributor.go

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,14 @@ import (
4444
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
4545
"github.com/grafana/tempo/pkg/usagestats"
4646
"github.com/grafana/tempo/pkg/util"
47+
tempo_log "github.com/grafana/tempo/pkg/util/log"
4748
"github.com/grafana/tempo/pkg/validation"
4849
)
4950

5051
const (
5152
distributorRingKey = "distributor"
53+
54+
truncationLogsPerSecond = 1
5255
)
5356

5457
var (
@@ -167,6 +170,13 @@ func (c truncatedAttributesCount) Total() int {
167170
return c.Resource + c.Scope + c.Span + c.Event + c.Link
168171
}
169172

173+
type truncatedAttrInfo struct {
174+
scope string
175+
name string
176+
field string // "key" or "value"
177+
origSize int // original byte length before truncation; 0 means no example captured yet
178+
}
179+
170180
// Distributor coordinates replicates and distribution of log streams.
171181
type Distributor struct {
172182
services.Service
@@ -207,6 +217,8 @@ type Distributor struct {
207217
// Middleware errors are logged but don't fail the push (fail open behavior).
208218
tracePushMiddlewares []TracePushMiddleware
209219

220+
truncationLogger *tempo_log.RateLimitedLogger
221+
210222
// For testing functionality that relies on timing without having to sleep in unit tests.
211223
sleep func(time.Duration)
212224
now func() time.Time
@@ -284,6 +296,7 @@ func New(
284296
overrides: o,
285297
traceEncoder: model.MustNewSegmentDecoder(model.CurrentEncoding),
286298
tracePushMiddlewares: cfg.TracePushMiddlewares,
299+
truncationLogger: tempo_log.NewRateLimitedLogger(truncationLogsPerSecond, level.Warn(logger)),
287300
logger: logger,
288301
sleep: time.Sleep,
289302
now: time.Now,
@@ -494,7 +507,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
494507

495508
maxAttributeBytes := d.getMaxAttributeBytes(userID)
496509

497-
ringTokens, rebatchedTraces, truncatedAttributesCount, err := requestsByTraceID(batches, userID, spanCount, maxAttributeBytes)
510+
ringTokens, rebatchedTraces, truncatedAttributesCount, truncationExample, err := requestsByTraceID(batches, userID, spanCount, maxAttributeBytes)
498511
if err != nil {
499512
logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger)
500513
return nil, err
@@ -506,6 +519,17 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
506519
metricAttributesTruncated.WithLabelValues(userID, "span").Add(float64(truncatedAttributesCount.Span))
507520
metricAttributesTruncated.WithLabelValues(userID, "event").Add(float64(truncatedAttributesCount.Event))
508521
metricAttributesTruncated.WithLabelValues(userID, "link").Add(float64(truncatedAttributesCount.Link))
522+
523+
if truncationExample != nil {
524+
d.truncationLogger.Log("msg", "attributes truncated",
525+
"tenant", userID,
526+
"total_truncated", truncatedAttributesCount.Total(),
527+
"max_size_bytes", maxAttributeBytes,
528+
"example_scope", truncationExample.scope,
529+
"example_name", truncationExample.name,
530+
"example_field", truncationExample.field,
531+
"example_orig_size", truncationExample.origSize)
532+
}
509533
}
510534

511535
if d.cfg.IngesterWritePathEnabled {
@@ -727,53 +751,53 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin
727751
}, ring.DoBatchOptions{})
728752
}
729753

730-
// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring
731-
// and traces to pass onto the ingesters.
732-
func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, truncatedAttributesCount, error) {
754+
// requestsByTraceID groups ResourceSpans by trace ID, producing hash-ring tokens and
755+
// rebatched traces for the ingesters. It truncates oversized attributes and returns
756+
// the first truncation example (if any) for diagnostic logging.
757+
func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, truncatedAttributesCount, *truncatedAttrInfo, error) {
733758
const tracesPerBatch = 20 // p50 of internal env
734759
tracesByID := make(map[uint64]*rebatchedTrace, tracesPerBatch)
735760
truncatedCount := truncatedAttributesCount{}
761+
762+
// truncationExample captures one example of a truncated attribute for rate-limited logging.
763+
var truncationExample truncatedAttrInfo
764+
736765
currentTime := uint32(time.Now().Unix())
737766
for _, b := range batches {
738767
spansByILS := make(map[uint64]*v1.ScopeSpans)
739768
// check resource for large attributes
740769
if maxSpanAttrSize > 0 && b.Resource != nil {
741-
resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize)
742-
truncatedCount.Resource += resourceAttrTruncatedCount
770+
truncatedCount.Resource += processAttributes(b.Resource.Attributes, maxSpanAttrSize, &truncationExample, "resource")
743771
}
744772

745773
for _, ils := range b.ScopeSpans {
746774

747775
// check instrumentation for large attributes
748776
if maxSpanAttrSize > 0 && ils.Scope != nil {
749-
scopeAttrTruncatedCount := processAttributes(ils.Scope.Attributes, maxSpanAttrSize)
750-
truncatedCount.Scope += scopeAttrTruncatedCount
777+
truncatedCount.Scope += processAttributes(ils.Scope.Attributes, maxSpanAttrSize, &truncationExample, "scope")
751778
}
752779

753780
for _, span := range ils.Spans {
754781
// check spans for large attributes
755782
if maxSpanAttrSize > 0 {
756-
spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize)
757-
truncatedCount.Span += spanAttrTruncatedCount
783+
truncatedCount.Span += processAttributes(span.Attributes, maxSpanAttrSize, &truncationExample, "span")
758784

759785
// check large attributes for events and links
760786
for _, event := range span.Events {
761-
eventAttrTruncatedCount := processAttributes(event.Attributes, maxSpanAttrSize)
762-
truncatedCount.Event += eventAttrTruncatedCount
787+
truncatedCount.Event += processAttributes(event.Attributes, maxSpanAttrSize, &truncationExample, "event")
763788
}
764789

765790
for _, link := range span.Links {
766-
linkAttrTruncatedCount := processAttributes(link.Attributes, maxSpanAttrSize)
767-
truncatedCount.Link += linkAttrTruncatedCount
791+
truncatedCount.Link += processAttributes(link.Attributes, maxSpanAttrSize, &truncationExample, "link")
768792
}
769793
}
770794
traceID := span.TraceId
771795
if !validation.ValidTraceID(traceID) {
772-
return nil, nil, truncatedAttributesCount{}, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
796+
return nil, nil, truncatedAttributesCount{}, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
773797
}
774798

775799
if !validation.ValidSpanID(span.SpanId) {
776-
return nil, nil, truncatedAttributesCount{}, status.Errorf(codes.InvalidArgument, "span ids must be 64 bit and not all zero, received %d bits", len(span.SpanId)*8)
800+
return nil, nil, truncatedAttributesCount{}, nil, status.Errorf(codes.InvalidArgument, "span ids must be 64 bit and not all zero, received %d bits", len(span.SpanId)*8)
777801
}
778802

779803
traceKey := util.HashForTraceID(traceID)
@@ -846,21 +870,32 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma
846870
traces = append(traces, tr)
847871
}
848872

849-
return ringTokens, traces, truncatedCount, nil
873+
if truncationExample.origSize > 0 {
874+
return ringTokens, traces, truncatedCount, &truncationExample, nil
875+
}
876+
return ringTokens, traces, truncatedCount, nil, nil
850877
}
851878

852-
// find and truncate the span attributes that are too large
853-
func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int) int {
879+
// processAttributes finds and truncates attribute keys/values that exceed maxAttrSize.
880+
func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, truncationExample *truncatedAttrInfo, scope string) int {
854881
count := 0
855882
for _, attr := range attributes {
856883
if len(attr.Key) > maxAttrSize {
884+
origSize := len(attr.Key)
857885
attr.Key = attr.Key[:maxAttrSize]
886+
if truncationExample != nil && truncationExample.origSize == 0 { // only capture the first truncation
887+
// name is the truncated prefix; origSize records the full original length.
888+
*truncationExample = truncatedAttrInfo{scope: scope, name: attr.Key, field: "key", origSize: origSize}
889+
}
858890
count++
859891
}
860892

861893
switch value := attr.GetValue().Value.(type) {
862894
case *v1_common.AnyValue_StringValue:
863895
if len(value.StringValue) > maxAttrSize {
896+
if truncationExample != nil && truncationExample.origSize == 0 { // only capture the first truncation
897+
*truncationExample = truncatedAttrInfo{scope: scope, name: attr.Key, field: "value", origSize: len(value.StringValue)}
898+
}
864899
value.StringValue = value.StringValue[:maxAttrSize]
865900
count++
866901
}

modules/distributor/distributor_test.go

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ func TestRequestsByTraceID(t *testing.T) {
941941
if tt.emptyTenant {
942942
tenant = ""
943943
}
944-
ringTokens, rebatchedTraces, _, err := requestsByTraceID(tt.batches, tenant, 1, 1000)
944+
ringTokens, rebatchedTraces, _, _, err := requestsByTraceID(tt.batches, tenant, 1, 1000)
945945
require.Equal(t, len(ringTokens), len(rebatchedTraces))
946946

947947
for i, expectedID := range tt.expectedIDs {
@@ -1023,7 +1023,7 @@ func TestProcessAttributes(t *testing.T) {
10231023
},
10241024
}
10251025

1026-
_, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte)
1026+
_, rebatchedTrace, truncatedCount, _, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte)
10271027
// 2 at resource level, 2 at span level, 2 at event level, 2 at link level, 2 at scope level
10281028
assert.Equal(t, 10, truncatedCount.Total())
10291029
assert.Equal(t, 2, truncatedCount.Resource)
@@ -1092,6 +1092,105 @@ func TestProcessAttributes(t *testing.T) {
10921092
}
10931093
}
10941094

1095+
func TestRequestsByTraceID_TruncationDetail(t *testing.T) {
1096+
longString := strings.Repeat("t", 5000)
1097+
maxAttrByte := 1000
1098+
1099+
// No truncation — detail should be nil
1100+
trace := test.MakeTraceWithSpanCount(1, 1, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10})
1101+
_, _, truncatedCount, detail, err := requestsByTraceID(trace.ResourceSpans, "test", 1, maxAttrByte)
1102+
require.NoError(t, err)
1103+
assert.Equal(t, 0, truncatedCount.Total())
1104+
assert.Nil(t, detail)
1105+
1106+
// With truncation — detail is always populated
1107+
trace = test.MakeTraceWithSpanCount(1, 1, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10})
1108+
trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes,
1109+
test.MakeAttribute("oversized", longString))
1110+
_, _, truncatedCount, detail, err = requestsByTraceID(trace.ResourceSpans, "test", 1, maxAttrByte)
1111+
require.NoError(t, err)
1112+
assert.Greater(t, truncatedCount.Total(), 0)
1113+
require.NotNil(t, detail)
1114+
assert.Equal(t, "resource", detail.scope)
1115+
assert.Equal(t, "value", detail.field)
1116+
assert.Equal(t, "oversized", detail.name)
1117+
assert.Equal(t, 5000, detail.origSize)
1118+
1119+
// maxSpanAttrSize == 0 — no truncation, no detail
1120+
trace = test.MakeTraceWithSpanCount(1, 1, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10})
1121+
trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes,
1122+
test.MakeAttribute("oversized", longString))
1123+
_, _, truncatedCount, detail, err = requestsByTraceID(trace.ResourceSpans, "test", 1, 0)
1124+
require.NoError(t, err)
1125+
assert.Equal(t, 0, truncatedCount.Total())
1126+
assert.Nil(t, detail)
1127+
}
1128+
1129+
func TestProcessAttributesDetail(t *testing.T) {
1130+
// Without detail — nil detail, truncation still happens
1131+
attributes := []*v1_common.KeyValue{
1132+
test.MakeAttribute("key", strings.Repeat("v", 5000)),
1133+
}
1134+
count := processAttributes(attributes, 2048, nil, "span")
1135+
assert.Equal(t, 1, count)
1136+
assert.Equal(t, 2048, len(attributes[0].Value.GetStringValue()))
1137+
1138+
// Value truncation — detail captured deterministically
1139+
attributes = []*v1_common.KeyValue{
1140+
test.MakeAttribute("key", strings.Repeat("v", 5000)),
1141+
}
1142+
detail := truncatedAttrInfo{}
1143+
count = processAttributes(attributes, 2048, &detail, "span")
1144+
assert.Equal(t, 1, count)
1145+
assert.Equal(t, "key", detail.name)
1146+
assert.Equal(t, "span", detail.scope)
1147+
assert.Equal(t, "value", detail.field)
1148+
assert.Equal(t, 5000, detail.origSize)
1149+
1150+
// Key truncation — detail captured deterministically
1151+
attributes = []*v1_common.KeyValue{
1152+
test.MakeAttribute(strings.Repeat("k", 5000), "short"),
1153+
}
1154+
detail = truncatedAttrInfo{}
1155+
count = processAttributes(attributes, 2048, &detail, "resource")
1156+
assert.Equal(t, 1, count)
1157+
assert.Equal(t, "key", detail.field)
1158+
assert.Equal(t, "resource", detail.scope)
1159+
assert.Equal(t, 5000, detail.origSize)
1160+
assert.Equal(t, strings.Repeat("k", 2048), detail.name) // truncated prefix, not full original
1161+
1162+
// Only the first truncation is captured (first of two values)
1163+
attributes = []*v1_common.KeyValue{
1164+
test.MakeAttribute("key1", strings.Repeat("v", 5000)),
1165+
test.MakeAttribute("key2", strings.Repeat("v", 6000)),
1166+
}
1167+
detail = truncatedAttrInfo{}
1168+
count = processAttributes(attributes, 2048, &detail, "span")
1169+
assert.Equal(t, 2, count)
1170+
assert.Equal(t, "key1", detail.name)
1171+
assert.Equal(t, 5000, detail.origSize)
1172+
1173+
// Both key AND value oversized — key wins (checked first)
1174+
attributes = []*v1_common.KeyValue{
1175+
test.MakeAttribute(strings.Repeat("k", 5000), strings.Repeat("v", 6000)),
1176+
}
1177+
detail = truncatedAttrInfo{}
1178+
count = processAttributes(attributes, 2048, &detail, "span")
1179+
assert.Equal(t, 2, count)
1180+
assert.Equal(t, "key", detail.field)
1181+
assert.Equal(t, 5000, detail.origSize)
1182+
1183+
// Already-captured detail (origSize > 0) is not overwritten
1184+
detail = truncatedAttrInfo{scope: "resource", name: "first", field: "value", origSize: 3000}
1185+
attributes = []*v1_common.KeyValue{
1186+
test.MakeAttribute("key2", strings.Repeat("v", 6000)),
1187+
}
1188+
count = processAttributes(attributes, 2048, &detail, "span")
1189+
assert.Equal(t, 1, count)
1190+
assert.Equal(t, "first", detail.name)
1191+
assert.Equal(t, 3000, detail.origSize)
1192+
}
1193+
10951194
func BenchmarkTestsByRequestID(b *testing.B) {
10961195
spansPer := 5000
10971196
batches := 100
@@ -1114,7 +1213,7 @@ func BenchmarkTestsByRequestID(b *testing.B) {
11141213

11151214
for i := 0; i < b.N; i++ {
11161215
for _, blerg := range ils {
1117-
_, _, _, err := requestsByTraceID([]*v1.ResourceSpans{
1216+
_, _, _, _, err := requestsByTraceID([]*v1.ResourceSpans{
11181217
{
11191218
ScopeSpans: blerg,
11201219
},
@@ -2424,7 +2523,7 @@ func TestRequestsByTraceID_SpanIDValidation(t *testing.T) {
24242523
},
24252524
},
24262525
}
2427-
_, _, _, err := requestsByTraceID(batches, "test-tenant", 1, 1000)
2526+
_, _, _, _, err := requestsByTraceID(batches, "test-tenant", 1, 1000)
24282527
require.Error(t, err)
24292528
require.Contains(t, err.Error(), "span ids must be 64 bit")
24302529
}
@@ -2444,7 +2543,7 @@ func TestRequestsByTraceID_SpanIDValidation(t *testing.T) {
24442543
},
24452544
},
24462545
}
2447-
_, _, _, err := requestsByTraceID(batches, "test-tenant", 1, 1000)
2546+
_, _, _, _, err := requestsByTraceID(batches, "test-tenant", 1, 1000)
24482547
require.NoError(t, err)
24492548
}
24502549

modules/distributor/forwarder_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestForwarder(t *testing.T) {
3030
require.NoError(t, err)
3131

3232
b := test.MakeBatch(10, id)
33-
keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
33+
keys, rebatchedTraces, _, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
3434
require.NoError(t, err)
3535

3636
o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)
@@ -81,7 +81,7 @@ func TestForwarder_shutdown(t *testing.T) {
8181
require.NoError(t, err)
8282

8383
b := test.MakeBatch(10, id)
84-
keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
84+
keys, rebatchedTraces, _, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
8585
require.NoError(t, err)
8686

8787
o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)

0 commit comments

Comments
 (0)