Skip to content

Commit 69693dc

Browse files
author
Emanuele Palazzetti
authored
Merge pull request #303 from bmermet/filter-traces
Add a processing pipeline to AsyncWorker
2 parents b3bc803 + e42b881 commit 69693dc

File tree

8 files changed

+292
-7
lines changed

8 files changed

+292
-7
lines changed

ddtrace/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
FILTERS_KEY = 'FILTERS'

ddtrace/filters.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import re
2+
3+
from .ext import http
4+
5+
class FilterRequestsOnUrl(object):
    r"""Filter out traces from incoming http requests based on the request's url.

    This class takes as argument a list of regular expression patterns
    representing the urls to be excluded from tracing. A trace will be excluded
    if its root span contains a http.url tag and if this tag matches any of
    the provided regular expressions using the standard python regexp match
    semantic (https://docs.python.org/2/library/re.html#re.match).

    :param list regexps: the list of regular expressions (as strings) defining
                         the urls that should be filtered out. (a single string
                         is also accepted)

    Examples:

    To filter out http calls to domain api.example.com::

        FilterRequestsOnUrl(r'http://api\.example\.com')

    To filter out http calls to all first level subdomains from example.com::

        FilterRequestsOnUrl(r'http://[^/.]+\.example\.com')

    To filter out calls to both http://test.example.com and http://example.com/healthcheck::

        FilterRequestsOnUrl([r'http://test\.example\.com', r'http://example\.com/healthcheck'])
    """

    def __init__(self, regexps):
        # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3, so
        # a lone pattern is wrapped in a list whichever text type it has,
        # instead of being iterated (and compiled) character by character.
        if isinstance(regexps, (str, type(u''))):
            regexps = [regexps]
        self._regexps = [re.compile(regexp) for regexp in regexps]

    def process_trace(self, trace):
        """
        When the filter is registered in the tracer, process_trace is called
        on each trace before it is sent to the agent; the returned value will
        be fed to the next filter in the list. If process_trace returns None,
        the whole trace is discarded.

        :param trace: the iterable of spans making up the trace
        :returns: ``trace`` unchanged, or None when a root span's url matches
                  one of the configured regular expressions
        """
        for span in trace:
            # only root spans (no parent) are considered
            if span.parent_id is not None:
                continue
            url = span.get_tag(http.URL)
            if url is None:
                continue
            if any(regexp.match(url) for regexp in self._regexps):
                return None
        return trace

ddtrace/tracer.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .sampler import AllSampler
88
from .writer import AgentWriter
99
from .span import Span
10+
from .constants import FILTERS_KEY
1011
from . import compat
1112
from os import getpid
1213

@@ -70,7 +71,7 @@ async def web_handler(request):
7071
return self._context_provider(*args, **kwargs)
7172

