Skip to content

Commit d0a6f51

Browse files
authored
Fix runtime workers not flushing to Dogstatsd (#939)
* Force update to dogstatsd constant tags
* Less confusing log message
* Move service update to after runtime worker
* Fix run_periodic for runtime worker
* Fix test for runtime metrics worker
* Force float division
1 parent 43f8b39 commit d0a6f51

File tree

3 files changed

+48
-26
lines changed

3 files changed

+48
-26
lines changed

ddtrace/internal/runtime/runtime_metrics.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ class RuntimeWorker(_worker.PeriodicWorkerThread):
6060

6161
FLUSH_INTERVAL = 10
6262

63-
def __init__(self, statsd_client, flush_interval=FLUSH_INTERVAL):
63+
def __init__(self, statsd_client, flush_interval=None):
64+
flush_interval = self.FLUSH_INTERVAL if flush_interval is None else flush_interval
6465
super(RuntimeWorker, self).__init__(interval=flush_interval,
6566
name=self.__class__.__name__)
6667
self._statsd_client = statsd_client
@@ -78,7 +79,7 @@ def flush(self):
7879
for key, value in self._runtime_metrics:
7980
self._write_metric(key, value)
8081

81-
on_periodic = flush
82+
run_periodic = flush
8283
on_shutdown = flush
8384

8485
def reset(self):

ddtrace/tracer.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ def __init__(self):
6969
self._runtime_id = generate_runtime_id()
7070
self._runtime_worker = None
7171
self._dogstatsd_client = None
72+
self._dogstatsd_host = self.DEFAULT_HOSTNAME
73+
self._dogstatsd_port = self.DEFAULT_DOGSTATSD_PORT
7274

7375
def get_call_context(self, *args, **kwargs):
7476
"""
@@ -154,12 +156,11 @@ def configure(self, enabled=None, hostname=None, port=None, dogstatsd_host=None,
154156
self._wrap_executor = wrap_executor
155157

156158
if collect_metrics and self._runtime_worker is None:
159+
self._dogstatsd_host = dogstatsd_host or self._dogstatsd_host
160+
self._dogstatsd_port = dogstatsd_port or self._dogstatsd_port
157161
# start dogstatsd client if not already running
158162
if not self._dogstatsd_client:
159-
self._start_dogstatsd_client(
160-
dogstatsd_host or self.DEFAULT_HOSTNAME,
161-
dogstatsd_port or self.DEFAULT_DOGSTATSD_PORT,
162-
)
163+
self._start_dogstatsd_client()
163164

164165
self._start_runtime_worker()
165166

@@ -271,18 +272,18 @@ def start_span(self, name, child_of=None, service=None, resource=None, span_type
271272
# add it to the current context
272273
context.add_span(span)
273274

275+
# check for new process if runtime metrics worker has already been started
276+
if self._runtime_worker:
277+
self._check_new_process()
278+
274279
# update set of services handled by tracer
275-
if service:
280+
if service and service not in self._services:
276281
self._services.add(service)
277282

278283
# The constant tags for the dogstatsd client need to be updated with any new
279284
# service(s) that may have been added.
280285
self._update_dogstatsd_constant_tags()
281286

282-
# check for new process if runtime metrics worker has already been started
283-
if self._runtime_worker:
284-
self._check_new_process()
285-
286287
return span
287288

288289
def _update_dogstatsd_constant_tags(self):
@@ -299,12 +300,15 @@ def _update_dogstatsd_constant_tags(self):
299300
log.debug('Updating constant tags {}'.format(tags))
300301
self._dogstatsd_client.constant_tags = tags
301302

302-
def _start_dogstatsd_client(self, host, port):
303+
def _start_dogstatsd_client(self):
303304
# start dogstatsd as client with constant tags
304-
log.debug('Starting DogStatsd on {}:{}'.format(host, port))
305+
log.debug('Connecting to DogStatsd on {}:{}'.format(
306+
self._dogstatsd_host,
307+
self._dogstatsd_port
308+
))
305309
self._dogstatsd_client = DogStatsd(
306-
host=host,
307-
port=port,
310+
host=self._dogstatsd_host,
311+
port=self._dogstatsd_port,
308312
)
309313

310314
def _start_runtime_worker(self):
@@ -330,6 +334,10 @@ def _check_new_process(self):
330334

331335
self._start_runtime_worker()
332336

337+
# force an immediate update of constant tags since we have reset services
338+
# and generated a new runtime id
339+
self._update_dogstatsd_constant_tags()
340+
333341
def trace(self, name, service=None, resource=None, span_type=None):
334342
"""
335343
Return a span that will trace an operation called `name`. The context that created

tests/internal/runtime/test_runtime_metrics.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
from ddtrace.internal.runtime.runtime_metrics import (
24
RuntimeTags,
35
RuntimeMetrics,
@@ -43,21 +45,30 @@ def test_one_metric(self):
4345

4446

4547
class TestRuntimeWorker(BaseTracerTestCase):
46-
def test_worker_metrics(self):
47-
self.tracer.configure(collect_metrics=True)
48+
def test_tracer_metrics(self):
49+
# mock dogstatsd client before configuring tracer for runtime metrics
50+
self.tracer._dogstatsd_client = DogStatsd()
51+
self.tracer._dogstatsd_client.socket = FakeSocket()
52+
53+
default_flush_interval = RuntimeWorker.FLUSH_INTERVAL
54+
try:
55+
# lower flush interval
56+
RuntimeWorker.FLUSH_INTERVAL = 1./4
57+
58+
# configure tracer for runtime metrics
59+
self.tracer.configure(collect_metrics=True)
60+
finally:
61+
# reset flush interval
62+
RuntimeWorker.FLUSH_INTERVAL = default_flush_interval
4863

4964
with self.override_global_tracer(self.tracer):
50-
self.tracer._dogstatsd_client = DogStatsd()
51-
self.tracer._dogstatsd_client.socket = FakeSocket()
52-
5365
root = self.start_span('parent', service='parent')
5466
context = root.context
5567
self.start_span('child', service='child', child_of=context)
5668

57-
self.worker = RuntimeWorker(self.tracer._dogstatsd_client, 0)
58-
self.worker.start()
59-
self.worker.stop()
60-
self.worker.join()
69+
time.sleep(self.tracer._runtime_worker.interval * 2)
70+
self.tracer._runtime_worker.stop()
71+
self.tracer._runtime_worker.join()
6172

6273
# get all received metrics
6374
received = []
@@ -69,7 +80,8 @@ def test_worker_metrics(self):
6980
received.append(new)
7081

7182
# expect to have received all default metrics
72-
self.assertEqual(len(received), len(DEFAULT_RUNTIME_METRICS))
83+
# we expect more than one flush since it is also called on shutdown
84+
assert len(received) / len(DEFAULT_RUNTIME_METRICS) > 1
7385

7486
# expect all metrics in default set are received
7587
# DEV: dogstatsd gauges in form "{metric_name}:{metric_value}|g|#{tag_name}:{tag_value},..."
@@ -78,7 +90,8 @@ def test_worker_metrics(self):
7890
DEFAULT_RUNTIME_METRICS
7991
)
8092

81-
for gauge in received:
93+
# check the last set of metrics returned to confirm tags were set
94+
for gauge in received[-len(DEFAULT_RUNTIME_METRICS):]:
8295
self.assertRegexpMatches(gauge, 'runtime-id:')
8396
self.assertRegexpMatches(gauge, 'service:parent')
8497
self.assertRegexpMatches(gauge, 'service:child')

0 commit comments

Comments
 (0)