diff --git a/ddtrace/tracer/civisibility_writer.go b/ddtrace/tracer/civisibility_writer.go index 038e8e6089..da34783d40 100644 --- a/ddtrace/tracer/civisibility_writer.go +++ b/ddtrace/tracer/civisibility_writer.go @@ -82,7 +82,7 @@ func (w *ciVisibilityTraceWriter) stop() { // flush sends the current payload to the transport. It ensures that the payload is reset // and the resources are freed after the flush operation is completed. -func (w *ciVisibilityTraceWriter) flush() { +func (w *ciVisibilityTraceWriter) flush(done ...chan<- struct{}) { if w.payload.stats().itemCount == 0 { return } @@ -93,6 +93,9 @@ func (w *ciVisibilityTraceWriter) flush() { w.payload = newCiVisibilityPayload() go func(p *ciVisibilityPayload) { + if len(done) > 0 && done[0] != nil { + defer close(done[0]) + } defer func(_ time.Time) { // Once the payload has been used, clear the buffer for garbage // collection to avoid a memory leak when references to this object diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 9c03f91cda..e0989ce8cc 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -279,6 +279,9 @@ type config struct { // from DD_TRACE_PARTIAL_FLUSH_ENABLED, default false. partialFlushEnabled bool + // syncFlushEnabled specifies whether the tracer should perform flush synchronously. + syncFlushEnabled bool + // statsComputationEnabled enables client-side stats computation (aka trace metrics). statsComputationEnabled bool @@ -1316,6 +1319,14 @@ func WithPartialFlushing(numSpans int) StartOption { } } +// WithSyncFlushing enables synchronous flushing, meaning that calls to +// Flush() will block until the payload has been sent. +func WithSyncFlushing(enabled bool) StartOption { + return func(c *config) { + c.syncFlushEnabled = enabled + } +} + // WithStatsComputation enables client-side stats computation, allowing // the tracer to compute stats from traces. This can reduce network traffic // to the Datadog Agent, and produce more accurate stats data. diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 3dfe172aea..d5705618b7 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -51,6 +51,7 @@ type TracerConf struct { //nolint:revive VersionTag string ServiceTag string TracingAsTransport bool + SyncFlushing bool } // Tracer specifies an implementation of the Datadog tracer which allows starting @@ -163,6 +164,11 @@ type tracer struct { // telemetry is the telemetry client for the tracer. telemetry telemetry.Client + + // lastFlushedAt tracks when the tracer last performed a flush to prevent + // thundering herd issues when multiple timers fire simultaneously after + // process suspension (e.g., process sleep). + lastFlushedAt time.Time } const ( @@ -415,9 +421,10 @@ func newUnstartedTracer(opts ...StartOption) (t *tracer, err error) { DollarQuotedFunc: c.agent.HasFlag("dollar_quoted_func"), }, }), - statsd: statsd, - dataStreams: dataStreamsProcessor, - logFile: logFile, + statsd: statsd, + dataStreams: dataStreamsProcessor, + logFile: logFile, + lastFlushedAt: time.Now(), // Initialize to prevent immediate flush throttling } return t, nil } @@ -497,6 +504,7 @@ func (t *tracer) Flush() { done := make(chan struct{}) t.flush <- done <-done + t.lastFlushedAt = time.Now() if t.dataStreams != nil { t.dataStreams.Flush() } @@ -513,20 +521,34 @@ func (t *tracer) worker(tick <-chan time.Time) { t.traceWriter.add(trace.spans) } case <-tick: + // Skip flush if we've flushed too recently to prevent thundering herd after process suspension + // Only apply throttling to real timers (not test-controlled ticks). + if t.config.tickChan == nil && time.Since(t.lastFlushedAt) <= flushInterval { + continue + } t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1) t.traceWriter.flush() + // This is not actual flush completion time, because traceWriter.flush() + // is non-blocking. However, it's the best we can do. + t.lastFlushedAt = time.Now() case done := <-t.flush: + // Manual flush requests should always proceed to respect user intent. t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:invoked"}, 1) - t.traceWriter.flush() + if t.config.syncFlushEnabled { + // If sync flushing is enabled, we want to wait until the flush is fully done. + t.traceWriter.flush(done) + } else { + t.traceWriter.flush() + } t.statsd.Flush() if !t.config.tracingAsTransport { t.stats.flushAndSend(time.Now(), withCurrentBucket) } - // TODO(x): In reality, the traceWriter.flush() call is not synchronous - // when using the agent traceWriter. However, this functionality is used - // in Lambda so for that purpose this mechanism should suffice. - done <- struct{}{} + if !t.config.syncFlushEnabled { + // If sync flushing is disabled, we can close the channel immediately. + done <- struct{}{} + } case <-t.stop: loop: @@ -939,6 +961,7 @@ func (t *tracer) TracerConf() TracerConf { VersionTag: t.config.version, ServiceTag: t.config.serviceName, TracingAsTransport: t.config.tracingAsTransport, + SyncFlushing: t.config.syncFlushEnabled, } } diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 20b3531ae9..de608f24ab 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -2447,7 +2447,10 @@ func (w *testTraceWriter) add(spans []*Span) { w.mu.Unlock() } -func (w *testTraceWriter) flush() { +func (w *testTraceWriter) flush(done ...chan<- struct{}) { + if len(done) > 0 && done[0] != nil { + defer close(done[0]) + } w.mu.Lock() w.flushed = append(w.flushed, w.buf...) w.buf = w.buf[:0] @@ -2479,7 +2482,7 @@ func TestFlush(t *testing.T) { tr.traceWriter = tw ts := &statsdtest.TestStatsdClient{} - tr.statsd.Close() + // tr.statsd.Close() tr.statsd = ts transport := newDummyTransport() diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index ea5c1c55b6..a86f4b9b13 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -26,7 +26,7 @@ type traceWriter interface { add([]*Span) // flush causes the writer to send any buffered traces. - flush() + flush(done ...chan<- struct{}) // stop gracefully shuts down the writer. stop() @@ -102,12 +102,15 @@ func (h *agentTraceWriter) newPayload() payload { } // flush will push any currently buffered traces to the server. -func (h *agentTraceWriter) flush() { +func (h *agentTraceWriter) flush(done ...chan<- struct{}) { h.mu.Lock() oldp := h.payload - // Check after acquiring lock + // Check after acquiring lock. if oldp.itemCount() == 0 { h.mu.Unlock() + if len(done) > 0 && done[0] != nil { + close(done[0]) + } return } h.payload = h.newPayload() @@ -116,6 +119,9 @@ func (h *agentTraceWriter) flush() { h.climit <- struct{}{} h.wg.Add(1) go func(p payload) { + if len(done) > 0 && done[0] != nil { + defer close(done[0]) + } defer func(start time.Time) { // Once the payload has been used, clear the buffer for garbage // collection to avoid a memory leak when references to this object @@ -377,7 +383,10 @@ func (h *logTraceWriter) stop() { } // flush will write any buffered traces to standard output. -func (h *logTraceWriter) flush() { +func (h *logTraceWriter) flush(done ...chan<- struct{}) { + if len(done) > 0 && done[0] != nil { + defer close(done[0]) + } if !h.hasTraces { return }