Skip to content

Commit fdc7270

Browse files
committed
Add a processing pipeline to AsyncWorker
This makes it possible to do some processing/filtering of the traces before sending them to the agent. Add a FilterRequestsOnUrl processor to remove traces of incoming requests that match a regexp.
1 parent c2df100 commit fdc7270

File tree

8 files changed

+229
-6
lines changed

8 files changed

+229
-6
lines changed

ddtrace/processors.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import re
2+
3+
from .ext import http
4+
5+
class FilterRequestsOnUrl(object):
    r"""Filter out traces from incoming http requests based on the request's url.

    This class takes as argument a list of regular expression patterns
    representing the urls to be excluded from tracing. A trace will be excluded
    if its root span contains a http.url tag and if this tag matches any of
    the provided regular expressions using the standard python regexp match
    semantic (https://docs.python.org/2/library/re.html#re.match).

    :param list regexps: the list of regular expressions (as strings) defining
        the urls that should be filtered out. (a single string is also accepted)

    Examples:

    To filter out http calls to domain api.example.com::

        FilterRequestsOnUrl(r'http://api\.example\.com')

    To filter out http calls to all first level subdomains from example.com::

        FilterRequestsOnUrl(r'http://.*\.example\.com')

    To filter out calls to both http://test.example.com and http://example.com/healthcheck::

        FilterRequestsOnUrl([r'http://test\.example\.com', r'http://example\.com/healthcheck'])
    """
    def __init__(self, regexps):
        # Accept a single pattern string as a convenience; normalize to a list.
        if isinstance(regexps, str):
            regexps = [regexps]
        self._regexps = [re.compile(regexp) for regexp in regexps]

    def process_trace(self, trace):
        """Return the trace, or None if it should be discarded.

        process_trace is called by the processing pipeline on each trace
        before it is sent to the agent, the returned value will be fed to the
        next step of the pipeline. If process_trace returns None, the whole
        trace is discarded.
        """
        for span in trace:
            # Only a root span (no parent) carries the incoming request url.
            if span.parent_id is None:
                url = span.get_tag(http.URL)
                if url is not None:
                    for regexp in self._regexps:
                        if regexp.match(url):
                            return None
        return trace

ddtrace/settings.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Key under which the processing pipeline (a list of trace processors) is
# looked up in the ``settings`` dict passed to ``Tracer.configure``.
PROCESSING_PIPELINE_KEY = "PROCESSING_PIPELINE"

# Shorter alias
PP_KEY = PROCESSING_PIPELINE_KEY

ddtrace/tracer.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .sampler import AllSampler
88
from .writer import AgentWriter
99
from .span import Span
10+
from .settings import PP_KEY
1011
from . import compat
1112
from os import getpid
1213

@@ -70,7 +71,7 @@ async def web_handler(request):
7071
return self._context_provider(*args, **kwargs)
7172

7273
def configure(self, enabled=None, hostname=None, port=None, sampler=None,
73-
context_provider=None, wrap_executor=None):
74+
context_provider=None, wrap_executor=None, settings=None):
7475
"""
7576
Configure an existing Tracer the easy way.
7677
Allow to configure or reconfigure a Tracer instance.
@@ -90,8 +91,12 @@ def configure(self, enabled=None, hostname=None, port=None, sampler=None,
9091
if enabled is not None:
9192
self.enabled = enabled
9293

93-
if hostname is not None or port is not None:
94-
self.writer = AgentWriter(hostname or self.DEFAULT_HOSTNAME, port or self.DEFAULT_PORT)
94+
processing_pipeline = None
95+
if settings is not None and PP_KEY in settings:
96+
processing_pipeline = settings[PP_KEY]
97+
98+
if hostname is not None or port is not None or processing_pipeline is not None:
99+
self.writer = AgentWriter(hostname or self.DEFAULT_HOSTNAME, port or self.DEFAULT_PORT, processing_pipeline=processing_pipeline)
95100

96101
if sampler is not None:
97102
self.sampler = sampler

ddtrace/writer.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222

2323
class AgentWriter(object):
2424

25-
def __init__(self, hostname='localhost', port=8126):
25+
def __init__(self, hostname='localhost', port=8126, processing_pipeline=None):
2626
self._pid = None
2727
self._traces = None
2828
self._services = None
2929
self._worker = None
30+
self._processing_pipeline = processing_pipeline
3031
self.api = api.API(hostname, port)
3132

3233
def write(self, spans=None, services=None):
@@ -52,17 +53,18 @@ def _reset_worker(self):
5253

5354
# ensure we have an active thread working on this queue
5455
if not self._worker or not self._worker.is_alive():
55-
self._worker = AsyncWorker(self.api, self._traces, self._services)
56+
self._worker = AsyncWorker(self.api, self._traces, self._services, processing_pipeline=self._processing_pipeline)
5657

5758

5859
class AsyncWorker(object):
5960

60-
def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT):
61+
def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT, processing_pipeline=None):
6162
self._trace_queue = trace_queue
6263
self._service_queue = service_queue
6364
self._lock = threading.Lock()
6465
self._thread = None
6566
self._shutdown_timeout = shutdown_timeout
67+
self._processing_pipeline = processing_pipeline
6668
self._last_error_ts = 0
6769
self.api = api
6870
self.start()
@@ -119,6 +121,13 @@ def _target(self):
119121

120122
while True:
121123
traces = self._trace_queue.pop()
124+
if traces:
125+
# Before sending the traces, make them go through the
126+
# processing pipeline
127+
try:
128+
traces = self._apply_processing_pipeline(traces)
129+
except Exception as err:
130+
log.error("error while processing traces:{0}".format(err))
122131
if traces:
123132
# If we have data, let's try to send it.
124133
try:
@@ -155,6 +164,19 @@ def _log_error_status(self, result, result_name):
155164
getattr(result, "status", None), getattr(result, "reason", None),
156165
getattr(result, "msg", None))
157166

167+
def _apply_processing_pipeline(self, traces):
168+
if self._processing_pipeline is not None:
169+
processed_traces = []
170+
for trace in traces:
171+
for processor in self._processing_pipeline:
172+
trace = processor.process_trace(trace)
173+
if trace is None:
174+
break
175+
if trace is not None:
176+
processed_traces.append(trace)
177+
return processed_traces
178+
return traces
179+
158180

159181
class Q(object):
160182
"""

