Skip to content

Commit e113ef6

Browse files
authored
fix(tracing): resolves concurrent dictionary modification error raised by the trace writer (backport #7413 to 2.2) (#7643)
Backports: #7413 ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that.
1 parent f595495 commit e113ef6

File tree

5 files changed

+179
-165
lines changed

5 files changed

+179
-165
lines changed

ddtrace/internal/writer/writer.py

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
from ddtrace import Span
5050

51-
from .agent import ConnectionType
51+
from ..agent import ConnectionType
5252

5353

5454
log = get_logger(__name__)
@@ -178,7 +178,7 @@ def __init__(
178178

179179
self._clients = clients
180180
self.dogstatsd = dogstatsd
181-
self._metrics_reset()
181+
self._metrics = defaultdict(int) # type: Dict[str, int]
182182
self._drop_sma = SimpleMovingAverage(DEFAULT_SMA_WINDOW)
183183
self._sync_mode = sync_mode
184184
self._conn = None # type: Optional[ConnectionType]
@@ -213,36 +213,22 @@ def _intake_url(self, client=None):
213213
return client._intake_url
214214
return self.intake_url
215215

216-
def _metrics_dist(self, name, count=1, tags=tuple()):
217-
# type: (str, int, Tuple) -> None
218-
if tags in self._metrics[name]:
219-
self._metrics[name][tags] += count
220-
else:
221-
self._metrics[name][tags] = count
222-
223-
def _metrics_reset(self):
224-
# type: () -> None
225-
self._metrics = defaultdict(dict) # type: Dict[str, Dict[Tuple[str,...], int]]
216+
def _metrics_dist(self, name, count=1, tags=None):
217+
# type: (str, int, Optional[List]) -> None
218+
if config.health_metrics_enabled and self.dogstatsd:
219+
self.dogstatsd.distribution("datadog.%s.%s" % (self.STATSD_NAMESPACE, name), count, tags=tags)
226220

227221
def _set_drop_rate(self):
228-
dropped = sum(
229-
counts
230-
for metric in ("encoder.dropped.traces", "buffer.dropped.traces", "http.dropped.traces")
231-
for _tags, counts in self._metrics[metric].items()
232-
)
233-
accepted = sum(counts for _tags, counts in self._metrics["writer.accepted.traces"].items())
234-
235-
if dropped > accepted:
236-
# Sanity check, we cannot drop more traces than we accepted.
237-
log.debug(
238-
"dropped.traces metric is greater than accepted.traces metric"
239-
"This difference may be reconciled in future metric uploads (dropped.traces: %d, accepted.traces: %d)",
240-
dropped,
241-
accepted,
242-
)
243-
accepted = dropped
244-
222+
# type: () -> None
223+
accepted = self._metrics["accepted_traces"]
224+
sent = self._metrics["sent_traces"]
225+
encoded = sum([len(client.encoder) for client in self._clients])
226+
# The number of dropped traces is the number of accepted traces minus the number of traces in the encoder
227+
# This calculation is a best effort. Due to race conditions it may result in a slight underestimate.
228+
dropped = max(accepted - sent - encoded, 0) # dropped spans should never be negative
245229
self._drop_sma.set(dropped, accepted)
230+
self._metrics["sent_traces"] = 0 # reset sent traces for the next interval
231+
self._metrics["accepted_traces"] = encoded # sets accepted traces to number of spans in encoders
246232

247233
def _set_keep_rate(self, trace):
248234
if trace:
@@ -307,9 +293,10 @@ def _send_payload(self, payload, count, client):
307293
response = self._put(payload, headers, client, no_trace=True)
308294

309295
if response.status >= 400:
310-
self._metrics_dist("http.errors", tags=("type:%s" % response.status,))
296+
self._metrics_dist("http.errors", tags=["type:%s" % response.status])
311297
else:
312298
self._metrics_dist("http.sent.bytes", len(payload))
299+
self._metrics["sent_traces"] += count
313300

314301
if response.status not in (404, 415) and response.status >= 400:
315302
msg = "failed to send traces to intake at %s: HTTP error status %s, reason %s"
@@ -353,6 +340,7 @@ def _write_with_client(self, client, spans=None):
353340
pass
354341

355342
self._metrics_dist("writer.accepted.traces")
343+
self._metrics["accepted_traces"] += 1
356344
self._set_keep_rate(spans)
357345

358346
try:
@@ -364,8 +352,8 @@ def _write_with_client(self, client, spans=None):
364352
payload_size,
365353
client.encoder.max_item_size,
366354
)
367-
self._metrics_dist("buffer.dropped.traces", 1, tags=("reason:t_too_big",))
368-
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=("reason:t_too_big",))
355+
self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:t_too_big"])
356+
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:t_too_big"])
369357
except BufferFull as e:
370358
payload_size = e.args[0]
371359
log.warning(
@@ -376,10 +364,10 @@ def _write_with_client(self, client, spans=None):
376364
payload_size,
377365
self.status.value,
378366
)
379-
self._metrics_dist("buffer.dropped.traces", 1, tags=("reason:full",))
380-
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=("reason:full",))
367+
self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:full"])
368+
self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:full"])
381369
except NoEncodableSpansError:
382-
self._metrics_dist("buffer.dropped.traces", 1, tags=("reason:incompatible",))
370+
self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:incompatible"])
383371
else:
384372
self._metrics_dist("buffer.accepted.traces", 1)
385373
self._metrics_dist("buffer.accepted.spans", len(spans))
@@ -390,7 +378,6 @@ def flush_queue(self, raise_exc=False):
390378
self._flush_queue_with_client(client, raise_exc=raise_exc)
391379
finally:
392380
self._set_drop_rate()
393-
self._metrics_reset()
394381

