Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ import (
v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/usagestats"
tempo_util "github.com/grafana/tempo/pkg/util"

"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/validation"
)

Expand Down Expand Up @@ -384,7 +383,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
d.usage.Observe(userID, batches)
}

keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte)
ringTokens, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger)
Expand All @@ -395,13 +394,13 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
metricAttributesTruncated.WithLabelValues(userID).Add(float64(truncatedAttributeCount))
}

err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys)
err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, ringTokens)
if err != nil {
return nil, err
}

if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 {
d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces)
d.generatorForwarder.SendTraces(ctx, userID, ringTokens, rebatchedTraces)
}

if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil {
Expand Down Expand Up @@ -537,12 +536,12 @@ func (d *Distributor) UsageTrackerHandler() http.Handler {
// and traces to pass onto the ingesters.
func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, int, error) {
const tracesPerBatch = 20 // p50 of internal env
tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch)
tracesByID := make(map[uint64]*rebatchedTrace, tracesPerBatch)
truncatedAttributeCount := 0

for _, b := range batches {
spansByILS := make(map[uint32]*v1.ScopeSpans)
// check for large resources for large attributes
spansByILS := make(map[uint64]*v1.ScopeSpans)
// check resource for large attributes
if maxSpanAttrSize > 0 && b.Resource != nil {
resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize)
truncatedAttributeCount += resourceAttrTruncatedCount
Expand All @@ -560,11 +559,11 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma
return nil, nil, 0, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
}

traceKey := tempo_util.TokenFor(userID, traceID)
traceKey := util.HashForTraceID(traceID)
ilsKey := traceKey
if ils.Scope != nil {
ilsKey = fnv1a.AddString32(ilsKey, ils.Scope.Name)
ilsKey = fnv1a.AddString32(ilsKey, ils.Scope.Version)
ilsKey = fnv1a.AddString64(ilsKey, ils.Scope.Name)
ilsKey = fnv1a.AddString64(ilsKey, ils.Scope.Version)
}

existingILS, ilsAdded := spansByILS[ilsKey]
Expand Down Expand Up @@ -615,15 +614,15 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma

metricTracesPerBatch.Observe(float64(len(tracesByID)))

keys := make([]uint32, 0, len(tracesByID))
ringTokens := make([]uint32, 0, len(tracesByID))
traces := make([]*rebatchedTrace, 0, len(tracesByID))

for k, r := range tracesByID {
keys = append(keys, k)
traces = append(traces, r)
for _, tr := range tracesByID {
ringTokens = append(ringTokens, util.TokenFor(userID, tr.id))
traces = append(traces, tr)
}

return keys, traces, truncatedAttributeCount, nil
return ringTokens, traces, truncatedAttributeCount, nil
}

// find and truncate the span attributes that are too large
Expand Down Expand Up @@ -808,7 +807,7 @@ func logSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logge
loggerWithAtts = log.With(
loggerWithAtts,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
util.StringifyAnyValue(a.GetValue()))
}
}

Expand All @@ -830,7 +829,7 @@ func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) {
logger = log.With(
logger,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
util.StringifyAnyValue(a.GetValue()))
}

latencySeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds())
Expand Down
203 changes: 191 additions & 12 deletions modules/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ func TestRequestsByTraceID(t *testing.T) {
traceIDA := []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
traceIDB := []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}

// These 2 trace IDs are known to collide under fnv32
collision1, _ := util.HexStringToTraceID("fd5980503add11f09f80f77608c1b2da")
collision2, _ := util.HexStringToTraceID("091ea7803ade11f0998a055186ee1243")

