Skip to content

Commit dfa2b5b

Browse files
authored
fix(tracing): resolve concurrency bug in health metrics (backport #7413 to 1.20) (#7640)
Backports: #7413 ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [ ] This PR doesn't touch any of that.
1 parent 77c6566 commit dfa2b5b

File tree

5 files changed

+173
-168
lines changed

5 files changed

+173
-168
lines changed

ddtrace/internal/writer/writer.py

Lines changed: 26 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,9 @@
4444

4545

4646
if TYPE_CHECKING: # pragma: no cover
47-
from typing import Tuple
48-
4947
from ddtrace import Span
5048

51-
from .agent import ConnectionType
49+
from ..agent import ConnectionType
5250

5351

5452
log = get_logger(__name__)
@@ -178,7 +176,7 @@ def __init__(
178176

179177
self._clients = clients
180178
self.dogstatsd = dogstatsd
181-
self._metrics_reset()
179+
self._metrics = defaultdict(int) # type: Dict[str, int]
182180
self._drop_sma = SimpleMovingAverage(DEFAULT_SMA_WINDOW)
183181
self._sync_mode = sync_mode
184182
self._conn = None # type: Optional[ConnectionType]
@@ -213,36 +211,22 @@ def _intake_url(self, client=None):
213211
return client._intake_url
214212
return self.intake_url
215213

216-
def _metrics_dist(self, name, count=1, tags=tuple()):
217-
# type: (str, int, Tuple) -> None
218-
if tags in self._metrics[name]:
219-
self._metrics[name][tags] += count
220-
else:
221-
self._metrics[name][tags] = count
222-
223-
def _metrics_reset(self):
224-
# type: () -> None
225-
self._metrics = defaultdict(dict) # type: Dict[str, Dict[Tuple[str,...], int]]
214+
def _metrics_dist(self, name, count=1, tags=None):
215+
# type: (str, int, Optional[List]) -> None
216+
if config.health_metrics_enabled and self.dogstatsd:
217+
self.dogstatsd.distribution("datadog.%s.%s" % (self.STATSD_NAMESPACE, name), count, tags=tags)
226218

227219
def _set_drop_rate(self):
228-
dropped = sum(
229-
counts
230-
for metric in ("encoder.dropped.traces", "buffer.dropped.traces", "http.dropped.traces")
231-
for _tags, counts in self._metrics[metric].items()
232-
)
233-
accepted = sum(counts for _tags, counts in self._metrics["writer.accepted.traces"].items())
234-
235-
if dropped > accepted:
236-
# Sanity check, we cannot drop more traces than we accepted.
237-
log.debug(
238-
"dropped.traces metric is greater than accepted.traces metric"
239-
"This difference may be reconciled in future metric uploads (dropped.traces: %d, accepted.traces: %d)",
240-
dropped,
241-
accepted,
242-
)
243-
accepted = dropped
244-
220+
# type: () -> None
221+
accepted = self._metrics["accepted_traces"]
222+
sent = self._metrics["sent_traces"]
223+
encoded = sum([len(client.encoder) for client in self._clients])
224+
# The number of dropped traces is the number of accepted traces minus the number of sent traces and the traces still in the encoder
225+
# This calculation is a best effort. Due to race conditions it may result in a slight underestimate.
226+
dropped = max(accepted - sent - encoded, 0)  # dropped traces should never be negative
245227
self._drop_sma.set(dropped, accepted)
228+
self._metrics["sent_traces"] = 0 # reset sent traces for the next interval
229+
self._metrics["accepted_traces"] = encoded # sets accepted traces to number of spans in encoders
246230

247231
def _set_keep_rate(self, trace):
248232
if trace:
@@ -307,9 +291,10 @@ def _send_payload(self, payload, count, client):
307291
response = self._put(payload, headers, client, no_trace=True)
308292

309293
if response.status >= 400:
310-
self._metrics_dist("http.errors", tags=("type:%s" % response.status,))
294+
self._metrics_dist("http.errors", tags=["type:%s" % response.status])
311295
else:
312296
self._metrics_dist("http.sent.bytes", len(payload))
297+
self._metrics["sent_traces"] += count
313298

314299
if response.status not in (404, 415) and response.status >= 400:
315300
msg = "failed to send traces to intake at %s: HTTP error status %s, reason %s"
@@ -353,6 +338,7 @@ def _write_with_client(self, client, spans=None):
353338
pass
354339

355340
self._metrics_dist("writer.accepted.traces")
341+
self._metrics["accepted_traces"] += 1
356342
self._set_keep_rate(spans)
357343

358344
try:
@@ -364,8 +350,8 @@ def _write_with_client(self, client, spans=None):
364350
payload_size,
365351
client.encoder.max_item_size,
366352
)
367-
self._metrics_dist("buffer.dropped.traces", 1, tags=("reason:t_too_big",))
368-
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=("reason:t_too_big",))
353+
self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:t_too_big"])
354+
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:t_too_big"])
369355
except BufferFull as e:
370356
payload_size = e.args[0]
371357
log.warning(
@@ -376,10 +362,10 @@ def _write_with_client(self, client, spans=None):
376362
payload_size,
377363
self.status.value,
378364
)
379-
self._metrics_dist("buffer.dropped.traces", 1, tags=("reason:full",))
380-
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=("reason:full",))
365+
self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:full"])
366+
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:full"])
381367
except NoEncodableSpansError:
382-
self._metrics_dist("buffer.dropped.traces", 1, tags=("reason:incompatible",))
368+
self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:incompatible"])
383369
else:
384370
self._metrics_dist("buffer.accepted.traces", 1)
385371
self._metrics_dist("buffer.accepted.spans", len(spans))
@@ -390,7 +376,6 @@ def flush_queue(self, raise_exc=False):
390376
self._flush_queue_with_client(client, raise_exc=raise_exc)
391377
finally:
392378
self._set_drop_rate()
393-
self._metrics_reset()
394379