395382
def _flush_queue_with_client(self, client, raise_exc=False):
396383
# type: (WriterClientBase, bool) -> None
@@ -407,7 +394,7 @@ def _flush_queue_with_client(self, client, raise_exc=False):
407394
try:
408395
self._send_payload_with_backoff(encoded, n_traces, client)
409396
except Exception:
410-
self._metrics_dist("http.errors", tags=("type:err",))
397+
self._metrics_dist("http.errors", tags=["type:err"])
411398
self._metrics_dist("http.dropped.bytes", len(encoded))
412399
self._metrics_dist("http.dropped.traces", n_traces)
413400
if raise_exc:
@@ -420,17 +407,8 @@ def _flush_queue_with_client(self, client, raise_exc=False):
420407
self.RETRY_ATTEMPTS,
421408
)
422409
finally:
423-
if config.health_metrics_enabled and self.dogstatsd:
424-
namespace = self.STATSD_NAMESPACE
425-
# Note that we cannot use the batching functionality of dogstatsd because
426-
# it's not thread-safe.
427-
# https://github.com/DataDog/datadogpy/issues/439
428-
# This really isn't ideal as now we're going to do a ton of socket calls.
429-
self.dogstatsd.distribution("datadog.%s.http.sent.bytes" % namespace, len(encoded))
430-
self.dogstatsd.distribution("datadog.%s.http.sent.traces" % namespace, n_traces)
431-
for name, metric_tags in self._metrics.items():
432-
for tags, count in metric_tags.items():
433-
self.dogstatsd.distribution("datadog.%s.%s" % (namespace, name), count, tags=list(tags))
410+
self._metrics_dist("http.sent.bytes", len(encoded))
411+
self._metrics_dist("http.sent.traces", n_traces)
434412

435413
def periodic(self):
436414
self.flush_queue(raise_exc=False)

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ services:
143143
- DD_POOL_TRACE_CHECK_FAILURES=true
144144
- DD_DISABLE_ERROR_RESPONSES=true
145145
- ENABLED_CHECKS=trace_content_length,trace_stall,meta_tracer_version_header,trace_count_header,trace_peer_service,trace_dd_service
146+
- SNAPSHOT_IGNORED_ATTRS=span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,meta.runtime-id,meta._dd.p.tid,meta.pathway.hash,metrics._dd.tracer_kr
146147
vertica:
147148
image: sumitchawla/vertica
148149
environment:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
tracing: Fixes an issue where the thread responsible for sending traces could be killed due to a concurrent dictionary modification error.

tests/integration/test_integration.py

Lines changed: 68 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -237,79 +237,95 @@ def test_child_spans_do_not_cause_warning_logs():
237237
log.error.assert_not_called()
238238

239239

