Skip to content

Commit 15e5d9c

Browse files
authored
fix(sampling): Force manual sampling on locked traces (#4084)
Co-authored-by: benjamin.debernardi <[email protected]>
1 parent 26c62ab commit 15e5d9c

File tree

3 files changed

+93
-4
lines changed

3 files changed

+93
-4
lines changed

ddtrace/tracer/span.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,21 @@ func (s *Span) setSamplingPriorityLocked(priority int, sampler samplernames.Samp
408408
s.context.setSamplingPriority(priority, sampler)
409409
}
410410

411+
// forceSetSamplingPriorityLocked updates the sampling priority.
412+
// If the trace is locked, the sampling priority is forced to the given value.
413+
//
414+
// This function should only be used when applying a manual keep or drop decision.
415+
func (s *Span) forceSetSamplingPriorityLocked(priority int, sampler samplernames.SamplerName) {
416+
// We don't lock spans when flushing, so we could have a data race when
417+
// modifying a span as it's being flushed. This protects us against that
418+
// race, since spans are marked `finished` before we flush them.
419+
if s.finished {
420+
return
421+
}
422+
s.setMetric(keySamplingPriority, float64(priority))
423+
s.context.forceSetSamplingPriority(priority, sampler)
424+
}
425+
411426
// setTagError sets the error tag. It accounts for various valid scenarios.
412427
// This method is not safe for concurrent use.
413428
func (s *Span) setTagError(value interface{}, cfg errorConfig) {
@@ -549,11 +564,11 @@ func (s *Span) setTagBool(key string, v bool) {
549564
}
550565
case ext.ManualDrop:
551566
if v {
552-
s.setSamplingPriorityLocked(ext.PriorityUserReject, samplernames.Manual)
567+
s.forceSetSamplingPriorityLocked(ext.PriorityUserReject, samplernames.Manual)
553568
}
554569
case ext.ManualKeep:
555570
if v {
556-
s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.Manual)
571+
s.forceSetSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.Manual)
557572
}
558573
default:
559574
if v {

ddtrace/tracer/span_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,52 @@ func TestTraceManualKeepAndManualDrop(t *testing.T) {
571571
span.SetTag(scenario.tag, true)
572572
assert.Equal(t, scenario.keep, shouldKeep(span))
573573
})
574+
t.Run(fmt.Sprintf("%s/upstream-drop-locked", scenario.tag), func(t *testing.T) {
575+
tracer, err := newTracer()
576+
defer tracer.Stop()
577+
assert.NoError(t, err)
578+
579+
spanCtx := &SpanContext{
580+
traceID: traceIDFrom64Bits(42),
581+
spanID: 42,
582+
trace: newTrace(),
583+
}
584+
585+
// Set sampling priority (0 = drop decision from upstream) & lock the trace
586+
// mimicking inheriting a trace from an upstream service with a drop decision.
587+
spanCtx.setSamplingPriority(ext.PriorityAutoReject, samplernames.Unknown)
588+
spanCtx.trace.setLocked(true)
589+
590+
span := tracer.StartSpan("child span with sampling decision", ChildOf(spanCtx))
591+
span.SetTag(scenario.tag, true)
592+
593+
// The sampling decision should be applied as manual sampling takes
594+
// precedence over propagated decision
595+
assert.Equal(t, scenario.keep, shouldKeep(span))
596+
})
597+
t.Run(fmt.Sprintf("%s/upstream-keep-locked", scenario.tag), func(t *testing.T) {
598+
tracer, err := newTracer()
599+
defer tracer.Stop()
600+
assert.NoError(t, err)
601+
602+
spanCtx := &SpanContext{
603+
traceID: traceIDFrom64Bits(42),
604+
spanID: 42,
605+
trace: newTrace(),
606+
}
607+
608+
// Set sampling priority (1 = keep decision from upstream) & lock the trace
609+
// mimicking inheriting a trace from an upstream service with a keep decision.
610+
spanCtx.setSamplingPriority(ext.PriorityAutoKeep, samplernames.Unknown)
611+
spanCtx.trace.setLocked(true)
612+
613+
span := tracer.StartSpan("child span with sampling decision", ChildOf(spanCtx))
614+
span.SetTag(scenario.tag, true)
615+
616+
// The sampling decision should be applied as manual sampling takes
617+
// precedence over propagated decision
618+
assert.Equal(t, scenario.keep, shouldKeep(span))
619+
})
574620
}
575621
}
576622

ddtrace/tracer/spancontext.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,17 @@ func (c *SpanContext) setSamplingPriority(p int, sampler samplernames.SamplerNam
274274
}
275275
}
276276

277+
// forceSetSamplingPriority sets (and forces if the trace is locked) the sampling priority and decision maker (based on `sampler`).
278+
func (c *SpanContext) forceSetSamplingPriority(p int, sampler samplernames.SamplerName) {
279+
if c.trace == nil {
280+
c.trace = newTrace()
281+
}
282+
if c.trace.forceSetSamplingPriority(p, sampler) {
283+
// the trace's sampling priority or sampler was updated: mark this as updated
284+
c.updated = true
285+
}
286+
}
287+
277288
func (c *SpanContext) SamplingPriority() (p int, ok bool) {
278289
if c == nil || c.trace == nil {
279290
return 0, false
@@ -396,6 +407,14 @@ func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName) boo
396407
return t.setSamplingPriorityLocked(p, sampler)
397408
}
398409

410+
// forceSetSamplingPriority forces the sampling priority and the decision maker
411+
// and returns true if it was modified.
412+
func (t *trace) forceSetSamplingPriority(p int, sampler samplernames.SamplerName) bool {
413+
t.mu.Lock()
414+
defer t.mu.Unlock()
415+
return t.setSamplingPriorityLockedWithForce(p, sampler, true)
416+
}
417+
399418
func (t *trace) keep() {
400419
atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep))
401420
}
@@ -421,8 +440,13 @@ func samplerToDM(sampler samplernames.SamplerName) string {
421440
return "-" + strconv.Itoa(int(sampler))
422441
}
423442

424-
func (t *trace) setSamplingPriorityLocked(p int, sampler samplernames.SamplerName) bool {
425-
if t.locked {
443+
// setSamplingPriorityLockedWithForce sets the sampling priority and the decision maker
444+
// and returns true if it was modified.
445+
//
446+
// The force parameter is used to bypass the locked sampling decision check
447+
// when setting the sampling priority. This is used to apply a manual keep or drop decision.
448+
func (t *trace) setSamplingPriorityLockedWithForce(p int, sampler samplernames.SamplerName, force bool) bool {
449+
if t.locked && !force {
426450
return false
427451
}
428452

@@ -455,6 +479,10 @@ func (t *trace) setSamplingPriorityLocked(p int, sampler samplernames.SamplerNam
455479
return updatedPriority
456480
}
457481

482+
func (t *trace) setSamplingPriorityLocked(p int, sampler samplernames.SamplerName) bool {
483+
return t.setSamplingPriorityLockedWithForce(p, sampler, false)
484+
}
485+
458486
func (t *trace) isLocked() bool {
459487
t.mu.RLock()
460488
defer t.mu.RUnlock()

0 commit comments

Comments
 (0)