
Commit 70f5222

chore(tracing): simplify trace sampling and span aggregation (#12749)
## Motivation

This PR improves the isolation of tracing components by removing direct references from the Tracer to several components, including DatadogSampler, TraceSamplingProcessor, TraceTagsProcessor, and TraceWriter, as well as the partial flushing, trace agent, and StatsD connection configurations. Instead, these components are now defined where they are used. This change eliminates unnecessary attributes and ensures a single source of truth for each component.

## Changes

- Moves `Tracer._writer` to the SpanAggregator. The Tracer object no longer directly configures the submission of spans to the agent; this functionality is fully delegated to the SpanAggregator.
- Introduces `Tracer._span_aggregator`, exposing the SpanAggregator as its own property for faster access to the DatadogSampler and AgentWriter.
- Removes the partial flush attributes from the Tracer; partial flushing is now defined only in the SpanAggregator.
- Moves `Tracer._agent_response_callback()`, `Tracer._use_sync_mode()`, and `Tracer._use_log_writer()` to the SpanAggregator. The tracer no longer directly references the agent writer.
- Removes the `Tracer._compute_stats` property; `ddtrace.config._compute_stats` (the global config) is now used directly. This ensures the global tracer always uses up-to-date configuration.
- Moves TraceSamplingProcessor and TraceTagsProcessor initialization to the SpanAggregator.
- Centralizes all sampling logic in the TraceSamplingProcessor, including agent-based, user-defined, dynamic, and single-span sampling.
- **Core enhancement: ensures `Tracer.sample(...)` can access the SpanAggregator and the TraceSamplingProcessor in constant time.**

A sketch of the resulting component layout is shown after the checklists below.

## Checklist

- [x] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Emmett Butler <[email protected]>
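For orientation, a minimal sketch of where the relocated components can be reached after this change (attribute names are taken from the diffs below; these are private internals and may change between releases):

```python
from ddtrace import tracer

# The SpanAggregator now owns the writer, the sampling processor (and its
# DatadogSampler), and the tag processor; the Tracer only keeps a reference
# to the aggregator itself.
aggregator = tracer._span_aggregator

writer = aggregator.writer                       # AgentWriter or LogWriter
sampler = aggregator.sampling_processor.sampler  # DatadogSampler
tags_processor = aggregator.tags_processor       # TraceTagsProcessor
```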
1 parent dc4cd33 commit 70f5222


43 files changed: +404, -446 lines

benchmarks/appsec_iast_django_startup/views.py

Lines changed: 5 additions & 1 deletion
```diff
@@ -24,5 +24,9 @@ def index(request):


 def shutdown_view(request):
-    tracer._writer.flush_queue()
+    if hasattr(tracer, "_span_aggregator"):
+        writer = tracer._span_aggregator.writer
+    else:
+        writer = tracer._writer
+    writer.flush_queue()
     return HttpResponse("SHUTDOWN")
```
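Both benchmark files repeat the same compatibility check; a hypothetical helper (not part of this commit) that factors it out could look like this:

```python
def get_writer(tracer):
    """Resolve the active trace writer across ddtrace versions.

    Hypothetical helper, not part of this commit: newer versions expose the
    writer on the SpanAggregator, older ones directly on the Tracer.
    """
    if hasattr(tracer, "_span_aggregator"):
        return tracer._span_aggregator.writer
    return tracer._writer
```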

benchmarks/bm/utils.py

Lines changed: 6 additions & 7 deletions
```diff
@@ -9,7 +9,6 @@
 from ddtrace import __version__ as ddtrace_version
 from ddtrace._trace.span import Span
 from ddtrace.internal import telemetry
-from ddtrace.trace import TraceFilter


 _Span = Span
@@ -59,13 +58,13 @@
     _Span = partial(_Span, None)


-class _DropTraces(TraceFilter):
-    def process_trace(self, trace):
-        return
-
-
 def drop_traces(tracer):
-    tracer.configure(trace_processors=[_DropTraces()])
+    if hasattr(tracer, "_span_aggregator"):
+        writer = tracer._span_aggregator.writer
+    else:
+        writer = tracer._writer
+    # Avoids sending traces to the agent
+    writer.write = lambda x: None


 def drop_telemetry_events():
```
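The removed `_DropTraces` filter dropped traces inside the processing pipeline by returning `None` from `process_trace`; the rewritten `drop_traces` leaves the pipeline intact and replaces `writer.write` with a no-op, so spans are still processed but never submitted. A minimal usage sketch, assuming the benchmark helper is importable as `bm.utils` (path relative to this repository's `benchmarks/` directory):

```python
from ddtrace import tracer

from bm.utils import drop_traces

# Disable span submission so the benchmark measures tracing overhead only.
drop_traces(tracer)

with tracer.trace("bench.operation"):
    pass  # spans are created and processed, but never written to an agent
```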

ddtrace/_trace/processor/__init__.py

Lines changed: 113 additions & 27 deletions
```diff
@@ -1,12 +1,12 @@
 import abc
 from collections import defaultdict
-from threading import Lock
+from itertools import chain
+from os import environ
 from threading import RLock
 from typing import Dict
 from typing import Iterable
 from typing import List
 from typing import Optional
-from typing import Union

 from ddtrace._trace.sampler import DatadogSampler
 from ddtrace._trace.span import Span
@@ -19,14 +19,26 @@
 from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS
 from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
 from ddtrace.internal.constants import MAX_UINT_64BITS
+from ddtrace.internal.dogstatsd import get_dogstatsd_client
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.sampling import SpanSamplingRule
+from ddtrace.internal.sampling import get_span_sampling_rules
 from ddtrace.internal.sampling import is_single_span_sampled
+from ddtrace.internal.serverless import has_aws_lambda_agent_extension
+from ddtrace.internal.serverless import in_aws_lambda
+from ddtrace.internal.serverless import in_azure_function
+from ddtrace.internal.serverless import in_gcp_function
 from ddtrace.internal.service import ServiceStatusError
 from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
 from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
+from ddtrace.internal.utils.http import verify_url
+from ddtrace.internal.writer import AgentResponse
+from ddtrace.internal.writer import AgentWriter
+from ddtrace.internal.writer import LogWriter
 from ddtrace.internal.writer import TraceWriter
+from ddtrace.settings._agent import config as agent_config
 from ddtrace.settings._config import config
+from ddtrace.settings.asm import config as asm_config


 try:
```
```diff
@@ -116,18 +128,18 @@ class TraceSamplingProcessor(TraceProcessor):
     Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
     """

-    def __init__(
-        self,
-        compute_stats_enabled: bool,
-        sampler: DatadogSampler,
-        single_span_rules: List[SpanSamplingRule],
-        apm_opt_out: bool,
-    ):
+    def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamplingRule], apm_opt_out: bool):
         super(TraceSamplingProcessor, self).__init__()
         self._compute_stats_enabled = compute_stats_enabled
-        self.sampler = sampler
         self.single_span_rules = single_span_rules
         self.apm_opt_out = apm_opt_out
+        if self.apm_opt_out:
+            # If ASM is enabled but tracing is disabled,
+            # we need to set the rate limiting to 1 trace per minute
+            # for the backend to consider the service as alive.
+            self.sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True)
+        else:
+            self.sampler = DatadogSampler()

     def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         if trace:
```
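On the rate-limit arithmetic: `rate_limit_window` appears to be expressed in nanoseconds, so `60e9` is 60 seconds and the opt-out sampler admits at most one trace per minute, just enough for the backend to consider the service alive. A minimal sketch of the two constructions used above (constructor arguments as they appear in this diff):

```python
from ddtrace._trace.sampler import DatadogSampler

ONE_MINUTE_NS = 60e9  # 60 seconds expressed in nanoseconds

# APM opt-out (ASM enabled, tracing disabled): emit a heartbeat of at most
# one kept trace per minute, regardless of other sampling decisions.
heartbeat_sampler = DatadogSampler(
    rate_limit=1,
    rate_limit_window=ONE_MINUTE_NS,
    rate_limit_always_on=True,
)

# Default: a DatadogSampler configured from environment/remote configuration.
default_sampler = DatadogSampler()
```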
```diff
@@ -249,17 +261,38 @@ def __init__(
         partial_flush_enabled: bool,
         partial_flush_min_spans: int,
         trace_processors: Iterable[TraceProcessor],
-        writer: TraceWriter,
+        writer: Optional[TraceWriter] = None,
     ):
-        self._partial_flush_enabled = partial_flush_enabled
-        self._partial_flush_min_spans = partial_flush_min_spans
-        self._trace_processors = trace_processors
-        self._writer = writer
-
+        # Set partial flushing
+        self.partial_flush_enabled = partial_flush_enabled
+        self.partial_flush_min_spans = partial_flush_min_spans
+        # Initialize trace processors
+        self.sampling_processor = TraceSamplingProcessor(
+            config._trace_compute_stats, get_span_sampling_rules(), asm_config._apm_opt_out
+        )
+        self.tags_processor = TraceTagsProcessor()
+        self.trace_processors = trace_processors
+        # Initialize writer
+        if writer is not None:
+            self.writer: TraceWriter = writer
+        elif SpanAggregator._use_log_writer():
+            self.writer = LogWriter()
+        else:
+            verify_url(agent_config.trace_agent_url)
+            self.writer = AgentWriter(
+                agent_url=agent_config.trace_agent_url,
+                dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
+                sync_mode=SpanAggregator._use_sync_mode(),
+                headers={"Datadog-Client-Computed-Stats": "yes"}
+                if (config._trace_compute_stats or asm_config._apm_opt_out)
+                else {},
+                report_metrics=not asm_config._apm_opt_out,
+                response_callback=self._agent_response_callback,
+            )
+        # Initialize the trace buffer and lock
         self._traces: DefaultDict[int, _Trace] = defaultdict(lambda: _Trace())
-        self._lock: Union[RLock, Lock] = RLock() if config._span_aggregator_rlock else Lock()
-
-        # Tracks the number of spans created and tags each count with the api that was used
+        self._lock: RLock = RLock()
+        # Track telemetry span metrics by span api
         # ex: otel api, opentracing api, datadog api
         self._span_metrics: Dict[str, DefaultDict] = {
             "spans_created": defaultdict(int),
@@ -270,10 +303,12 @@ def __init__(
     def __repr__(self) -> str:
         return (
             f"{self.__class__.__name__}("
-            f"{self._partial_flush_enabled}, "
-            f"{self._partial_flush_min_spans}, "
-            f"{self._trace_processors}, "
-            f"{self._writer})"
+            f"{self.partial_flush_enabled}, "
+            f"{self.partial_flush_min_spans}, "
+            f"{self.sampling_processor},"
+            f"{self.tags_processor},"
+            f"{self.trace_processors}, "
+            f"{self.writer})"
         )

     def on_span_start(self, span: Span) -> None:
```
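A minimal sketch of the new `SpanAggregator` constructor contract shown above (values are illustrative, not ddtrace's actual defaults): passing `writer=None` lets the aggregator choose its own writer, either a LogWriter in agentless AWS Lambda or an AgentWriter built from the agent settings.

```python
from ddtrace._trace.processor import SpanAggregator

# With writer=None the aggregator builds its own writer from the current
# environment and agent configuration (see _use_log_writer/_use_sync_mode).
aggregator = SpanAggregator(
    partial_flush_enabled=True,
    partial_flush_min_spans=300,  # illustrative value
    trace_processors=[],
    writer=None,
)
```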
```diff
@@ -298,7 +333,7 @@ def on_span_finish(self, span: Span) -> None:

         trace = self._traces[span.trace_id]
         trace.num_finished += 1
-        should_partial_flush = self._partial_flush_enabled and trace.num_finished >= self._partial_flush_min_spans
+        should_partial_flush = self.partial_flush_enabled and trace.num_finished >= self.partial_flush_min_spans
         if trace.num_finished == len(trace.spans) or should_partial_flush:
             trace_spans = trace.spans
             trace.spans = []
@@ -334,7 +369,7 @@ def on_span_finish(self, span: Span) -> None:
             finished[0].set_metric("_dd.py.partial_flush", num_finished)

         spans: Optional[List[Span]] = finished
-        for tp in self._trace_processors:
+        for tp in chain(self.trace_processors, [self.sampling_processor, self.tags_processor]):
             try:
                 if spans is None:
                     return
```
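The `chain(...)` call fixes the processor ordering: user-supplied trace processors run first, followed by the sampling processor and then the tag processor. A tiny illustration of that ordering (names are placeholders):

```python
from itertools import chain

user_processors = ["user_processor"]
built_in = ["sampling_processor", "tags_processor"]

# Built-in processors always run after any user-supplied ones.
assert list(chain(user_processors, built_in)) == [
    "user_processor",
    "sampling_processor",
    "tags_processor",
]
```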
```diff
@@ -343,12 +378,62 @@ def on_span_finish(self, span: Span) -> None:
                 log.error("error applying processor %r", tp, exc_info=True)

         self._queue_span_count_metrics("spans_finished", "integration_name")
-        self._writer.write(spans)
+        self.writer.write(spans)
         return

         log.debug("trace %d has %d spans, %d finished", span.trace_id, len(trace.spans), trace.num_finished)
         return None

+    def _agent_response_callback(self, resp: AgentResponse) -> None:
+        """Handle the response from the agent.
+
+        The agent can return updated sample rates for the priority sampler.
+        """
+        try:
+            self.sampling_processor.sampler.update_rate_by_service_sample_rates(
+                resp.rate_by_service,
+            )
+        except ValueError as e:
+            log.error("Failed to set agent service sample rates: %s", str(e))
+
+    @staticmethod
+    def _use_log_writer() -> bool:
+        """Returns whether the LogWriter should be used in the environment by
+        default.
+
+        The LogWriter required by default in AWS Lambdas when the Datadog Agent extension
+        is not available in the Lambda.
+        """
+        if (
+            environ.get("DD_AGENT_HOST")
+            or environ.get("DATADOG_TRACE_AGENT_HOSTNAME")
+            or environ.get("DD_TRACE_AGENT_URL")
+        ):
+            # If one of these variables are set, we definitely have an agent
+            return False
+        elif in_aws_lambda() and has_aws_lambda_agent_extension():
+            # If the Agent Lambda extension is available then an AgentWriter is used.
+            return False
+        elif in_gcp_function() or in_azure_function():
+            return False
+        else:
+            return in_aws_lambda()
+
+    @staticmethod
+    def _use_sync_mode() -> bool:
+        """Returns, if an `AgentWriter` is to be used, whether it should be run
+        in synchronous mode by default.
+
+        There are only two cases in which this is desirable:
+
+        - AWS Lambdas can have the Datadog agent installed via an extension.
+          When it's available traces must be sent synchronously to ensure all
+          are received before the Lambda terminates.
+        - Google Cloud Functions and Azure Functions have a mini-agent spun up by the tracer.
+          Similarly to AWS Lambdas, sync mode should be used to avoid data loss.
+        """
+        return (in_aws_lambda() and has_aws_lambda_agent_extension()) or in_gcp_function() or in_azure_function()
+
     def shutdown(self, timeout: Optional[float]) -> None:
         """
         This will stop the background writer/worker and flush any finished traces in the buffer. The tracer cannot be
```
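Since `_use_log_writer()` and `_use_sync_mode()` are static methods, the environment decisions they encode can be inspected directly. In short: the LogWriter is chosen only in AWS Lambda with no agent extension and no agent host/URL environment variables; synchronous flushing is used for Lambda-with-extension, Google Cloud Functions, and Azure Functions. A minimal sketch:

```python
from ddtrace._trace.processor import SpanAggregator

# LogWriter only in agentless AWS Lambda; sync mode only where spans must be
# flushed before the serverless runtime freezes or terminates.
print("use log writer:", SpanAggregator._use_log_writer())
print("use sync mode: ", SpanAggregator._use_sync_mode())
```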
```diff
@@ -379,7 +464,8 @@ def shutdown(self, timeout: Optional[float]) -> None:
         )

         try:
-            self._writer.stop(timeout)
+            self._traces.clear()
+            self.writer.stop(timeout)
         except ServiceStatusError:
             # It's possible the writer never got started in the first place :(
             pass
```
