Skip to content

Commit a1f5dcb

Browse files
ingester: use a 64-bit hash to avoid collisions (#5276)
* ingester: use a 64-bit hash to avoid collisions We had this issue in the distributor, it it can happen in the ingester too with live traces. * Update changelog * Add test for live traces collision
1 parent 8e88975 commit a1f5dcb

File tree

2 files changed

+76
-21
lines changed

2 files changed

+76
-21
lines changed

modules/ingester/instance.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"encoding/hex"
77
"errors"
88
"fmt"
9-
"hash"
10-
"hash/fnv"
119
"sort"
1210
"sync"
1311
"time"
@@ -25,6 +23,7 @@ import (
2523
"github.com/grafana/tempo/pkg/model/trace"
2624
"github.com/grafana/tempo/pkg/tempopb"
2725
"github.com/grafana/tempo/pkg/tracesizes"
26+
"github.com/grafana/tempo/pkg/util"
2827
"github.com/grafana/tempo/pkg/util/log"
2928
"github.com/grafana/tempo/pkg/validation"
3029
"github.com/grafana/tempo/tempodb"
@@ -79,7 +78,7 @@ var (
7978

8079
type instance struct {
8180
tracesMtx sync.Mutex
82-
traces map[uint32]*liveTrace
81+
traces map[uint64]*liveTrace
8382
traceSizes *tracesizes.Tracker
8483
traceSizeBytes uint64
8584

@@ -105,8 +104,6 @@ type instance struct {
105104
localReader backend.Reader
106105
localWriter backend.Writer
107106

108-
hash hash.Hash32
109-
110107
logger kitlog.Logger
111108
maxTraceLogger *log.RateLimitedLogger
112109
}
@@ -115,7 +112,7 @@ func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverride
115112
logger := kitlog.With(log.Logger, "tenant", instanceID)
116113

117114
i := &instance{
118-
traces: map[uint32]*liveTrace{},
115+
traces: map[uint64]*liveTrace{},
119116
traceSizes: tracesizes.New(),
120117

121118
instanceID: instanceID,
@@ -131,8 +128,6 @@ func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverride
131128
localReader: backend.NewReader(l),
132129
localWriter: backend.NewWriter(l),
133130

134-
hash: fnv.New32(),
135-
136131
logger: logger,
137132
maxTraceLogger: log.NewRateLimitedLogger(maxTraceLogLinesPerSecond, level.Warn(logger)),
138133
}
@@ -214,7 +209,7 @@ func (i *instance) push(ctx context.Context, id, traceBytes []byte) error {
214209
return errTraceTooLarge
215210
}
216211

217-
tkn := i.tokenForTraceID(id)
212+
tkn := util.HashForTraceID(id)
218213
trace := i.getOrCreateTrace(id, tkn)
219214

220215
err = trace.Push(ctx, i.instanceID, traceBytes)
@@ -412,7 +407,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTra
412407

413408
// live traces
414409
i.tracesMtx.Lock()
415-
if liveTrace, ok := i.traces[i.tokenForTraceID(id)]; ok {
410+
if liveTrace, ok := i.traces[util.HashForTraceID(id)]; ok {
416411
completeTrace, err = model.MustNewSegmentDecoder(model.CurrentEncoding).PrepareForRead(liveTrace.batches)
417412
if err != nil {
418413
i.tracesMtx.Unlock()
@@ -486,7 +481,7 @@ func (i *instance) AddCompletingBlock(b common.WALBlock) {
486481
// getOrCreateTrace will return a new trace object for the given request
487482
//
488483
// It must be called under the i.tracesMtx lock
489-
func (i *instance) getOrCreateTrace(traceID []byte, fp uint32) *liveTrace {
484+
func (i *instance) getOrCreateTrace(traceID []byte, fp uint64) *liveTrace {
490485
trace, ok := i.traces[fp]
491486
if ok {
492487
return trace
@@ -498,13 +493,6 @@ func (i *instance) getOrCreateTrace(traceID []byte, fp uint32) *liveTrace {
498493
return trace
499494
}
500495

501-
// tokenForTraceID hash trace ID, should be called under lock
502-
func (i *instance) tokenForTraceID(id []byte) uint32 {
503-
i.hash.Reset()
504-
_, _ = i.hash.Write(id)
505-
return i.hash.Sum32()
506-
}
507-
508496
// resetHeadBlock() should be called under lock
509497
func (i *instance) resetHeadBlock() error {
510498
dedicatedColumns := i.getDedicatedColumns()

modules/ingester/instance_test.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/grafana/tempo/pkg/model/trace"
2121
"github.com/grafana/tempo/pkg/tempopb"
2222
v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1"
23+
"github.com/grafana/tempo/pkg/util"
2324
"github.com/grafana/tempo/pkg/util/test"
2425
)
2526

@@ -397,7 +398,7 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
397398
instance, _ := defaultInstance(t)
398399

399400
for _, trace := range tc.input {
400-
fp := instance.tokenForTraceID(trace.traceID)
401+
fp := util.HashForTraceID(trace.traceID)
401402
instance.traces[fp] = trace
402403
}
403404

@@ -406,12 +407,12 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
406407

407408
require.Equal(t, len(tc.expectedExist), len(instance.traces))
408409
for _, expectedExist := range tc.expectedExist {
409-
_, ok := instance.traces[instance.tokenForTraceID(expectedExist.traceID)]
410+
_, ok := instance.traces[util.HashForTraceID(expectedExist.traceID)]
410411
require.True(t, ok)
411412
}
412413

413414
for _, expectedNotExist := range tc.expectedNotExist {
414-
_, ok := instance.traces[instance.tokenForTraceID(expectedNotExist.traceID)]
415+
_, ok := instance.traces[util.HashForTraceID(expectedNotExist.traceID)]
415416
require.False(t, ok)
416417
}
417418
})
@@ -665,6 +666,72 @@ func TestInstancePartialSuccess(t *testing.T) {
665666
assert.Equal(t, expected, result)
666667
}
667668

669+
func TestFindTraceByIDNoCollisions(t *testing.T) {
670+
ctx := context.Background()
671+
ingester, _, _ := defaultIngester(t, t.TempDir())
672+
instance, err := ingester.getOrCreateInstance(testTenantID)
673+
require.NoError(t, err, "unexpected error creating new instance")
674+
675+
// Verify that Trace IDs that collide with fnv-32 hash do not collide with fnv-64 hash
676+
hexIDs := []string{
677+
"fd5980503add11f09f80f77608c1b2da",
678+
"091ea7803ade11f0998a055186ee1243",
679+
"9e0d446036dc11f09ac04988d2097052",
680+
"a61ed97036dc11f0883771db3b51b1ec",
681+
"6b27f5501eda11f09e99db1b2c23c542",
682+
"6b4149b01eda11f0b0e2a966cf7ebbc8",
683+
"3e9582202f9a11f0afb01b7c06024bd6",
684+
"370db6802f9a11f0a9a212dff3125239",
685+
"978d70802a7311f0991f350653ef0ab4",
686+
"9b66da202a7311f09d292db17ccfd31a",
687+
"de567f703bb711f0b8c377682d1667e6",
688+
"dc2d0fc03bb711f091de732fcf93048c",
689+
}
690+
691+
ids := make([][]byte, len(hexIDs))
692+
for i, hexID := range hexIDs {
693+
traceID, err := util.HexStringToTraceID(hexID)
694+
require.NoError(t, err)
695+
ids[i] = traceID
696+
}
697+
698+
multiMaxBytes := make([]int, len(ids))
699+
for j := range multiMaxBytes {
700+
multiMaxBytes[j] = 1000
701+
}
702+
req := makePushBytesRequestMultiTraces(ids, multiMaxBytes)
703+
require.Equal(t, len(ids), len(req.Traces))
704+
response := instance.PushBytesRequest(ctx, req)
705+
errored, maxLiveCount, traceTooLargeCount := CheckPushBytesError(response)
706+
707+
require.False(t, errored)
708+
require.Equal(t, 0, maxLiveCount)
709+
require.Equal(t, 0, traceTooLargeCount)
710+
711+
traceResults := make([]*tempopb.Trace, len(ids))
712+
for i := range ids {
713+
result, err := instance.FindTraceByID(ctx, ids[i], false)
714+
require.NoError(t, err, "error finding trace by id")
715+
require.NotNil(t, result)
716+
require.NotNil(t, result.Trace)
717+
traceResults[i] = result.Trace
718+
}
719+
720+
// Verify that spans do not appear in multiple traces
721+
spanIDs := map[string]struct{}{}
722+
for _, trace := range traceResults {
723+
for _, resourceSpan := range trace.ResourceSpans {
724+
for _, scopeSpan := range resourceSpan.ScopeSpans {
725+
for _, span := range scopeSpan.Spans {
726+
_, found := spanIDs[string(span.SpanId)]
727+
require.False(t, found, "span %s appears in multiple traces", span.SpanId)
728+
spanIDs[string(span.SpanId)] = struct{}{}
729+
}
730+
}
731+
}
732+
}
733+
}
734+
668735
func TestSortByteSlices(t *testing.T) {
669736
numTraces := 100
670737

0 commit comments

Comments
 (0)