|
3 | 3 | from itertools import chain |
4 | 4 | import logging |
5 | 5 | from threading import RLock |
6 | | -from typing import Any |
7 | 6 | from typing import DefaultDict |
8 | 7 | from typing import Dict |
9 | 8 | from typing import List |
10 | 9 | from typing import Optional |
11 | | -from typing import Union |
12 | 10 |
|
13 | 11 | from ddtrace._trace.sampler import DatadogSampler |
14 | | -from ddtrace._trace.sampler import RateSampler |
15 | 12 | from ddtrace._trace.span import Span |
16 | 13 | from ddtrace._trace.span import _get_64_highest_order_bits_as_hex |
17 | 14 | from ddtrace.constants import _APM_ENABLED_METRIC_KEY as MK_APM_ENABLED |
|
22 | 19 | from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY |
23 | 20 | from ddtrace.internal.constants import MAX_UINT_64BITS |
24 | 21 | from ddtrace.internal.logger import get_logger |
| 22 | +from ddtrace.internal.rate_limiter import RateLimiter |
25 | 23 | from ddtrace.internal.sampling import SpanSamplingRule |
26 | 24 | from ddtrace.internal.sampling import get_span_sampling_rules |
27 | 25 | from ddtrace.internal.service import ServiceStatusError |
@@ -119,28 +117,30 @@ def __init__( |
119 | 117 | compute_stats_enabled: bool, |
120 | 118 | single_span_rules: List[SpanSamplingRule], |
121 | 119 | apm_opt_out: bool, |
122 | | - agent_based_samplers: Optional[dict] = None, |
123 | 120 | ): |
124 | 121 | super(TraceSamplingProcessor, self).__init__() |
125 | 122 | self._compute_stats_enabled = compute_stats_enabled |
126 | 123 | self.single_span_rules = single_span_rules |
| 124 | + self.sampler = DatadogSampler() |
127 | 125 | self.apm_opt_out = apm_opt_out |
128 | 126 |
|
| 127 | + @property |
| 128 | + def apm_opt_out(self): |
| 129 | + return self._apm_opt_out |
| 130 | + |
| 131 | + @apm_opt_out.setter |
| 132 | + def apm_opt_out(self, value): |
129 | 133 | # If ASM is enabled but tracing is disabled, |
130 | 134 | # we need to set the rate limiting to 1 trace per minute |
131 | 135 | # for the backend to consider the service as alive. |
132 | | - sampler_kwargs: Dict[str, Any] = { |
133 | | - "agent_based_samplers": agent_based_samplers, |
134 | | - } |
135 | | - if self.apm_opt_out: |
136 | | - sampler_kwargs.update( |
137 | | - { |
138 | | - "rate_limit": 1, |
139 | | - "rate_limit_window": 60e9, |
140 | | - "rate_limit_always_on": True, |
141 | | - } |
142 | | - ) |
143 | | - self.sampler: Union[DatadogSampler, RateSampler] = DatadogSampler(**sampler_kwargs) |
| 136 | + if value: |
| 137 | + self.sampler.limiter = RateLimiter(rate_limit=1, time_window=60e9) |
| 138 | + self.sampler._rate_limit_always_on = True |
| 139 | + log.debug("Enabling apm opt out on DatadogSampler: %s", self.sampler) |
| 140 | + else: |
| 141 | + self.sampler.limiter = RateLimiter(rate_limit=int(config._trace_rate_limit), time_window=1e9) |
| 142 | + self.sampler._rate_limit_always_on = False |
| 143 | + self._apm_opt_out = value |
144 | 144 |
|
145 | 145 | def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: |
146 | 146 | if trace: |
@@ -325,7 +325,7 @@ def on_span_start(self, span: Span) -> None: |
325 | 325 | log.debug(self.SPAN_START_DEBUG_MESSAGE, span, len(trace.spans)) |
326 | 326 |
|
327 | 327 | def on_span_finish(self, span: Span) -> None: |
328 | | - # Aqcuire lock to get finished and update trace.spans |
| 328 | + # Acquire lock to get finished and update trace.spans |
329 | 329 | with self._lock: |
330 | 330 | integration_name = span._meta.get(COMPONENT, span._span_api) |
331 | 331 | self._span_metrics["spans_finished"][integration_name] += 1 |
@@ -470,22 +470,12 @@ def reset( |
470 | 470 | # Re-create the writer to ensure it is consistent with updated configurations (ex: api_version) |
471 | 471 | self.writer = self.writer.recreate(appsec_enabled=appsec_enabled) |
472 | 472 |
|
473 | | - # Recreate the sampling processor using new or existing config values. |
474 | | - # If an argument is None, the current value is preserved. |
475 | | - if compute_stats is None: |
476 | | - compute_stats = self.sampling_processor._compute_stats_enabled |
477 | | - if apm_opt_out is None: |
478 | | - apm_opt_out = self.sampling_processor.apm_opt_out |
479 | | - self.sampling_processor = TraceSamplingProcessor( |
480 | | - compute_stats, |
481 | | - get_span_sampling_rules(), |
482 | | - apm_opt_out, |
483 | | - self.sampling_processor.sampler._agent_based_samplers |
484 | | - if isinstance(self.sampling_processor.sampler, DatadogSampler) |
485 | | - else None, |
486 | | - ) |
| 473 | + if compute_stats is not None: |
| 474 | + self.sampling_processor._compute_stats_enabled = compute_stats |
| 475 | + |
| 476 | + if apm_opt_out is not None: |
| 477 | + self.sampling_processor.apm_opt_out = apm_opt_out |
487 | 478 |
|
488 | | - # Update user processors if provided. |
489 | 479 | if user_processors is not None: |
490 | 480 | self.user_processors = user_processors |
491 | 481 |
|
|
0 commit comments