docs/index.rst

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,28 @@ Users can pass along the parent_trace_id and parent_span_id via whatever method
309309
Advanced Usage
310310
--------------
311311

312+
Trace Filtering
313+
~~~~~~~~~~~~~~~
314+
315+
It is possible to filter or modify traces before they are sent to the agent by configuring the tracer with a processing pipeline. For instance, to filter out all traces of incoming requests to a specific url::
316+
317+
processing_pipeline = [FilterRequestsOnUrl(r'http://test\.example\.com')]
318+
Tracer.configure(settings={'PROCESSING_PIPELINE': processing_pipeline})
319+
320+
All the processors in the processing pipeline will be evaluated sequentially for each trace and the resulting trace will either be sent to the agent or discarded depending on the output of the pipeline.
321+
322+
**Use the standard processors**
323+
324+
The library comes with a FilterRequestsOnUrl processor that can be used to filter out incoming requests to specific urls:
325+
326+
.. autoclass:: ddtrace.processors.FilterRequestsOnUrl
327+
:members:
328+
329+
**Write a custom processor**
330+
331+
Creating your own processors is as simple as implementing a class with a process_trace method and adding it to the processing pipeline parameter of Tracer.configure. process_trace should either return a trace to be fed to the next step of the pipeline or None if the trace should be discarded. (see processors.py for example implementations)
332+
333+
312334
API
313335
~~~
314336

tests/test_integration.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
from nose.tools import eq_, ok_
1010

1111
from ddtrace.api import API
12+
from ddtrace.ext import http
13+
from ddtrace.processors import FilterRequestsOnUrl
14+
from ddtrace.settings import PP_KEY
1215
from ddtrace.span import Span
1316
from ddtrace.tracer import Tracer
1417
from ddtrace.encoding import JSONEncoder, MsgpackEncoder, get_encoder
@@ -200,6 +203,27 @@ def test_worker_http_error_logging(self):
200203
ok_('failed_to_send traces to Agent: HTTP error status 400, reason Bad Request, message Content-Type:'
201204
in logged_errors[0])
202205

206+
    def test_worker_filter_request(self):
        """End-to-end check that a FilterRequestsOnUrl processor configured
        via the settings dict keeps matching traces from reaching the agent.
        """
        self.tracer.configure(settings={PP_KEY: [FilterRequestsOnUrl(r'http://example\.com/health')]})
        # spy the send() method
        self.api = self.tracer.writer.api
        self.api._put = mock.Mock(self.api._put, wraps=self.api._put)

        # One trace whose http.url matches the filter pattern...
        span = self.tracer.trace('testing.filteredurl')
        span.set_tag(http.URL, 'http://example.com/health')
        span.finish()
        # ...and one whose url does not match.
        span = self.tracer.trace('testing.nonfilteredurl')
        span.set_tag(http.URL, 'http://example.com/api/resource')
        span.finish()
        self._wait_thread_flush()

        # Only the second trace should have been sent
        eq_(self.api._put.call_count, 1)
        # check and retrieve the right call
        endpoint, payload = self._get_endpoint_payload(self.api._put.call_args_list, '/v0.3/traces')
        eq_(endpoint, '/v0.3/traces')
        eq_(len(payload), 1)
        eq_(payload[0][0]['name'], 'testing.nonfilteredurl')
203227

