Skip to content

Commit c2e798e

Browse files
authored
chore: Improve tracing visibility into distributor (#4361)
1 parent 10710fc commit c2e798e

File tree

4 files changed

+35
-3
lines changed

4 files changed

+35
-3
lines changed

pkg/distributor/distributor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,15 @@ func (d *Distributor) GetProfileLanguage(series *distributormodel.ProfileSeries)
292292
}
293293

294294
func (d *Distributor) PushBatch(ctx context.Context, req *distributormodel.PushRequest) error {
295+
sp, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.PushBatch")
296+
defer sp.Finish()
297+
295298
tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
296299
if err != nil {
297300
return connect.NewError(connect.CodeUnauthenticated, err)
298301
}
302+
sp.SetTag("tenant_id", tenantID)
303+
299304
if len(req.Series) == 0 {
300305
return noNewProfilesReceivedError()
301306
}
@@ -415,9 +420,13 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
415420
// Normalisation is quite an expensive operation,
416421
// therefore it should be done after the rate limit check.
417422
if req.Language == "go" {
423+
sp, _ := opentracing.StartSpanFromContext(ctx, "pprof.FixGoProfile")
418424
req.Profile.Profile = pprof.FixGoProfile(req.Profile.Profile)
425+
sp.Finish()
419426
}
427+
sp, _ := opentracing.StartSpanFromContext(ctx, "Profile.Normalize")
420428
req.Profile.Normalize()
429+
sp.Finish()
421430

422431
if len(req.Profile.Sample) == 0 {
423432
// TODO(kolesnikovae):

pkg/distributor/writepath/router.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ func (m *Router) running(ctx context.Context) error {
9494
}
9595

9696
func (m *Router) Send(ctx context.Context, req *distributormodel.ProfileSeries) error {
97+
sp, ctx := opentracing.StartSpanFromContext(ctx, "Router.Send")
98+
defer sp.Finish()
9799
config := m.overrides.WritePathOverrides(req.TenantID)
98100
switch config.WritePath {
99101
case SegmentWriterPath:

pkg/ingester/pyroscope/ingest_handler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,17 @@ func readInputRawDataFromRequest(ctx context.Context, r *http.Request, input *in
177177
}
178178

179179
buf := bytes.NewBuffer(make([]byte, 0, 64<<10))
180-
if n, err := io.Copy(buf, r.Body); err != nil {
180+
n, err := io.Copy(buf, r.Body)
181+
if err != nil {
181182
return fmt.Errorf("error reading request body bytes_read %d: %w", n, err)
182183
}
184+
185+
if sp != nil {
186+
sp.SetTag("content_length", n)
187+
sp.LogFields(
188+
otlog.String("msg", "read body from request"),
189+
)
190+
}
183191
b := buf.Bytes()
184192

185193
switch {

pkg/util/delayhandler/http.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package delayhandler
33
import (
44
"net/http"
55
"time"
6+
7+
"github.com/opentracing/opentracing-go"
68
)
79

810
func wrapResponseWriter(w http.ResponseWriter, end time.Time) (http.ResponseWriter, *delayedResponseWriter) {
@@ -65,15 +67,21 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler {
6567
return func(h http.Handler) http.Handler {
6668
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
6769
start := timeNow()
70+
ctx := r.Context()
6871

69-
delay := getDelay(r.Context(), limits)
72+
delay := getDelay(ctx, limits)
7073
var delayRw *delayedResponseWriter
7174
if delay > 0 {
7275
w, delayRw = wrapResponseWriter(w, start.Add(delay))
76+
77+
// only add a span when delay is active
78+
var sp opentracing.Span
79+
sp, ctx = opentracing.StartSpanFromContext(ctx, "delayhandler.Handler")
80+
defer sp.Finish()
7381
}
7482

7583
// now run the chain after me
76-
h.ServeHTTP(w, r)
84+
h.ServeHTTP(w, r.WithContext(ctx))
7785

7886
// if we didn't delay, return immediately
7987
if delayRw == nil {
@@ -96,6 +104,11 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler {
96104
addDelayHeader(w.Header(), delayLeft)
97105
}
98106

107+
// create a separate span to make the artificial delay clear
108+
sp, _ := opentracing.StartSpanFromContext(ctx, "delayhandler.Delay")
109+
sp.SetTag("delayed_by", delayLeft.String())
110+
defer sp.Finish()
111+
99112
// wait for the delay to elapse
100113
<-timeAfter(delayLeft)
101114
})

0 commit comments

Comments
 (0)