395380
def _flush_queue_with_client(self, client, raise_exc=False):
396381
# type: (WriterClientBase, bool) -> None
@@ -411,7 +396,7 @@ def _flush_queue_with_client(self, client, raise_exc=False):
411396
try:
412397
self._send_payload_with_backoff(encoded, n_traces, client)
413398
except Exception:
414-
self._metrics_dist("http.errors", tags=("type:err",))
399+
self._metrics_dist("http.errors", tags=["type:err"])
415400
self._metrics_dist("http.dropped.bytes", len(encoded))
416401
self._metrics_dist("http.dropped.traces", n_traces)
417402
if raise_exc:
@@ -424,17 +409,8 @@ def _flush_queue_with_client(self, client, raise_exc=False):
424409
self.RETRY_ATTEMPTS,
425410
)
426411
finally:
427-
if config.health_metrics_enabled and self.dogstatsd:
428-
namespace = self.STATSD_NAMESPACE
429-
# Note that we cannot use the batching functionality of dogstatsd because
430-
# it's not thread-safe.
431-
# https://github.com/DataDog/datadogpy/issues/439
432-
# This really isn't ideal as now we're going to do a ton of socket calls.
433-
self.dogstatsd.distribution("datadog.%s.http.sent.bytes" % namespace, len(encoded))
434-
self.dogstatsd.distribution("datadog.%s.http.sent.traces" % namespace, n_traces)
435-
for name, metric_tags in self._metrics.items():
436-
for tags, count in metric_tags.items():
437-
self.dogstatsd.distribution("datadog.%s.%s" % (namespace, name), count, tags=list(tags))
412+
self._metrics_dist("http.sent.bytes", len(encoded))
413+
self._metrics_dist("http.sent.traces", n_traces)
438414

439415
def periodic(self):
440416
self.flush_queue(raise_exc=False)

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ services:
141141
- DD_POOL_TRACE_CHECK_FAILURES=true
142142
- DD_DISABLE_ERROR_RESPONSES=true
143143
- ENABLED_CHECKS=trace_content_length,trace_stall,meta_tracer_version_header,trace_count_header,trace_peer_service,trace_dd_service
144+
- SNAPSHOT_IGNORED_ATTRS=span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,meta.runtime-id,meta._dd.p.tid,meta.pathway.hash,metrics._dd.tracer_kr
144145
vertica:
145146
image: sumitchawla/vertica
146147
environment:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
tracing: Fixes an issue where the thread responsible for sending traces is killed due to concurrent dictionary modification.

tests/integration/test_integration.py

Lines changed: 62 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from tests.integration.utils import send_invalid_payload_and_get_logs
1919
from tests.integration.utils import skip_if_testagent
2020
from tests.utils import call_program
21-
from tests.utils import override_global_config
2221

2322

2423
FOUR_KB = 1 << 12
@@ -235,79 +234,89 @@ def test_child_spans_do_not_cause_warning_logs():
235234
log.error.assert_not_called()
236235

237236

