Skip to content

Commit 62813fb

Browse files
fix(sampling): revert "refactor(writer): handle agent responses in the tracer (#6178)" [backport 1.20] (#7336)
Backport 31ef044 from #7333 to 1.20. This reverts commit ec3e14d, which caused the tracer to ignore trace-agent sampling rates, oftentimes leading to much higher ingestion rates. I tested this manually by looking at `datadog.trace_agent.receiver.traces_priority` between <1.19 and >=1.19. <1.19: some P1 and a lot of P0; >=1.19: a lot more P1 (and a little more P-1) and no P0. Basically, before 1.19 there were a ton of traces getting dropped with P0 priority due to the trace-agent recommended rates. From 1.19 onward we ignored the trace-agent recommended rates, leading to no P0 traces, and far more being sampled overall. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. 
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. Co-authored-by: Zachary Groves <[email protected]>
1 parent 0a76eaf commit 62813fb

File tree

5 files changed

+34
-42
lines changed

5 files changed

+34
-42
lines changed

ddtrace/internal/ci_visibility/writer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
from ddtrace.vendor.dogstatsd import DogStatsd
3131

32+
from ...sampler import BaseSampler
33+
3234

3335
class CIVisibilityEventClient(WriterClientBase):
3436
def __init__(self):
@@ -79,6 +81,7 @@ class CIVisibilityWriter(HTTPWriter):
7981
def __init__(
8082
self,
8183
intake_url="", # type: str
84+
sampler=None, # type: Optional[BaseSampler]
8285
processing_interval=None, # type: Optional[float]
8386
timeout=None, # type: Optional[float]
8487
dogstatsd=None, # type: Optional[DogStatsd]
@@ -126,6 +129,7 @@ def __init__(
126129
super(CIVisibilityWriter, self).__init__(
127130
intake_url=intake_url,
128131
clients=clients,
132+
sampler=sampler,
129133
processing_interval=processing_interval,
130134
timeout=timeout,
131135
dogstatsd=dogstatsd,
@@ -142,6 +146,7 @@ def recreate(self):
142146
# type: () -> HTTPWriter
143147
return self.__class__(
144148
intake_url=self.intake_url,
149+
sampler=self._sampler,
145150
processing_interval=self._interval,
146151
timeout=self._timeout,
147152
dogstatsd=self.dogstatsd,

ddtrace/internal/writer/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from .writer import AgentResponse # noqa
21
from .writer import AgentWriter # noqa
32
from .writer import DEFAULT_SMA_WINDOW # noqa
43
from .writer import HTTPWriter # noqa

ddtrace/internal/writer/writer.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import os
66
import sys
77
import threading
8-
from typing import Any
9-
from typing import Callable
108
from typing import Dict
119
from typing import List
1210
from typing import Optional
@@ -28,6 +26,8 @@
2826
from ...internal.utils.formats import parse_tags_str
2927
from ...internal.utils.http import Response
3028
from ...internal.utils.time import StopWatch
29+
from ...sampler import BasePrioritySampler
30+
from ...sampler import BaseSampler
3131
from .._encoding import BufferFull
3232
from .._encoding import BufferItemTooLarge
3333
from .._encoding import EncodingValidationError
@@ -104,8 +104,10 @@ class LogWriter(TraceWriter):
104104
def __init__(
105105
self,
106106
out=sys.stdout, # type: TextIO
107+
sampler=None, # type: Optional[BaseSampler]
107108
):
108109
# type: (...) -> None
110+
self._sampler = sampler
109111
self.encoder = JSONEncoderV2()
110112
self.out = out
111113

@@ -116,7 +118,7 @@ def recreate(self):
116118
:rtype: :class:`LogWriter`
117119
:returns: A new :class:`LogWriter` instance
118120
"""
119-
writer = self.__class__(out=self.out)
121+
writer = self.__class__(out=self.out, sampler=self._sampler)
120122
return writer
121123

122124
def stop(self, timeout=None):
@@ -148,6 +150,7 @@ def __init__(
148150
self,
149151
intake_url, # type: str
150152
clients, # type: List[WriterClientBase]
153+
sampler=None, # type: Optional[BaseSampler]
151154
processing_interval=None, # type: Optional[float]
152155
# Match the payload size since there is no functionality
153156
# to flush dynamically.
@@ -169,6 +172,7 @@ def __init__(
169172
self.intake_url = intake_url
170173
self._buffer_size = buffer_size
171174
self._max_payload_size = max_payload_size
175+
self._sampler = sampler
172176
self._headers = headers or {}
173177
self._timeout = timeout
174178

@@ -296,7 +300,6 @@ def _get_finalized_headers(self, count, client):
296300
return headers
297301

298302
def _send_payload(self, payload, count, client):
299-
# type: (...) -> Response
300303
headers = self._get_finalized_headers(count, client)
301304

302305
self._metrics_dist("http.requests")
@@ -314,15 +317,15 @@ def _send_payload(self, payload, count, client):
314317
self._intake_endpoint(client),
315318
response.status,
316319
response.reason,
317-
) # type: Tuple[Any, Any, Any]
320+
)
318321
# Append the payload if requested
319322
if config._trace_writer_log_err_payload:
320323
msg += ", payload %s"
321324
# If the payload is bytes then hex encode the value before logging
322325
if isinstance(payload, six.binary_type):
323-
log_args += (binascii.hexlify(payload).decode(),) # type: ignore
326+
log_args += (binascii.hexlify(payload).decode(),)
324327
else:
325-
log_args += (payload,) # type: ignore
328+
log_args += (payload,)
326329

327330
log.error(msg, *log_args)
328331
self._metrics_dist("http.dropped.bytes", len(payload))
@@ -453,12 +456,6 @@ def on_shutdown(self):
453456
self._reset_connection()
454457

455458

456-
class AgentResponse(object):
457-
def __init__(self, rate_by_service):
458-
# type: (Dict[str, float]) -> None
459-
self.rate_by_service = rate_by_service
460-
461-
462459
class AgentWriter(HTTPWriter):
463460
"""
464461
The Datadog Agent supports (at the time of writing this) receiving trace
@@ -474,6 +471,7 @@ class AgentWriter(HTTPWriter):
474471
def __init__(
475472
self,
476473
agent_url, # type: str
474+
sampler=None, # type: Optional[BaseSampler]
477475
priority_sampling=False, # type: bool
478476
processing_interval=None, # type: Optional[float]
479477
# Match the payload size since there is no functionality
@@ -487,7 +485,6 @@ def __init__(
487485
api_version=None, # type: Optional[str]
488486
reuse_connections=None, # type: Optional[bool]
489487
headers=None, # type: Optional[Dict[str, str]]
490-
response_callback=None, # type: Optional[Callable[[AgentResponse], None]]
491488
):
492489
# type: (...) -> None
493490
if processing_interval is None:
@@ -543,10 +540,10 @@ def __init__(
543540
additional_header_str = os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS")
544541
if additional_header_str is not None:
545542
_headers.update(parse_tags_str(additional_header_str))
546-
self._response_cb = response_callback
547543
super(AgentWriter, self).__init__(
548544
intake_url=agent_url,
549545
clients=[client],
546+
sampler=sampler,
550547
processing_interval=processing_interval,
551548
buffer_size=buffer_size,
552549
max_payload_size=max_payload_size,
@@ -561,6 +558,7 @@ def recreate(self):
561558
# type: () -> HTTPWriter
562559
return self.__class__(
563560
agent_url=self.agent_url,
561+
sampler=self._sampler,
564562
processing_interval=self._interval,
565563
buffer_size=self._buffer_size,
566564
max_payload_size=self._max_payload_size,
@@ -598,7 +596,6 @@ def _downgrade(self, payload, response, client):
598596
raise ValueError()
599597

600598
def _send_payload(self, payload, count, client):
601-
# type: (...) -> Response
602599
response = super(AgentWriter, self)._send_payload(payload, count, client)
603600
if response.status in [404, 415]:
604601
log.debug("calling endpoint '%s' but received %s; downgrading API", client.ENDPOINT, response.status)
@@ -614,15 +611,16 @@ def _send_payload(self, payload, count, client):
614611
else:
615612
if payload is not None:
616613
self._send_payload(payload, count, client)
617-
elif response.status < 400:
618-
if self._response_cb:
619-
raw_resp = response.get_json()
620-
if raw_resp and "rate_by_service" in raw_resp:
621-
self._response_cb(
622-
AgentResponse(
623-
rate_by_service=raw_resp["rate_by_service"],
614+
elif response.status < 400 and isinstance(self._sampler, BasePrioritySampler):
615+
result_traces_json = response.get_json()
616+
if result_traces_json and "rate_by_service" in result_traces_json:
617+
try:
618+
if isinstance(self._sampler, BasePrioritySampler):
619+
self._sampler.update_rate_by_service_sample_rates(
620+
result_traces_json["rate_by_service"],
624621
)
625-
)
622+
except ValueError:
623+
log.error("sample_rate is negative, cannot update the rate samplers")
626624
return response
627625

628626
def start(self):

ddtrace/tracer.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,10 @@
5555
from .internal.serverless.mini_agent import maybe_start_serverless_mini_agent
5656
from .internal.service import ServiceStatusError
5757
from .internal.utils.http import verify_url
58-
from .internal.writer import AgentResponse
5958
from .internal.writer import AgentWriter
6059
from .internal.writer import LogWriter
6160
from .internal.writer import TraceWriter
6261
from .provider import DefaultContextProvider
63-
from .sampler import BasePrioritySampler
6462
from .sampler import BaseSampler
6563
from .sampler import DatadogSampler
6664
from .sampler import RateSampler
@@ -261,6 +259,7 @@ def __init__(
261259
else:
262260
writer = AgentWriter(
263261
agent_url=self._agent_url,
262+
sampler=self._sampler,
264263
priority_sampling=config._priority_sampling,
265264
dogstatsd=get_dogstatsd_client(self._dogstatsd_url),
266265
sync_mode=self._use_sync_mode(),
@@ -479,12 +478,12 @@ def configure(
479478
elif any(x is not None for x in [new_url, api_version, sampler, dogstatsd_url]):
480479
self._writer = AgentWriter(
481480
self._agent_url,
481+
sampler=self._sampler,
482482
priority_sampling=priority_sampling in (None, True) or config._priority_sampling,
483483
dogstatsd=get_dogstatsd_client(self._dogstatsd_url),
484484
sync_mode=self._use_sync_mode(),
485485
api_version=api_version,
486486
headers={"Datadog-Client-Computed-Stats": "yes"} if compute_stats_enabled else {},
487-
response_callback=self._agent_response_callback,
488487
)
489488
elif writer is None and isinstance(self._writer, LogWriter):
490489
# No need to do anything for the LogWriter.
@@ -532,20 +531,6 @@ def configure(
532531

533532
self._generate_diagnostic_logs()
534533

535-
def _agent_response_callback(self, resp):
536-
# type: (AgentResponse) -> None
537-
"""Handle the response from the agent.
538-
539-
The agent can return updated sample rates for the priority sampler.
540-
"""
541-
try:
542-
if isinstance(self._sampler, BasePrioritySampler):
543-
self._sampler.update_rate_by_service_sample_rates(
544-
resp.rate_by_service,
545-
)
546-
except ValueError:
547-
log.error("sample_rate is negative, cannot update the rate samplers")
548-
549534
def _generate_diagnostic_logs(self):
550535
if config._debug_mode or config._startup_logs_enabled:
551536
try:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
fixes:
3+
- |
4+
sampling: This fix reverts a refactor which affected how the tracer handled the trace-agent's
5+
recommended trace sampling rates, leading to an unintended increase in traces sampled.

0 commit comments

Comments
 (0)