Skip to content

Commit 5320610

Browse files
fix(sampling): revert "refactor(writer): handle agent responses in the tracer (#6178)" [backport 2.0] (#7337)
Backport 31ef044 from #7333 to 2.0. This reverts commit ec3e14d, which caused the tracer to ignore trace-agent sampling rates, oftentimes leading to much higher ingestion rates. I tested this manually by comparing `datadog.trace_agent.receiver.traces_priority` between versions <1.19 and >=1.19. With <1.19: some P1 and a lot of P0. With >=1.19: a lot more P1 (and a little more P-1) and no P0. Basically, before 1.19 there were a ton of traces getting dropped with P0 priority due to the trace-agent's recommended rates. After 1.19 we ignored the trace-agent's recommended rates, leading to no P0 traces, and far more being sampled overall. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. 
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. Co-authored-by: Zachary Groves <[email protected]>
1 parent df4a9a3 commit 5320610

File tree

5 files changed

+34
-42
lines changed

5 files changed

+34
-42
lines changed

ddtrace/internal/ci_visibility/writer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
from ddtrace.vendor.dogstatsd import DogStatsd
3131

32+
from ...sampler import BaseSampler
33+
3234

3335
class CIVisibilityEventClient(WriterClientBase):
3436
def __init__(self):
@@ -79,6 +81,7 @@ class CIVisibilityWriter(HTTPWriter):
7981
def __init__(
8082
self,
8183
intake_url="", # type: str
84+
sampler=None, # type: Optional[BaseSampler]
8285
processing_interval=None, # type: Optional[float]
8386
timeout=None, # type: Optional[float]
8487
dogstatsd=None, # type: Optional[DogStatsd]
@@ -126,6 +129,7 @@ def __init__(
126129
super(CIVisibilityWriter, self).__init__(
127130
intake_url=intake_url,
128131
clients=clients,
132+
sampler=sampler,
129133
processing_interval=processing_interval,
130134
timeout=timeout,
131135
dogstatsd=dogstatsd,
@@ -142,6 +146,7 @@ def recreate(self):
142146
# type: () -> HTTPWriter
143147
return self.__class__(
144148
intake_url=self.intake_url,
149+
sampler=self._sampler,
145150
processing_interval=self._interval,
146151
timeout=self._timeout,
147152
dogstatsd=self.dogstatsd,

ddtrace/internal/writer/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from .writer import AgentResponse # noqa
21
from .writer import AgentWriter # noqa
32
from .writer import DEFAULT_SMA_WINDOW # noqa
43
from .writer import HTTPWriter # noqa

ddtrace/internal/writer/writer.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import os
66
import sys
77
import threading
8-
from typing import Any
9-
from typing import Callable
108
from typing import Dict
119
from typing import List
1210
from typing import Optional
@@ -28,6 +26,8 @@
2826
from ...internal.utils.formats import parse_tags_str
2927
from ...internal.utils.http import Response
3028
from ...internal.utils.time import StopWatch
29+
from ...sampler import BasePrioritySampler
30+
from ...sampler import BaseSampler
3131
from .._encoding import BufferFull
3232
from .._encoding import BufferItemTooLarge
3333
from ..agent import get_connection
@@ -103,8 +103,10 @@ class LogWriter(TraceWriter):
103103
def __init__(
104104
self,
105105
out=sys.stdout, # type: TextIO
106+
sampler=None, # type: Optional[BaseSampler]
106107
):
107108
# type: (...) -> None
109+
self._sampler = sampler
108110
self.encoder = JSONEncoderV2()
109111
self.out = out
110112

@@ -115,7 +117,7 @@ def recreate(self):
115117
:rtype: :class:`LogWriter`
116118
:returns: A new :class:`LogWriter` instance
117119
"""
118-
writer = self.__class__(out=self.out)
120+
writer = self.__class__(out=self.out, sampler=self._sampler)
119121
return writer
120122

121123
def stop(self, timeout=None):
@@ -147,6 +149,7 @@ def __init__(
147149
self,
148150
intake_url, # type: str
149151
clients, # type: List[WriterClientBase]
152+
sampler=None, # type: Optional[BaseSampler]
150153
processing_interval=None, # type: Optional[float]
151154
# Match the payload size since there is no functionality
152155
# to flush dynamically.
@@ -168,6 +171,7 @@ def __init__(
168171
self.intake_url = intake_url
169172
self._buffer_size = buffer_size
170173
self._max_payload_size = max_payload_size
174+
self._sampler = sampler
171175
self._headers = headers or {}
172176
self._timeout = timeout
173177

@@ -295,7 +299,6 @@ def _get_finalized_headers(self, count, client):
295299
return headers
296300

297301
def _send_payload(self, payload, count, client):
298-
# type: (...) -> Response
299302
headers = self._get_finalized_headers(count, client)
300303

301304
self._metrics_dist("http.requests")
@@ -313,15 +316,15 @@ def _send_payload(self, payload, count, client):
313316
self._intake_endpoint(client),
314317
response.status,
315318
response.reason,
316-
) # type: Tuple[Any, Any, Any]
319+
)
317320
# Append the payload if requested
318321
if config._trace_writer_log_err_payload:
319322
msg += ", payload %s"
320323
# If the payload is bytes then hex encode the value before logging
321324
if isinstance(payload, six.binary_type):
322-
log_args += (binascii.hexlify(payload).decode(),) # type: ignore
325+
log_args += (binascii.hexlify(payload).decode(),)
323326
else:
324-
log_args += (payload,) # type: ignore
327+
log_args += (payload,)
325328

326329
log.error(msg, *log_args)
327330
self._metrics_dist("http.dropped.bytes", len(payload))
@@ -447,12 +450,6 @@ def on_shutdown(self):
447450
self._reset_connection()
448451

449452

450-
class AgentResponse(object):
451-
def __init__(self, rate_by_service):
452-
# type: (Dict[str, float]) -> None
453-
self.rate_by_service = rate_by_service
454-
455-
456453
class AgentWriter(HTTPWriter):
457454
"""
458455
The Datadog Agent supports (at the time of writing this) receiving trace
@@ -468,6 +465,7 @@ class AgentWriter(HTTPWriter):
468465
def __init__(
469466
self,
470467
agent_url, # type: str
468+
sampler=None, # type: Optional[BaseSampler]
471469
priority_sampling=False, # type: bool
472470
processing_interval=None, # type: Optional[float]
473471
# Match the payload size since there is no functionality
@@ -481,7 +479,6 @@ def __init__(
481479
api_version=None, # type: Optional[str]
482480
reuse_connections=None, # type: Optional[bool]
483481
headers=None, # type: Optional[Dict[str, str]]
484-
response_callback=None, # type: Optional[Callable[[AgentResponse], None]]
485482
):
486483
# type: (...) -> None
487484
if processing_interval is None:
@@ -537,10 +534,10 @@ def __init__(
537534
additional_header_str = os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS")
538535
if additional_header_str is not None:
539536
_headers.update(parse_tags_str(additional_header_str))
540-
self._response_cb = response_callback
541537
super(AgentWriter, self).__init__(
542538
intake_url=agent_url,
543539
clients=[client],
540+
sampler=sampler,
544541
processing_interval=processing_interval,
545542
buffer_size=buffer_size,
546543
max_payload_size=max_payload_size,
@@ -555,6 +552,7 @@ def recreate(self):
555552
# type: () -> HTTPWriter
556553
return self.__class__(
557554
agent_url=self.agent_url,
555+
sampler=self._sampler,
558556
processing_interval=self._interval,
559557
buffer_size=self._buffer_size,
560558
max_payload_size=self._max_payload_size,
@@ -592,7 +590,6 @@ def _downgrade(self, payload, response, client):
592590
raise ValueError()
593591

594592
def _send_payload(self, payload, count, client):
595-
# type: (...) -> Response
596593
response = super(AgentWriter, self)._send_payload(payload, count, client)
597594
if response.status in [404, 415]:
598595
log.debug("calling endpoint '%s' but received %s; downgrading API", client.ENDPOINT, response.status)
@@ -608,15 +605,16 @@ def _send_payload(self, payload, count, client):
608605
else:
609606
if payload is not None:
610607
self._send_payload(payload, count, client)
611-
elif response.status < 400:
612-
if self._response_cb:
613-
raw_resp = response.get_json()
614-
if raw_resp and "rate_by_service" in raw_resp:
615-
self._response_cb(
616-
AgentResponse(
617-
rate_by_service=raw_resp["rate_by_service"],
608+
elif response.status < 400 and isinstance(self._sampler, BasePrioritySampler):
609+
result_traces_json = response.get_json()
610+
if result_traces_json and "rate_by_service" in result_traces_json:
611+
try:
612+
if isinstance(self._sampler, BasePrioritySampler):
613+
self._sampler.update_rate_by_service_sample_rates(
614+
result_traces_json["rate_by_service"],
618615
)
619-
)
616+
except ValueError:
617+
log.error("sample_rate is negative, cannot update the rate samplers")
620618
return response
621619

622620
def start(self):

ddtrace/tracer.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,10 @@
5252
from .internal.serverless.mini_agent import maybe_start_serverless_mini_agent
5353
from .internal.service import ServiceStatusError
5454
from .internal.utils.http import verify_url
55-
from .internal.writer import AgentResponse
5655
from .internal.writer import AgentWriter
5756
from .internal.writer import LogWriter
5857
from .internal.writer import TraceWriter
5958
from .provider import DefaultContextProvider
60-
from .sampler import BasePrioritySampler
6159
from .sampler import BaseSampler
6260
from .sampler import DatadogSampler
6361
from .sampler import RateSampler
@@ -240,6 +238,7 @@ def __init__(
240238
else:
241239
writer = AgentWriter(
242240
agent_url=self._agent_url,
241+
sampler=self._sampler,
243242
priority_sampling=config._priority_sampling,
244243
dogstatsd=get_dogstatsd_client(self._dogstatsd_url),
245244
sync_mode=self._use_sync_mode(),
@@ -458,12 +457,12 @@ def configure(
458457
elif any(x is not None for x in [new_url, api_version, sampler, dogstatsd_url]):
459458
self._writer = AgentWriter(
460459
self._agent_url,
460+
sampler=self._sampler,
461461
priority_sampling=priority_sampling in (None, True) or config._priority_sampling,
462462
dogstatsd=get_dogstatsd_client(self._dogstatsd_url),
463463
sync_mode=self._use_sync_mode(),
464464
api_version=api_version,
465465
headers={"Datadog-Client-Computed-Stats": "yes"} if compute_stats_enabled else {},
466-
response_callback=self._agent_response_callback,
467466
)
468467
elif writer is None and isinstance(self._writer, LogWriter):
469468
# No need to do anything for the LogWriter.
@@ -511,20 +510,6 @@ def configure(
511510

512511
self._generate_diagnostic_logs()
513512

514-
def _agent_response_callback(self, resp):
515-
# type: (AgentResponse) -> None
516-
"""Handle the response from the agent.
517-
518-
The agent can return updated sample rates for the priority sampler.
519-
"""
520-
try:
521-
if isinstance(self._sampler, BasePrioritySampler):
522-
self._sampler.update_rate_by_service_sample_rates(
523-
resp.rate_by_service,
524-
)
525-
except ValueError:
526-
log.error("sample_rate is negative, cannot update the rate samplers")
527-
528513
def _generate_diagnostic_logs(self):
529514
if config._debug_mode or config._startup_logs_enabled:
530515
try:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
fixes:
3+
- |
4+
sampling: This fix reverts a refactor which affected how the tracer handled the trace-agent's
5+
recommended trace sampling rates, leading to an unintended increase in traces sampled.

0 commit comments

Comments
 (0)