240-
def _test_metrics(
241-
tracer,
242-
http_sent_traces=-1,
243-
writer_accepted_traces=-1,
244-
buffer_accepted_traces=-1,
245-
buffer_accepted_spans=-1,
246-
http_requests=-1,
247-
http_sent_bytes=-1,
248-
):
240+
@parametrize_with_all_encodings(env={"DD_TRACE_HEALTH_METRICS_ENABLED": "true"})
241+
def test_metrics():
242+
import mock
243+
244+
from ddtrace import tracer as t
245+
from tests.utils import AnyInt
246+
from tests.utils import override_global_config
247+
248+
assert t._partial_flush_min_spans == 300
249+
249250
with override_global_config(dict(health_metrics_enabled=True)):
250251
statsd_mock = mock.Mock()
251-
tracer._writer.dogstatsd = statsd_mock
252+
t._writer.dogstatsd = statsd_mock
252253
with mock.patch("ddtrace.internal.writer.writer.log") as log:
253-
for _ in range(5):
254+
for _ in range(2):
254255
spans = []
255-
for _ in range(3000):
256-
spans.append(tracer.trace("op"))
256+
for _ in range(600):
257+
spans.append(t.trace("op"))
257258
for s in spans:
258259
s.finish()
259260

260-
tracer.shutdown()
261+
t.shutdown()
261262
log.warning.assert_not_called()
262263
log.error.assert_not_called()
263264

264-
for metric_name, metric_value, check_tags in (
265-
("datadog.tracer.http.sent.traces", http_sent_traces, False),
266-
("datadog.tracer.writer.accepted.traces", writer_accepted_traces, True),
267-
("datadog.tracer.buffer.accepted.traces", buffer_accepted_traces, True),
268-
("datadog.tracer.buffer.accepted.spans", buffer_accepted_spans, True),
269-
("datadog.tracer.http.requests", http_requests, True),
270-
("datadog.tracer.http.sent.bytes", http_sent_bytes, True),
271-
):
272-
if metric_value != -1:
273-
kwargs = {"tags": []} if check_tags else {}
274-
statsd_mock.distribution.assert_has_calls(
275-
[mock.call(metric_name, metric_value, **kwargs)], any_order=True
276-
)
277-
278-
279-
@parametrize_with_all_encodings(env={"DD_TRACE_HEALTH_METRICS_ENABLED": "true"})
280-
def test_metrics():
281-
from ddtrace import tracer as t
282-
from tests.integration.test_integration import _test_metrics
283-
from tests.utils import AnyInt
284-
285-
assert t._partial_flush_min_spans == 300
286-
_test_metrics(
287-
t,
288-
http_sent_bytes=AnyInt(),
289-
http_sent_traces=50,
290-
writer_accepted_traces=50,
291-
buffer_accepted_traces=50,
292-
buffer_accepted_spans=15000,
293-
http_requests=1,
265+
statsd_mock.distribution.assert_has_calls(
266+
[
267+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
268+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
269+
mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None),
270+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
271+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
272+
mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None),
273+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
274+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
275+
mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None),
276+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
277+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
278+
mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None),
279+
mock.call("datadog.tracer.http.requests", 1, tags=None),
280+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
281+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
282+
mock.call("datadog.tracer.http.sent.traces", 4, tags=None),
283+
],
284+
any_order=True,
294285
)
295286

296287

297-
@skip_if_testagent
298288
@parametrize_with_all_encodings(env={"DD_TRACE_HEALTH_METRICS_ENABLED": "true"})
299289
def test_metrics_partial_flush_disabled():
290+
import mock
291+
300292
from ddtrace import tracer as t
301-
from tests.integration.test_integration import _test_metrics
302293
from tests.utils import AnyInt
294+
from tests.utils import override_global_config
303295

304296
t.configure(
305297
partial_flush_enabled=False,
306298
)
307-
_test_metrics(
308-
t,
309-
http_sent_bytes=AnyInt(),
310-
buffer_accepted_traces=5,
311-
buffer_accepted_spans=15000,
312-
http_requests=1,
299+
300+
with override_global_config(dict(health_metrics_enabled=True)):
301+
statsd_mock = mock.Mock()
302+
t._writer.dogstatsd = statsd_mock
303+
with mock.patch("ddtrace.internal.writer.writer.log") as log:
304+
for _ in range(2):
305+
spans = []
306+
for _ in range(600):
307+
spans.append(t.trace("op"))
308+
for s in spans:
309+
s.finish()
310+
311+
t.shutdown()
312+
log.warning.assert_not_called()
313+
log.error.assert_not_called()
314+
315+
statsd_mock.distribution.assert_has_calls(
316+
[
317+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
318+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
319+
mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None),
320+
mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None),
321+
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None),
322+
mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None),
323+
mock.call("datadog.tracer.http.requests", 1, tags=None),
324+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
325+
mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None),
326+
mock.call("datadog.tracer.http.sent.traces", 2, tags=None),
327+
],
328+
any_order=True,
313329
)
314330

315331

0 commit comments

Comments (0)