Skip to content

Commit 91948b8

Browse files
authored
enhancement: supported deduping spans within block builder (#6539)
1 parent 2ac73ca commit 91948b8

File tree

9 files changed

+202
-45
lines changed

9 files changed

+202
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* [ENHANCEMENT] Used frontend MaxExemplars config as single source of truth for exemplar limits. Added a safety cap at the traceql engine entry points. [#6515](https://github.com/grafana/tempo/pull/6515) (@zhxiaogg)
44
* [CHANGE] Set default `max_result_limit` for search to 256*1024 [#6525](https://github.com/grafana/tempo/pull/6525) (@zhxiaogg)
55
* [CHANGE] **BREAKING CHANGE** Remove Opencensus receiver [#6523](https://github.com/grafana/tempo/pull/6523) (@javiermolinar)
6+
* [ENHANCEMENT] Block builder: deduplicate spans within traces during block creation and track removed duplicates via `tempo_block_builder_spans_deduped_total` metric [#6539](https://github.com/grafana/tempo/pull/6539) (@zhxiaogg)
67
* [CHANGE] Upgrade Tempo to Go 1.26.0 [#6443](https://github.com/grafana/tempo/pull/6443) (@stoewer)
78
* [CHANGE] Allow duplicate dimensions for span metrics and service graphs. This is a valid use case if using different instrumentation libraries, with spans having "deployment.environment" and others "deployment_environment", for example. [#6288](https://github.com/grafana/tempo/pull/6288) (@carles-grafana)
89
* [CHANGE] Update default max duration for traceql metrics queries up to one day [#6285](https://github.com/grafana/tempo/pull/6285) (@javiermolinar)

modules/blockbuilder/blockbuilder.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ var (
8484
Name: "owned_partitions",
8585
Help: "Indicates partition ownership by this block-builder (1 = owned).",
8686
}, []string{"partition", "state"})
87+
metricDedupedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
88+
Namespace: "tempo",
89+
Subsystem: "block_builder",
90+
Name: "spans_deduped_total",
91+
Help: "Total number of duplicate spans removed during block building.",
92+
}, []string{"tenant"})
8793

8894
tracer = otel.Tracer("modules/blockbuilder")
8995

modules/blockbuilder/live_traces_iter.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package blockbuilder
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"slices"
78
"sync"
89

910
"github.com/grafana/tempo/pkg/livetraces"
1011
"github.com/grafana/tempo/pkg/tempopb"
12+
"github.com/grafana/tempo/pkg/util"
1113
"github.com/grafana/tempo/tempodb/encoding/common"
1214
)
1315

@@ -28,12 +30,14 @@ type chEntry struct {
2830
// Tracks the min/max timestamps seen across all traces that can be accessed
2931
// once all traces are iterated (unmarshaled), since this can't be known upfront.
3032
type liveTracesIter struct {
31-
mtx sync.Mutex
32-
liveTraces *livetraces.LiveTraces[[]byte]
33-
ch chan []chEntry
34-
chBuf []chEntry
35-
cancel func()
36-
start, end uint64
33+
mtx sync.Mutex
34+
liveTraces *livetraces.LiveTraces[[]byte]
35+
ch chan []chEntry
36+
chBuf []chEntry
37+
cancel func()
38+
start, end uint64
39+
dedupedSpans uint32
40+
exhausted bool
3741
}
3842

3943
func newLiveTracesIter(liveTraces *livetraces.LiveTraces[[]byte]) *liveTracesIter {
@@ -88,6 +92,12 @@ func (i *liveTracesIter) iter(ctx context.Context) {
8892
return bytes.Compare(a.id, b.id)
8993
})
9094

95+
// h and buffer are reused across all spans to avoid repeated allocations.
96+
h := util.NewTokenHasher()
97+
buffer := make([]byte, 4)
98+
// seen is reused across traces to avoid repeated allocations.
99+
seen := make(map[uint64]struct{}, 1024)
100+
91101
// Begin sending to channel in chunks to reduce channel overhead.
92102
seq := slices.Chunk(entries, 10)
93103
for entries := range seq {
@@ -109,19 +119,29 @@ func (i *liveTracesIter) iter(ctx context.Context) {
109119
}
110120
}
111121

112-
// Update block timestamp bounds
113-
for _, b := range tr.ResourceSpans {
114-
for _, ss := range b.ScopeSpans {
122+
// Deduplicate spans and update block timestamp bounds in one pass.
123+
for _, rs := range tr.ResourceSpans {
124+
for _, ss := range rs.ScopeSpans {
125+
unique := ss.Spans[:0]
115126
for _, s := range ss.Spans {
127+
token := util.TokenForID(h, buffer, int32(s.Kind), s.SpanId)
128+
if _, ok := seen[token]; ok {
129+
i.dedupedSpans++
130+
continue
131+
}
132+
seen[token] = struct{}{}
133+
unique = append(unique, s)
116134
if i.start == 0 || s.StartTimeUnixNano < i.start {
117135
i.start = s.StartTimeUnixNano
118136
}
119137
if s.EndTimeUnixNano > i.end {
120138
i.end = s.EndTimeUnixNano
121139
}
122140
}
141+
ss.Spans = unique
123142
}
124143
}
144+
clear(seen)
125145

126146
tempopb.ReuseByteSlices(entry.Batches)
127147
delete(i.liveTraces.Traces, e.hash)
@@ -139,6 +159,8 @@ func (i *liveTracesIter) iter(ctx context.Context) {
139159
return
140160
}
141161
}
162+
163+
i.exhausted = true
142164
}
143165

144166
// MinMaxTimestamps returns the earliest start, and latest end span timestamps,
@@ -151,6 +173,18 @@ func (i *liveTracesIter) MinMaxTimestamps() (uint64, uint64) {
151173
return i.start, i.end
152174
}
153175

176+
// DedupedSpans returns the total number of duplicate spans that were removed
177+
// across all traces. Returns an error if the iterator has not been fully exhausted.
178+
func (i *liveTracesIter) DedupedSpans() (uint32, error) {
179+
i.mtx.Lock()
180+
defer i.mtx.Unlock()
181+
182+
if !i.exhausted {
183+
return 0, errors.New("iterator must be exhausted before calling DedupedSpans")
184+
}
185+
return i.dedupedSpans, nil
186+
}
187+
154188
// Close cancels the context driving the background iteration goroutine,
// causing it to exit even if the channel has not been drained.
func (i *liveTracesIter) Close() {
	i.cancel()
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package blockbuilder
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/grafana/tempo/pkg/livetraces"
8+
"github.com/grafana/tempo/pkg/util/test"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestLiveTracesIter_DedupSpans(t *testing.T) {
13+
const spanCount = 5
14+
15+
traceID := generateTraceID(t)
16+
tr := test.MakeTraceWithSpanCount(1, spanCount, traceID)
17+
18+
trBytes, err := tr.Marshal()
19+
require.NoError(t, err)
20+
21+
// Push the same trace bytes twice to simulate replicated writes
22+
lt := livetraces.New(func(b []byte) uint64 { return uint64(len(b)) }, 0, 0, "test-tenant")
23+
lt.Push(traceID, trBytes, 0)
24+
lt.Push(traceID, trBytes, 0)
25+
26+
iter := newLiveTracesIter(lt)
27+
ctx := context.Background()
28+
29+
id, resultTr, err := iter.Next(ctx)
30+
require.NoError(t, err)
31+
require.NotNil(t, id)
32+
require.NotNil(t, resultTr)
33+
34+
// Exhaust the iterator
35+
_, _, err = iter.Next(ctx)
36+
require.NoError(t, err)
37+
38+
// Duplicate push should be fully deduped - only the original spans remain
39+
total := 0
40+
for _, rs := range resultTr.ResourceSpans {
41+
for _, ss := range rs.ScopeSpans {
42+
total += len(ss.Spans)
43+
}
44+
}
45+
require.Equal(t, spanCount, total)
46+
n, err := iter.DedupedSpans()
47+
require.NoError(t, err)
48+
require.Equal(t, uint32(spanCount), n)
49+
}
50+
51+
func TestLiveTracesIter_DedupedSpans_ErrorWhenNotExhausted(t *testing.T) {
52+
// Push 11 traces so iter() produces 2 chunks (chunk size is 10).
53+
// Without reading from the channel the buffer fills after the first chunk,
54+
// causing iter() to block on the second send. Cancelling via Close() makes
55+
// it exit early without setting exhausted, so DedupedSpans returns an error.
56+
lt := livetraces.New(func(b []byte) uint64 { return uint64(len(b)) }, 0, 0, "test-tenant")
57+
for j := 0; j < 11; j++ {
58+
traceID := generateTraceID(t)
59+
tr := test.MakeTraceWithSpanCount(1, 1, traceID)
60+
trBytes, err := tr.Marshal()
61+
require.NoError(t, err)
62+
lt.Push(traceID, trBytes, 0)
63+
}
64+
65+
iter := newLiveTracesIter(lt)
66+
iter.Close() // cancel before exhausting
67+
68+
_, err := iter.DedupedSpans() // blocks until iter() exits, then returns error
69+
require.Error(t, err)
70+
}

modules/blockbuilder/tenant_store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ func (s *tenantStore) Flush(ctx context.Context, r tempodb.Reader, w tempodb.Wri
147147
return err
148148
}
149149

150+
if n, err := iter.DedupedSpans(); err != nil {
151+
level.Error(s.logger).Log("msg", "failed to get deduped spans count", "err", err)
152+
} else if n > 0 {
153+
metricDedupedSpans.WithLabelValues(s.tenantID).Add(float64(n))
154+
}
155+
150156
// Update meta timestamps which couldn't be known until we unmarshaled
151157
// all of the traces.
152158
start, end := iter.MinMaxTimestamps()

pkg/model/trace/combine.go

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package trace
22

33
import (
4-
"encoding/binary"
54
"fmt"
6-
"hash"
7-
"hash/fnv"
85
"sync"
96

107
"github.com/grafana/tempo/pkg/tempopb"
8+
"github.com/grafana/tempo/pkg/util"
119
)
1210

1311
// token is uint64 to reduce hash collision rates. Experimentally, it was observed
@@ -16,23 +14,6 @@ import (
1614
// results in a dropped span during combine.
1715
type token uint64
1816

19-
func newHash() hash.Hash64 {
20-
return fnv.New64()
21-
}
22-
23-
// tokenForID returns a token for use in a hash map given a span id and span kind
24-
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
25-
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
26-
// as it is shared between client and server spans.
27-
func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token {
28-
binary.LittleEndian.PutUint32(buffer, uint32(kind))
29-
30-
h.Reset()
31-
_, _ = h.Write(b)
32-
_, _ = h.Write(buffer)
33-
return token(h.Sum64())
34-
}
35-
3617
var ErrTraceTooLarge = fmt.Errorf("trace exceeds max size")
3718

3819
// Combiner combines multiple partial traces into one, deduping spans based on
@@ -77,7 +58,7 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
7758
return spanCount, nil
7859
}
7960

80-
h := newHash()
61+
h := util.NewTokenHasher()
8162
buffer := make([]byte, 4)
8263

8364
// First call?
@@ -97,7 +78,7 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
9778
for _, b := range c.result.ResourceSpans {
9879
for _, ils := range b.ScopeSpans {
9980
for _, s := range ils.Spans {
100-
c.spans[tokenForID(h, buffer, int32(s.Kind), s.SpanId)] = struct{}{}
81+
c.spans[token(util.TokenForID(h, buffer, int32(s.Kind), s.SpanId))] = struct{}{}
10182
}
10283
}
10384
}
@@ -117,7 +98,7 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
11798
notFoundSpans := ils.Spans[:0]
11899
for _, s := range ils.Spans {
119100
// if not already encountered, then keep
120-
token := tokenForID(h, buffer, int32(s.Kind), s.SpanId)
101+
token := token(util.TokenForID(h, buffer, int32(s.Kind), s.SpanId))
121102
_, ok := c.spans[token]
122103
if !ok {
123104
notFoundSpans = append(notFoundSpans, s)

pkg/model/trace/combine_test.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111

1212
"github.com/grafana/tempo/pkg/tempopb"
13+
"github.com/grafana/tempo/pkg/util"
1314
"github.com/grafana/tempo/pkg/util/test"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
@@ -126,7 +127,7 @@ func TestTokenForIDCollision(t *testing.T) {
126127
// Estimate the hash collision rate of tokenForID.
127128

128129
n := 1_000_000
129-
h := newHash()
130+
h := util.NewTokenHasher()
130131
buf := make([]byte, 4)
131132

132133
tokens := map[token]struct{}{}
@@ -140,7 +141,7 @@ func TestTokenForIDCollision(t *testing.T) {
140141
cpy := append([]byte(nil), spanID...)
141142
IDs = append(IDs, cpy)
142143

143-
tokens[tokenForID(h, buf, 0, spanID)] = struct{}{}
144+
tokens[token(util.TokenForID(h, buf, 0, spanID))] = struct{}{}
144145
}
145146

146147
// Ensure no duplicate span IDs accidentally generated
@@ -162,17 +163,6 @@ func TestTokenForIDCollision(t *testing.T) {
162163
require.Equal(t, n, len(tokens))
163164
}
164165

165-
func BenchmarkTokenForID(b *testing.B) {
166-
h := newHash()
167-
id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
168-
buffer := make([]byte, 4)
169-
170-
b.ResetTimer()
171-
for i := 0; i < b.N; i++ {
172-
_ = tokenForID(h, buffer, 0, id)
173-
}
174-
}
175-
176166
func BenchmarkCombine(b *testing.B) {
177167
parts := []int{2, 3, 4, 8}
178168
requests := 100 // 100K spans per part

pkg/util/traceid.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package util
22

33
import (
44
"bytes"
5+
"encoding/binary"
56
"encoding/hex"
67
"errors"
78
"fmt"
9+
"hash"
10+
"hash/fnv"
811
"strings"
912
"unsafe"
1013
)
@@ -68,6 +71,23 @@ var spanKindFNVHashes = [...]uint64{
6871
0x43869769eb4f75c8, // spare 2
6972
}
7073

74+
// NewTokenHasher returns a fresh 64-bit FNV-1 hasher for use with TokenForID.
// The hasher is meant to be created once and reused across many calls.
func NewTokenHasher() hash.Hash64 {
	return fnv.New64()
}

// TokenForID returns a token for use as a key in a hash map given a span ID and span kind.
// h and buffer (must be at least 4 bytes) are provided by the caller for reuse across calls
// to avoid repeated allocations. Use NewTokenHasher to create h.
// kind is included because in zipkin traces the span id is shared between client and server spans.
func TokenForID(h hash.Hash64, buffer []byte, kind int32, id []byte) uint64 {
	h.Reset()
	_, _ = h.Write(id)

	// Fold the kind in as 4 little-endian bytes via the caller's scratch buffer.
	binary.LittleEndian.PutUint32(buffer[:4], uint32(kind))
	_, _ = h.Write(buffer[:4])

	return h.Sum64()
}
90+
7191
// SpanIDAndKindToToken converts a span ID into a token for use as key in a hash map. The token is generated such
7292
// that it has a low collision probability. In zipkin traces the span id is not guaranteed to be unique as it
7393
// is shared between client and server spans. Therefore, it is sometimes required to take the span kind into account.

0 commit comments

Comments
 (0)