7273
def configure(self, enabled=None, hostname=None, port=None, sampler=None,
73-
context_provider=None, wrap_executor=None):
74+
context_provider=None, wrap_executor=None, settings=None):
7475
"""
7576
Configure an existing Tracer the easy way.
7677
Allow to configure or reconfigure a Tracer instance.
@@ -90,8 +91,16 @@ def configure(self, enabled=None, hostname=None, port=None, sampler=None,
9091
if enabled is not None:
9192
self.enabled = enabled
9293

93-
if hostname is not None or port is not None:
94-
self.writer = AgentWriter(hostname or self.DEFAULT_HOSTNAME, port or self.DEFAULT_PORT)
94+
filters = None
95+
if settings is not None:
96+
filters = settings.get(FILTERS_KEY)
97+
98+
if hostname is not None or port is not None or filters is not None:
99+
self.writer = AgentWriter(
100+
hostname or self.DEFAULT_HOSTNAME,
101+
port or self.DEFAULT_PORT,
102+
filters=filters
103+
)
95104

96105
if sampler is not None:
97106
self.sampler = sampler

ddtrace/writer.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222

2323
class AgentWriter(object):
2424

25-
def __init__(self, hostname='localhost', port=8126):
25+
def __init__(self, hostname='localhost', port=8126, filters=None):
2626
self._pid = None
2727
self._traces = None
2828
self._services = None
2929
self._worker = None
30+
self._filters = filters
3031
self.api = api.API(hostname, port)
3132

3233
def write(self, spans=None, services=None):
@@ -52,17 +53,23 @@ def _reset_worker(self):
5253

5354
# ensure we have an active thread working on this queue
5455
if not self._worker or not self._worker.is_alive():
55-
self._worker = AsyncWorker(self.api, self._traces, self._services)
56+
self._worker = AsyncWorker(
57+
self.api,
58+
self._traces,
59+
self._services,
60+
filters=self._filters,
61+
)
5662

5763

5864
class AsyncWorker(object):
5965

60-
def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT):
66+
def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT, filters=None):
6167
self._trace_queue = trace_queue
6268
self._service_queue = service_queue
6369
self._lock = threading.Lock()
6470
self._thread = None
6571
self._shutdown_timeout = shutdown_timeout
72+
self._filters = filters
6673
self._last_error_ts = 0
6774
self.api = api
6875
self.start()
@@ -119,6 +126,13 @@ def _target(self):
119126

120127
while True:
121128
traces = self._trace_queue.pop()
129+
if traces:
130+
# Before sending the traces, make them go through the
131+
# filters
132+
try:
133+
traces = self._apply_filters(traces)
134+
except Exception as err:
135+
log.error("error while filtering traces:{0}".format(err))
122136
if traces:
123137
# If we have data, let's try to send it.
124138
try:
@@ -133,7 +147,7 @@ def _target(self):
133147
except Exception as err:
134148
log.error("cannot send services: {0}".format(err))
135149

136-
elif self._trace_queue.closed():
150+
if self._trace_queue.closed() and self._trace_queue.size() == 0:
137151
# no traces and the queue is closed. our work is done
138152
return
139153

@@ -155,6 +169,24 @@ def _log_error_status(self, result, result_name):
155169
getattr(result, "status", None), getattr(result, "reason", None),
156170
getattr(result, "msg", None))
157171

172+
def _apply_filters(self, traces):
    """
    Run each trace through the filters configured in the tracer, in order.

    Every filter receives the value returned by the previous one. As soon as
    a filter returns None the trace is dropped and the remaining filters are
    not called for it. When no filters are configured (``self._filters`` is
    None) the input is returned as is. There is no need for a lock since the
    traces are owned by the AsyncWorker at that point.
    """
    pipeline = self._filters
    if pipeline is None:
        return traces

    survivors = []
    for current in traces:
        for stage in pipeline:
            current = stage.process_trace(current)
            if current is None:
                # dropped: skip the remaining filters for this trace
                break
        if current is not None:
            survivors.append(current)
    return survivors
189+
158190

159191
class Q(object):
160192
"""

docs/index.rst

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,51 @@ Users can pass along the parent_trace_id and parent_span_id via whatever method
312312
Advanced Usage
313313
--------------
314314

315+
Trace Filtering
316+
~~~~~~~~~~~~~~~
317+
318+
It is possible to filter or modify traces before they are sent to the agent by
319+
configuring the tracer with a filters list. For instance, to filter out
320+
all traces of incoming requests to a specific url::
321+
322+
Tracer.configure(settings={
323+
'FILTERS': [
324+
FilterRequestsOnUrl(r'http://test\.example\.com'),
325+
],
326+
})
327+
328+
All the filters in the filters list will be evaluated sequentially
329+
for each trace and the resulting trace will either be sent to the agent or
330+
discarded depending on the output.
331+
332+
**Use the standard filters**
333+
334+
The library comes with a FilterRequestsOnUrl filter that can be used to
335+
filter out incoming requests to specific urls:
336+
337+
.. autoclass:: ddtrace.filters.FilterRequestsOnUrl
338+
:members:
339+
340+
**Write a custom filter**
341+
342+
Creating your own filters is as simple as implementing a class with a
343+
process_trace method and adding it to the filters parameter of
344+
Tracer.configure. process_trace should either return a trace to be fed to the
345+
next step of the pipeline or None if the trace should be discarded::
346+
347+
class FilterExample(object):
348+
def process_trace(self, trace):
349+
# write your logic here and return either the `trace` or None;
350+
# `trace` instance is owned by the thread and you can alter
351+
# each single span or the whole trace if needed
352+
353+
# And then instantiate it with
354+
filters = [FilterExample()]
355+
Tracer.configure(settings={'FILTERS': filters})
356+
357+
(see filters.py for other example implementations)
358+
359+
315360
API
316361
~~~
317362