tests := []struct {
name string
emptyTenant bool
batches []*v1.ResourceSpans
expectedKeys []uint32
expectedTraces []*tempopb.Trace
Expand Down Expand Up @@ -709,32 +714,206 @@ func TestRequestsByTraceID(t *testing.T) {
expectedStarts: []uint32{10, 60},
expectedEnds: []uint32{50, 80},
},
{
// These 2 trace IDs are known to collide under fnv32
name: "known collisions",
emptyTenant: true,
batches: []*v1.ResourceSpans{
{
Resource: &v1_resource.Resource{
DroppedAttributesCount: 3,
},
ScopeSpans: []*v1.ScopeSpans{
{
Scope: &v1_common.InstrumentationScope{
Name: "test",
},
Spans: []*v1.Span{
{
TraceId: collision2,
Name: "spanA",
StartTimeUnixNano: uint64(30 * time.Second),
EndTimeUnixNano: uint64(40 * time.Second),
},
{
TraceId: collision2,
Name: "spanC",
StartTimeUnixNano: uint64(20 * time.Second),
EndTimeUnixNano: uint64(50 * time.Second),
},
{
TraceId: collision1,
Name: "spanE",
StartTimeUnixNano: uint64(70 * time.Second),
EndTimeUnixNano: uint64(80 * time.Second),
},
},
},
},
},
{
Resource: &v1_resource.Resource{
DroppedAttributesCount: 4,
},
ScopeSpans: []*v1.ScopeSpans{
{
Scope: &v1_common.InstrumentationScope{
Name: "test2",
},
Spans: []*v1.Span{
{
TraceId: collision2,
Name: "spanB",
StartTimeUnixNano: uint64(10 * time.Second),
EndTimeUnixNano: uint64(30 * time.Second),
},
{
TraceId: collision1,
Name: "spanD",
StartTimeUnixNano: uint64(60 * time.Second),
EndTimeUnixNano: uint64(80 * time.Second),
},
},
},
},
},
},
expectedKeys: []uint32{
util.TokenFor("", collision1),
util.TokenFor("", collision2),
},
expectedTraces: []*tempopb.Trace{
{
ResourceSpans: []*v1.ResourceSpans{
{
Resource: &v1_resource.Resource{
DroppedAttributesCount: 3,
},
ScopeSpans: []*v1.ScopeSpans{
{
Scope: &v1_common.InstrumentationScope{
Name: "test",
},
Spans: []*v1.Span{
{
TraceId: collision1,
Name: "spanE",
StartTimeUnixNano: uint64(70 * time.Second),
EndTimeUnixNano: uint64(80 * time.Second),
},
},
},
},
},
{
Resource: &v1_resource.Resource{
DroppedAttributesCount: 4,
},
ScopeSpans: []*v1.ScopeSpans{
{
Scope: &v1_common.InstrumentationScope{
Name: "test2",
},
Spans: []*v1.Span{
{
TraceId: collision1,
Name: "spanD",
StartTimeUnixNano: uint64(60 * time.Second),
EndTimeUnixNano: uint64(80 * time.Second),
},
},
},
},
},
},
},
{
ResourceSpans: []*v1.ResourceSpans{
{
Resource: &v1_resource.Resource{
DroppedAttributesCount: 3,
},
ScopeSpans: []*v1.ScopeSpans{
{
Scope: &v1_common.InstrumentationScope{
Name: "test",
},
Spans: []*v1.Span{
{
TraceId: collision2,
Name: "spanA",
StartTimeUnixNano: uint64(30 * time.Second),
EndTimeUnixNano: uint64(40 * time.Second),
},
{
TraceId: collision2,
Name: "spanC",
StartTimeUnixNano: uint64(20 * time.Second),
EndTimeUnixNano: uint64(50 * time.Second),
},
},
},
},
},
{
Resource: &v1_resource.Resource{
DroppedAttributesCount: 4,
},
ScopeSpans: []*v1.ScopeSpans{
{
Scope: &v1_common.InstrumentationScope{
Name: "test2",
},
Spans: []*v1.Span{
{
TraceId: collision2,
Name: "spanB",
StartTimeUnixNano: uint64(10 * time.Second),
EndTimeUnixNano: uint64(30 * time.Second),
},
},
},
},
},
},
},
},
expectedIDs: [][]byte{
collision1,
collision2,
},
expectedStarts: []uint32{60, 10},
expectedEnds: []uint32{80, 50},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
keys, rebatchedTraces, _, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1, 1000)
require.Equal(t, len(keys), len(rebatchedTraces))
tenant := util.FakeTenantID
if tt.emptyTenant {
tenant = ""
}
ringTokens, rebatchedTraces, _, err := requestsByTraceID(tt.batches, tenant, 1, 1000)
require.Equal(t, len(ringTokens), len(rebatchedTraces))

for i, expectedKey := range tt.expectedKeys {
for i, expectedID := range tt.expectedIDs {
foundIndex := -1
for j, key := range keys {
if expectedKey == key {
for j, tr := range rebatchedTraces {
if bytes.Equal(expectedID, tr.id) {
foundIndex = j
break
}
}
require.NotEqual(t, -1, foundIndex, "expected key %d not found", foundIndex)

// now confirm that the request at this position is the expected one
expectedReq := tt.expectedTraces[i]
actualReq := rebatchedTraces[foundIndex].trace
assert.Equal(t, expectedReq, actualReq)
assert.Equal(t, tt.expectedIDs[i], rebatchedTraces[foundIndex].id)
assert.Equal(t, tt.expectedStarts[i], rebatchedTraces[foundIndex].start)
assert.Equal(t, tt.expectedEnds[i], rebatchedTraces[foundIndex].end)
require.Equal(t, tt.expectedIDs[i], rebatchedTraces[foundIndex].id)
require.Equal(t, tt.expectedTraces[i], rebatchedTraces[foundIndex].trace)
require.Equal(t, tt.expectedStarts[i], rebatchedTraces[foundIndex].start)
require.Equal(t, tt.expectedEnds[i], rebatchedTraces[foundIndex].end)
}

assert.Equal(t, tt.expectedErr, err)
require.Equal(t, tt.expectedErr, err)
})
}
}
Expand Down
Loading
Loading