204228
@skipUnless(
205229
os.environ.get('TEST_DATADOG_INTEGRATION', False),

tests/test_processors.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from unittest import TestCase
2+
3+
from ddtrace.processors import FilterRequestsOnUrl
4+
from ddtrace.span import Span
5+
from ddtrace.ext.http import URL
6+
7+
class FilterRequestOnUrlTests(TestCase):
    """Unit tests for the FilterRequestsOnUrl processor.

    All patterns are raw strings: the original used plain strings containing
    ``\.``, an invalid escape sequence (DeprecationWarning on python >= 3.6).
    """

    def test_is_match(self):
        # A root span whose http.url matches the pattern: the trace is dropped.
        span = Span(name='Name', tracer=None)
        span.set_tag(URL, 'http://example.com')
        processor = FilterRequestsOnUrl(r'http://examp.*\.com')
        trace = processor.process_trace([span])
        self.assertIsNone(trace)

    def test_is_not_match(self):
        # A non-matching url: the trace passes through unchanged.
        span = Span(name='Name', tracer=None)
        span.set_tag(URL, 'http://anotherexample.com')
        processor = FilterRequestsOnUrl(r'http://examp.*\.com')
        trace = processor.process_trace([span])
        self.assertIsNotNone(trace)

    def test_list_match(self):
        # A list of patterns filters when any single pattern matches.
        span = Span(name='Name', tracer=None)
        span.set_tag(URL, 'http://anotherdomain.example.com')
        processor = FilterRequestsOnUrl([r'http://domain\.example\.com', r'http://anotherdomain\.example\.com'])
        trace = processor.process_trace([span])
        self.assertIsNone(trace)

    def test_list_no_match(self):
        # None of the patterns match: the trace is kept.
        span = Span(name='Name', tracer=None)
        span.set_tag(URL, 'http://cooldomain.example.com')
        processor = FilterRequestsOnUrl([r'http://domain\.example\.com', r'http://anotherdomain\.example\.com'])
        trace = processor.process_trace([span])
        self.assertIsNotNone(trace)

tests/test_writer.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from unittest import TestCase
2+
3+
from ddtrace.span import Span
4+
from ddtrace.writer import AsyncWorker, Q
5+
6+
class RemoveAllProcessor():
    """Test processor that discards every trace it is given."""

    def process_trace(self, trace):
        """Return None so the pipeline drops the trace."""
        return None
9+
10+
class KeepAllProcessor():
    """Test processor that passes every trace through untouched."""

    def process_trace(self, trace):
        """Return the trace unchanged."""
        return trace
13+
14+
class AddTagProcessor():
    """Test processor that stamps a fixed-value tag onto every span."""

    def __init__(self, tag_name):
        # Name of the tag written onto each span of each trace.
        self.tag_name = tag_name

    def process_trace(self, trace):
        """Tag every span, then pass the trace through."""
        name = self.tag_name
        for span in trace:
            span.set_tag(name, "A value")
        return trace
21+
22+
class DummmyAPI():
    """Stand-in for the agent API that records traces instead of sending them."""

    def __init__(self):
        # Every trace handed to send_traces ends up here, in arrival order.
        self.traces = []

    def send_traces(self, traces):
        """Accumulate the given traces locally."""
        self.traces.extend(traces)
28+
29+
# Number of synthetic traces queued by AsyncWorkerTests.setUp.
N_TRACES = 11
30+
31+
class AsyncWorkerTests(TestCase):
    """Tests for the AsyncWorker processing-pipeline hook.

    Fix: the original built spans with ``parent_id=j-1 or None``, which gives
    ``-1`` (truthy) for j == 0 and ``None`` for j == 1 — so span 1, not span 0,
    was the root. The explicit conditional below makes span 0 the root.
    """

    def setUp(self):
        # Queue N_TRACES traces of 7 spans each; span 0 is the root
        # (parent_id None), every other span points at its predecessor.
        self.api = DummmyAPI()
        self.traces = Q()
        self.services = Q()
        for i in range(N_TRACES):
            self.traces.add([
                Span(tracer=None, name="name", trace_id=i, span_id=j,
                     parent_id=(j - 1) if j > 0 else None)
                for j in range(7)
            ])

    def test_processing_pipeline_keep_all(self):
        # A pass-through pipeline must deliver every trace to the API.
        processing_pipeline = [KeepAllProcessor()]
        worker = AsyncWorker(self.api, self.traces, self.services,
                             processing_pipeline=processing_pipeline)
        worker.stop()
        worker.join()
        self.assertEqual(len(self.api.traces), N_TRACES)

    def test_processing_pipeline_remove_all(self):
        # A pipeline returning None for every trace must drop them all.
        processing_pipeline = [RemoveAllProcessor()]
        worker = AsyncWorker(self.api, self.traces, self.services,
                             processing_pipeline=processing_pipeline)
        worker.stop()
        worker.join()
        self.assertEqual(len(self.api.traces), 0)

    def test_processing_pipeline_add_tag(self):
        # A mutating processor's changes must be visible on the sent spans.
        tag_name = "Tag"
        processing_pipeline = [AddTagProcessor(tag_name)]
        worker = AsyncWorker(self.api, self.traces, self.services,
                             processing_pipeline=processing_pipeline)
        worker.stop()
        worker.join()
        self.assertEqual(len(self.api.traces), N_TRACES)
        for trace in self.api.traces:
            for span in trace:
                self.assertIsNotNone(span.get_tag(tag_name))

0 commit comments

Comments
 (0)