tests/test_filters.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from unittest import TestCase
2+
3+
from ddtrace.filters import FilterRequestsOnUrl
4+
from ddtrace.span import Span
5+
from ddtrace.ext.http import URL
6+
7+
class FilterRequestOnUrlTests(TestCase):
    """Unit tests for FilterRequestsOnUrl.process_trace on single-span traces."""

    # Patterns are raw strings so that ``\.`` reaches the regex engine
    # without relying on Python's handling of unknown escape sequences.
    DOMAIN_PATTERNS = [
        r'http://domain\.example\.com',
        r'http://anotherdomain\.example\.com',
    ]

    def _filter_single_span(self, url, regexps):
        """Run a one-span trace tagged with ``url`` through a filter built from ``regexps``."""
        span = Span(name='Name', tracer=None)
        span.set_tag(URL, url)
        filtr = FilterRequestsOnUrl(regexps)
        return filtr.process_trace([span])

    def test_is_match(self):
        trace = self._filter_single_span('http://example.com', r'http://examp.*.com')
        self.assertIsNone(trace)

    def test_is_not_match(self):
        trace = self._filter_single_span('http://anotherexample.com', r'http://examp.*.com')
        self.assertIsNotNone(trace)

    def test_list_match(self):
        trace = self._filter_single_span('http://anotherdomain.example.com', self.DOMAIN_PATTERNS)
        self.assertIsNone(trace)

    def test_list_no_match(self):
        trace = self._filter_single_span('http://cooldomain.example.com', self.DOMAIN_PATTERNS)
        self.assertIsNotNone(trace)

tests/test_integration.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
from nose.tools import eq_, ok_
1010

1111
from ddtrace.api import API
12+
from ddtrace.ext import http
13+
from ddtrace.filters import FilterRequestsOnUrl
14+
from ddtrace.constants import FILTERS_KEY
1215
from ddtrace.span import Span
1316
from ddtrace.tracer import Tracer
1417
from ddtrace.encoding import JSONEncoder, MsgpackEncoder, get_encoder
@@ -200,6 +203,27 @@ def test_worker_http_error_logging(self):
200203
ok_('failed_to_send traces to Agent: HTTP error status 400, reason Bad Request, message Content-Type:'
201204
in logged_errors[0])
202205

206+
def test_worker_filter_request(self):
    """A trace whose url matches a configured filter is not sent to the agent."""
    health_filter = FilterRequestsOnUrl(r'http://example\.com/health')
    self.tracer.configure(settings={FILTERS_KEY: [health_filter]})
    # spy on the _put() method of the writer's API
    self.api = self.tracer.writer.api
    self.api._put = mock.Mock(self.api._put, wraps=self.api._put)

    filtered_span = self.tracer.trace('testing.filteredurl')
    filtered_span.set_tag(http.URL, 'http://example.com/health')
    filtered_span.finish()

    kept_span = self.tracer.trace('testing.nonfilteredurl')
    kept_span.set_tag(http.URL, 'http://example.com/api/resource')
    kept_span.finish()

    self._wait_thread_flush()

    # Only the second trace should have been sent
    eq_(self.api._put.call_count, 1)
    # check and retrieve the right call
    recorded_calls = self.api._put.call_args_list
    endpoint, payload = self._get_endpoint_payload(recorded_calls, '/v0.3/traces')
    eq_(endpoint, '/v0.3/traces')
    eq_(len(payload), 1)
    eq_(payload[0][0]['name'], 'testing.nonfilteredurl')