238-
def _test_metrics(
239-
tracer,
240-
http_sent_traces=-1,
241-
writer_accepted_traces=-1,
242-
buffer_accepted_traces=-1,
243-
buffer_accepted_spans=-1,
244-
http_requests=-1,
245-
http_sent_bytes=-1,
246-
):
237+
@parametrize_with_all_encodings(env={"DD_TRACE_HEALTH_METRICS_ENABLED": "true"})
238+
def test_metrics():
239+
import mock
240+
241+
from ddtrace import tracer as t
242+
from tests.utils import AnyInt
243+
from tests.utils import override_global_config
244+
245+
assert t._partial_flush_min_spans == 500
246+
247247
with override_global_config(dict(health_metrics_enabled=True)):
248248
statsd_mock = mock.Mock()
249-
tracer._writer.dogstatsd = statsd_mock
249+
t._writer.dogstatsd = statsd_mock
250250
with mock.patch("ddtrace.internal.writer.writer.log") as log:
251-
for _ in range(5):
251+
for _ in range(2):
252252
spans = []
253-
for _ in range(3000):
254-
spans.append(tracer.trace("op"))
253+
for _ in range(600):
254+
spans.append(t.trace("op"))
255255
for s in spans:
256256
s.finish()
257257

258-
tracer.shutdown()
258+
t.shutdown()
259259
log.warning.assert_not_called()
260260
log.error.assert_not_called()
261261

262-
for metric_name, metric_value, check_tags in (
263-
("datadog.tracer.http.sent.traces", http_sent_traces, False),
264-
("datadog.tracer.writer.accepted.traces", writer_accepted_traces, True),
265-
("datadog.tracer.buffer.accepted.traces", buffer_accepted_traces, True),
266-
("datadog.tracer.buffer.accepted.spans", buffer_accepted_spans, True),
267-
("datadog.tracer.http.requests", http_requests, True),
268-
("datadog.tracer.http.sent.bytes", http_sent_bytes, True),
269-
):
270-
if metric_value != -1:
271-
kwargs = {"tags": []} if check_tags else {}
272-
statsd_mock.distribution.assert_has_calls(
273-
[mock.call(metric_name, metric_value, **kwargs)], any_order=True
274-
)
275-
276-
277-
@parametrize_with_all_encodings(env={"DD_TRACE_HEALTH_METRICS_ENABLED": "true"})
278-
def test_metrics():
279-
from ddtrace import tracer as t
280-
from tests.integration.test_integration import _test_metrics
281-
from tests.utils import AnyInt
282-
283-
assert t._partial_flush_min_spans == 500
284-
_test_metrics(
285-
t,
286-
http_sent_bytes=AnyInt(),
287-
http_sent_traces=30,
288-
writer_accepted_traces=30,
289-
buffer_accepted_traces=30,
290-
buffer_accepted_spans=15000,
291-
http_requests=1,
262+
statsd_mock.distribution.assert_has_calls(
263+
[
264+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
265+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
266+
mock.call("datadog.tracer.buffer.accepted.spans", 500, tags=None),
267+
mock.call("datadog.tracer.buffer.accepted.spans", 100, tags=None),
268+
mock.call("datadog.tracer.buffer.accepted.spans", 500, tags=None),
269+
mock.call("datadog.tracer.buffer.accepted.spans", 100, tags=None),
270+
mock.call("datadog.tracer.http.requests", 1, tags=None),
271+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
272+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
273+
mock.call("datadog.tracer.http.sent.traces", 4, tags=None),
274+
],
275+
any_order=True,
292276
)
293277

294278

295-
@skip_if_testagent
296279
@parametrize_with_all_encodings(env={"DD_TRACE_HEALTH_METRICS_ENABLED": "true"})
297280
def test_metrics_partial_flush_disabled():
281+
import mock
282+
298283
from ddtrace import tracer as t
299-
from tests.integration.test_integration import _test_metrics
300284
from tests.utils import AnyInt
285+
from tests.utils import override_global_config
301286

302287
t.configure(
303288
partial_flush_enabled=False,
304289
)
305-
_test_metrics(
306-
t,
307-
http_sent_bytes=AnyInt(),
308-
buffer_accepted_traces=5,
309-
buffer_accepted_spans=15000,
310-
http_requests=1,
290+
291+
with override_global_config(dict(health_metrics_enabled=True)):
292+
statsd_mock = mock.Mock()
293+
t._writer.dogstatsd = statsd_mock
294+
with mock.patch("ddtrace.internal.writer.writer.log") as log:
295+
for _ in range(2):
296+
spans = []
297+
for _ in range(600):
298+
spans.append(t.trace("op"))
299+
for s in spans:
300+
s.finish()
301+
302+
t.shutdown()
303+
log.warning.assert_not_called()
304+
log.error.assert_not_called()
305+
306+
statsd_mock.distribution.assert_has_calls(
307+
[
308+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
309+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
310+
mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None),
311+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
312+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
313+
mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None),
314+
mock.call("datadog.tracer.http.requests", 1, tags=None),
315+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
316+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
317+
mock.call("datadog.tracer.http.sent.traces", 2, tags=None),
318+
],
319+
any_order=True,
311320
)
312321

313322

0 commit comments

Comments (0)