Skip to content

Commit 4d23c32

Browse files
fix(sampling): avoid resetting sampling rules on forks [backport 3.14] (#14602)
Backport 1692801 from #14557 to 3.14. Change logic to avoid recreating DatadogSampler unnecessarily when the SpanAggregator is recreated. If apm_opt_out is enabled, update the sampler to use a new `RateLimiter` with updated values. This was potentially causing issues with sampling rules getting reset when the DatadogSampler gets recreated. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: Quinna Halim <[email protected]>
1 parent da1f899 commit 4d23c32

File tree

4 files changed

+64
-38
lines changed

4 files changed

+64
-38
lines changed

ddtrace/_trace/processor/__init__.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@
33
from itertools import chain
44
import logging
55
from threading import RLock
6-
from typing import Any
76
from typing import DefaultDict
87
from typing import Dict
98
from typing import List
109
from typing import Optional
11-
from typing import Union
1210

1311
from ddtrace._trace.sampler import DatadogSampler
14-
from ddtrace._trace.sampler import RateSampler
1512
from ddtrace._trace.span import Span
1613
from ddtrace._trace.span import _get_64_highest_order_bits_as_hex
1714
from ddtrace.constants import _APM_ENABLED_METRIC_KEY as MK_APM_ENABLED
@@ -22,6 +19,7 @@
2219
from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
2320
from ddtrace.internal.constants import MAX_UINT_64BITS
2421
from ddtrace.internal.logger import get_logger
22+
from ddtrace.internal.rate_limiter import RateLimiter
2523
from ddtrace.internal.sampling import SpanSamplingRule
2624
from ddtrace.internal.sampling import get_span_sampling_rules
2725
from ddtrace.internal.service import ServiceStatusError
@@ -119,28 +117,30 @@ def __init__(
119117
compute_stats_enabled: bool,
120118
single_span_rules: List[SpanSamplingRule],
121119
apm_opt_out: bool,
122-
agent_based_samplers: Optional[dict] = None,
123120
):
124121
super(TraceSamplingProcessor, self).__init__()
125122
self._compute_stats_enabled = compute_stats_enabled
126123
self.single_span_rules = single_span_rules
124+
self.sampler = DatadogSampler()
127125
self.apm_opt_out = apm_opt_out
128126

127+
@property
128+
def apm_opt_out(self):
129+
return self._apm_opt_out
130+
131+
@apm_opt_out.setter
132+
def apm_opt_out(self, value):
129133
# If ASM is enabled but tracing is disabled,
130134
# we need to set the rate limiting to 1 trace per minute
131135
# for the backend to consider the service as alive.
132-
sampler_kwargs: Dict[str, Any] = {
133-
"agent_based_samplers": agent_based_samplers,
134-
}
135-
if self.apm_opt_out:
136-
sampler_kwargs.update(
137-
{
138-
"rate_limit": 1,
139-
"rate_limit_window": 60e9,
140-
"rate_limit_always_on": True,
141-
}
142-
)
143-
self.sampler: Union[DatadogSampler, RateSampler] = DatadogSampler(**sampler_kwargs)
136+
if value:
137+
self.sampler.limiter = RateLimiter(rate_limit=1, time_window=60e9)
138+
self.sampler._rate_limit_always_on = True
139+
log.debug("Enabling apm opt out on DatadogSampler: %s", self.sampler)
140+
else:
141+
self.sampler.limiter = RateLimiter(rate_limit=int(config._trace_rate_limit), time_window=1e9)
142+
self.sampler._rate_limit_always_on = False
143+
self._apm_opt_out = value
144144

145145
def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
146146
if trace:
@@ -325,7 +325,7 @@ def on_span_start(self, span: Span) -> None:
325325
log.debug(self.SPAN_START_DEBUG_MESSAGE, span, len(trace.spans))
326326

327327
def on_span_finish(self, span: Span) -> None:
328-
# Aqcuire lock to get finished and update trace.spans
328+
# Acquire lock to get finished and update trace.spans
329329
with self._lock:
330330
integration_name = span._meta.get(COMPONENT, span._span_api)
331331
self._span_metrics["spans_finished"][integration_name] += 1
@@ -470,22 +470,12 @@ def reset(
470470
# Re-create the writer to ensure it is consistent with updated configurations (ex: api_version)
471471
self.writer = self.writer.recreate(appsec_enabled=appsec_enabled)
472472

473-
# Recreate the sampling processor using new or existing config values.
474-
# If an argument is None, the current value is preserved.
475-
if compute_stats is None:
476-
compute_stats = self.sampling_processor._compute_stats_enabled
477-
if apm_opt_out is None:
478-
apm_opt_out = self.sampling_processor.apm_opt_out
479-
self.sampling_processor = TraceSamplingProcessor(
480-
compute_stats,
481-
get_span_sampling_rules(),
482-
apm_opt_out,
483-
self.sampling_processor.sampler._agent_based_samplers
484-
if isinstance(self.sampling_processor.sampler, DatadogSampler)
485-
else None,
486-
)
473+
if compute_stats is not None:
474+
self.sampling_processor._compute_stats_enabled = compute_stats
475+
476+
if apm_opt_out is not None:
477+
self.sampling_processor.apm_opt_out = apm_opt_out
487478

488-
# Update user processors if provided.
489479
if user_processors is not None:
490480
self.user_processors = user_processors
491481

ddtrace/_trace/sampler.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ def __init__(
9191
rate_limit: Optional[int] = None,
9292
rate_limit_window: float = 1e9,
9393
rate_limit_always_on: bool = False,
94-
agent_based_samplers: Optional[Dict[str, RateSampler]] = None,
9594
):
9695
"""
9796
Constructor for DatadogSampler sampler
@@ -102,8 +101,6 @@ def __init__(
102101
:param rate_limit_window: The time window in nanoseconds for the rate limit, default is 1 second
103102
:param rate_limit_always_on: If set to `True`, the rate limit is always applied, even if no sampling rules
104103
are provided.
105-
:param agent_based_samplers: A dictionary of service-based samplers, mapping a key in the format
106-
`service:<service>,env:<env>` to a :class:`RateSampler` instance.
107104
"""
108105
# Set sampling rules
109106
global_sampling_rules = config._trace_sampling_rules
@@ -112,7 +109,7 @@ def __init__(
112109
else:
113110
self.rules: List[SamplingRule] = rules or []
114111
# Set Agent based samplers
115-
self._agent_based_samplers = agent_based_samplers or {}
112+
self._agent_based_samplers: Dict = {}
116113
# Set rate limiter
117114
self._rate_limit_always_on: bool = rate_limit_always_on
118115
if rate_limit is None:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
sampling: This change prevents the DatadogSampler from getting recreated whenever the SpanAggregator is reset, and instead updates the rate limiter that the sampler uses.

tests/tracer/test_processors.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def process_trace(self, trace):
107107

108108
def test_aggregator_reset_default_args():
109109
"""
110-
Test that on reset, the aggregator recreates the sampling processor and trace writer.
110+
Test that on reset, the aggregator recreates trace writer but not the sampling processor (by default).
111111
Processors and trace buffers should be reset not reset.
112112
"""
113113
dd_proc = DummyProcessor()
@@ -134,11 +134,46 @@ def test_aggregator_reset_default_args():
134134
assert dd_proc in aggr.dd_processors
135135
assert user_proc in aggr.user_processors
136136
assert aggr.writer is not dm_writer
137-
assert sampling_proc is not aggr.sampling_processor
137+
assert sampling_proc is aggr.sampling_processor
138138
assert not aggr._traces
139139
assert len(aggr._span_metrics["spans_created"]) == 0
140140

141141

142+
def test_aggregator_reset_apm_opt_out_preserves_sampling():
143+
"""
144+
Test that calling aggr.reset(apm_opt_out=True) updates the apm_opt_out setting
145+
but preserves the sampling rules on the TraceSamplingProcessor.
146+
"""
147+
sampling_rule = SpanSamplingRule(service="test_service", name="test_name", sample_rate=0.5, max_per_second=10)
148+
149+
dd_proc = DummyProcessor()
150+
user_proc = DummyProcessor()
151+
aggr = SpanAggregator(
152+
partial_flush_enabled=False,
153+
partial_flush_min_spans=1,
154+
dd_processors=[dd_proc],
155+
user_processors=[user_proc],
156+
)
157+
158+
sampling_proc = aggr.sampling_processor
159+
original_apm_opt_out = sampling_proc.apm_opt_out
160+
161+
sampling_proc.sampler.rules = [TraceSamplingRule(sample_rate=0.1)]
162+
rule = sampling_proc.sampler.rules[0]
163+
sampling_proc.single_span_rules = [sampling_rule]
164+
165+
assert sampling_proc.single_span_rules == [sampling_rule]
166+
assert sampling_proc.apm_opt_out == original_apm_opt_out
167+
168+
aggr.reset(apm_opt_out=True)
169+
170+
# Assert that sampling rules are preserved after reset
171+
assert sampling_proc.apm_opt_out is True
172+
assert sampling_proc.single_span_rules == [sampling_rule]
173+
assert sampling_proc.sampler.rules[0] == rule
174+
assert sampling_proc is aggr.sampling_processor
175+
176+
142177
@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter))
143178
def test_aggregator_reset_with_args(writer_class):
144179
"""

0 commit comments

Comments
 (0)