Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ddtrace/tracer/civisibility_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (w *ciVisibilityTraceWriter) stop() {

// flush sends the current payload to the transport. It ensures that the payload is reset
// and the resources are freed after the flush operation is completed.
func (w *ciVisibilityTraceWriter) flush() {
func (w *ciVisibilityTraceWriter) flush(done ...chan<- struct{}) {
if w.payload.stats().itemCount == 0 {
return
}
Expand All @@ -93,6 +93,9 @@ func (w *ciVisibilityTraceWriter) flush() {
w.payload = newCiVisibilityPayload()

go func(p *ciVisibilityPayload) {
if len(done) > 0 && done[0] != nil {
defer close(done[0])
}
defer func(_ time.Time) {
// Once the payload has been used, clear the buffer for garbage
// collection to avoid a memory leak when references to this object
Expand Down
11 changes: 11 additions & 0 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ type config struct {
// from DD_TRACE_PARTIAL_FLUSH_ENABLED, default false.
partialFlushEnabled bool

// syncFlushEnabled specifies whether the tracer should perform flush synchronously.
syncFlushEnabled bool

// statsComputationEnabled enables client-side stats computation (aka trace metrics).
statsComputationEnabled bool

Expand Down Expand Up @@ -1316,6 +1319,14 @@ func WithPartialFlushing(numSpans int) StartOption {
}
}

// WithSyncFlushing enables synchronous flushing, meaning that calls to
// Flush() will block until the payload has been sent.
func WithSyncFlushing(enabled bool) StartOption {
return func(c *config) {
c.syncFlushEnabled = enabled
}
}

// WithStatsComputation enables client-side stats computation, allowing
// the tracer to compute stats from traces. This can reduce network traffic
// to the Datadog Agent, and produce more accurate stats data.
Expand Down
39 changes: 31 additions & 8 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type TracerConf struct { //nolint:revive
VersionTag string
ServiceTag string
TracingAsTransport bool
SyncFlushing bool
}

// Tracer specifies an implementation of the Datadog tracer which allows starting
Expand Down Expand Up @@ -163,6 +164,11 @@ type tracer struct {

// telemetry is the telemetry client for the tracer.
telemetry telemetry.Client

// lastFlushedAt tracks when the tracer last performed a flush to prevent
// thundering herd issues when multiple timers fire simultaneously after
// process suspension (e.g., process sleep).
lastFlushedAt time.Time
}

const (
Expand Down Expand Up @@ -415,9 +421,10 @@ func newUnstartedTracer(opts ...StartOption) (t *tracer, err error) {
DollarQuotedFunc: c.agent.HasFlag("dollar_quoted_func"),
},
}),
statsd: statsd,
dataStreams: dataStreamsProcessor,
logFile: logFile,
statsd: statsd,
dataStreams: dataStreamsProcessor,
logFile: logFile,
lastFlushedAt: time.Now(), // Initialize to prevent immediate flush throttling
}
return t, nil
}
Expand Down Expand Up @@ -497,6 +504,7 @@ func (t *tracer) Flush() {
done := make(chan struct{})
t.flush <- done
<-done
t.lastFlushedAt = time.Now()
if t.dataStreams != nil {
t.dataStreams.Flush()
}
Expand All @@ -513,20 +521,34 @@ func (t *tracer) worker(tick <-chan time.Time) {
t.traceWriter.add(trace.spans)
}
case <-tick:
// Skip flush if we've flushed too recently to prevent thundering herd after process suspension
// Only apply throttling to real timers (not test-controlled ticks).
if t.config.tickChan == nil && time.Since(t.lastFlushedAt) <= flushInterval {
continue
}
t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1)
t.traceWriter.flush()
// This is not actual flush completion time, because traceWriter.flush()
// is non-blocking. However, it's the best we can do.
t.lastFlushedAt = time.Now()

case done := <-t.flush:
// Manual flush requests should always proceed to respect user intent.
t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:invoked"}, 1)
t.traceWriter.flush()
if t.config.syncFlushEnabled {
// If sync flushing is enabled, we want to wait until the flush is fully done.
t.traceWriter.flush(done)
} else {
t.traceWriter.flush()
}
t.statsd.Flush()
if !t.config.tracingAsTransport {
t.stats.flushAndSend(time.Now(), withCurrentBucket)
}
// TODO(x): In reality, the traceWriter.flush() call is not synchronous
// when using the agent traceWriter. However, this functionality is used
// in Lambda so for that purpose this mechanism should suffice.
done <- struct{}{}
if !t.config.syncFlushEnabled {
// If sync flushing is disabled, we can close the channel immediately.
done <- struct{}{}
}

case <-t.stop:
loop:
Expand Down Expand Up @@ -939,6 +961,7 @@ func (t *tracer) TracerConf() TracerConf {
VersionTag: t.config.version,
ServiceTag: t.config.serviceName,
TracingAsTransport: t.config.tracingAsTransport,
SyncFlushing: t.config.syncFlushEnabled,
}
}

Expand Down
7 changes: 5 additions & 2 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2447,7 +2447,10 @@ func (w *testTraceWriter) add(spans []*Span) {
w.mu.Unlock()
}

func (w *testTraceWriter) flush() {
func (w *testTraceWriter) flush(done ...chan<- struct{}) {
if len(done) > 0 && done[0] != nil {
defer close(done[0])
}
w.mu.Lock()
w.flushed = append(w.flushed, w.buf...)
w.buf = w.buf[:0]
Expand Down Expand Up @@ -2479,7 +2482,7 @@ func TestFlush(t *testing.T) {
tr.traceWriter = tw

ts := &statsdtest.TestStatsdClient{}
tr.statsd.Close()
// tr.statsd.Close()
tr.statsd = ts

transport := newDummyTransport()
Expand Down
17 changes: 13 additions & 4 deletions ddtrace/tracer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type traceWriter interface {
add([]*Span)

// flush causes the writer to send any buffered traces.
flush()
flush(done ...chan<- struct{})

// stop gracefully shuts down the writer.
stop()
Expand Down Expand Up @@ -102,12 +102,15 @@ func (h *agentTraceWriter) newPayload() payload {
}

// flush will push any currently buffered traces to the server.
func (h *agentTraceWriter) flush() {
func (h *agentTraceWriter) flush(done ...chan<- struct{}) {
h.mu.Lock()
oldp := h.payload
// Check after acquiring lock
// Check after acquiring lock.
if oldp.itemCount() == 0 {
h.mu.Unlock()
if len(done) > 0 && done[0] != nil {
close(done[0])
}
return
}
h.payload = h.newPayload()
Expand All @@ -116,6 +119,9 @@ func (h *agentTraceWriter) flush() {
h.climit <- struct{}{}
h.wg.Add(1)
go func(p payload) {
if len(done) > 0 && done[0] != nil {
defer close(done[0])
}
defer func(start time.Time) {
// Once the payload has been used, clear the buffer for garbage
// collection to avoid a memory leak when references to this object
Expand Down Expand Up @@ -377,7 +383,10 @@ func (h *logTraceWriter) stop() {
}

// flush will write any buffered traces to standard output.
func (h *logTraceWriter) flush() {
func (h *logTraceWriter) flush(done ...chan<- struct{}) {
if len(done) > 0 && done[0] != nil {
defer close(done[0])
}
if !h.hasTraces {
return
}
Expand Down
Loading