203227

204228
@skipUnless(
205229
os.environ.get('TEST_DATADOG_INTEGRATION', False),

tests/test_writer.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from unittest import TestCase
2+
3+
from ddtrace.span import Span
4+
from ddtrace.writer import AsyncWorker, Q
5+
6+
class RemoveAllFilter():
    """Test filter that discards every trace and counts how many it was given."""

    def __init__(self):
        # number of traces handed to process_trace so far
        self.filtered_traces = 0

    def process_trace(self, trace):
        """Record the call and drop ``trace`` by returning None."""
        self.filtered_traces = self.filtered_traces + 1
        return None
13+
14+
class KeepAllFilter():
    """Test filter that lets every trace through and counts how many it was given."""

    def __init__(self):
        # number of traces handed to process_trace so far
        self.filtered_traces = 0

    def process_trace(self, trace):
        """Record the call and hand ``trace`` back unchanged."""
        self.filtered_traces = self.filtered_traces + 1
        return trace
21+
22+
class AddTagFilter():
    """Test filter that tags every span of a trace and counts how many traces it was given."""

    def __init__(self, tag_name):
        # name of the tag set on each span
        self.tag_name = tag_name
        # number of traces handed to process_trace so far
        self.filtered_traces = 0

    def process_trace(self, trace):
        """Set the configured tag to "A value" on each span, then return ``trace``."""
        self.filtered_traces = self.filtered_traces + 1
        name = self.tag_name
        for item in trace:
            item.set_tag(name, "A value")
        return trace
32+
33+
class DummmyAPI():
    """In-memory stand-in for the agent API that records the traces it is asked to send."""

    def __init__(self):
        # every trace passed to send_traces, in arrival order
        self.traces = []

    def send_traces(self, traces):
        """Store each trace of ``traces`` instead of sending it anywhere."""
        self.traces.extend(traces)
40+
41+
N_TRACES = 11
42+
43+
class AsyncWorkerTests(TestCase):
    """Check that AsyncWorker runs queued traces through its filters before sending them."""

    def setUp(self):
        self.api = DummmyAPI()
        self.traces = Q()
        self.services = Q()
        for i in range(N_TRACES):
            self.traces.add([
                Span(
                    tracer=None,
                    name="name",
                    trace_id=i,
                    span_id=j,
                    # span 0 is the root of each trace; every other span is
                    # parented to the previous one. An explicit conditional is
                    # needed because ``j - 1 or None`` yields -1 for j == 0
                    # and None for j == 1.
                    parent_id=j - 1 if j > 0 else None,
                )
                for j in range(7)
            ])

    def _drain_with(self, filters):
        """Start a worker on the prepared queues with ``filters`` and wait for it to finish."""
        worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
        worker.stop()
        worker.join()

    def test_filters_keep_all(self):
        filtr = KeepAllFilter()
        self._drain_with([filtr])
        self.assertEqual(len(self.api.traces), N_TRACES)
        self.assertEqual(filtr.filtered_traces, N_TRACES)

    def test_filters_remove_all(self):
        filtr = RemoveAllFilter()
        self._drain_with([filtr])
        self.assertEqual(len(self.api.traces), 0)
        self.assertEqual(filtr.filtered_traces, N_TRACES)

    def test_filters_add_tag(self):
        tag_name = "Tag"
        filtr = AddTagFilter(tag_name)
        self._drain_with([filtr])
        self.assertEqual(len(self.api.traces), N_TRACES)
        self.assertEqual(filtr.filtered_traces, N_TRACES)
        for trace in self.api.traces:
            for span in trace:
                self.assertIsNotNone(span.get_tag(tag_name))

    def test_filters_short_circuit(self):
        # the first filter drops everything, so the second must never be called
        filtr = KeepAllFilter()
        self._drain_with([RemoveAllFilter(), filtr])
        self.assertEqual(len(self.api.traces), 0)
        self.assertEqual(filtr.filtered_traces, 0)

0 commit comments

Comments
 (0)