
Commit 4f88de8

Addressing Manu's comments
1 parent 0ce2956 commit 4f88de8

File tree

7 files changed (+73 −23 lines)


ddtrace/constants.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+PROCESSING_PIPELINE_KEY = 'PROCESSING_PIPELINE'
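This key replaces the one defined in the now-deleted ddtrace/settings.py (below). With the move, user code can reference the constant instead of hard-coding the string; a minimal sketch, assuming the usual global `tracer` exposed by the ddtrace package:

    from ddtrace import tracer
    from ddtrace.constants import PROCESSING_PIPELINE_KEY
    from ddtrace.processors import FilterRequestsOnUrl

    # Equivalent to settings={'PROCESSING_PIPELINE': [...]}, but typo-proof
    tracer.configure(settings={
        PROCESSING_PIPELINE_KEY: [FilterRequestsOnUrl(r'http://test\.example\.com')],
    })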

ddtrace/processors.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 
 from .ext import http
 
-class FilterRequestsOnUrl():
+class FilterRequestsOnUrl(object):
     """Filter out traces from incoming http requests based on the request's url
 
     This class takes as argument a list of regular expression patterns
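The hunk only touches the class declaration; for context, a url filter with this interface can be sketched as follows (an illustration consistent with the docstring, not the verbatim module; HTTP_URL stands in for the ddtrace.ext.http.URL tag imported above):

    import re

    HTTP_URL = 'http.url'  # stand-in for ddtrace.ext.http.URL

    class FilterRequestsOnUrl(object):
        def __init__(self, regexps):
            if isinstance(regexps, str):
                regexps = [regexps]
            self._regexps = [re.compile(r) for r in regexps]

        def process_trace(self, trace):
            # A root span (no parent) carrying an http url tag marks an
            # incoming request; drop the whole trace when the url matches.
            for span in trace:
                url = span.get_tag(HTTP_URL)
                if span.parent_id is None and url is not None:
                    for regexp in self._regexps:
                        if regexp.match(url):
                            return None
            return trace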

ddtrace/settings.py

Lines changed: 0 additions & 4 deletions
This file was deleted.

ddtrace/tracer.py

Lines changed: 7 additions & 7 deletions
@@ -7,7 +7,7 @@
 from .sampler import AllSampler
 from .writer import AgentWriter
 from .span import Span
-from .settings import PP_KEY
+from .constants import PROCESSING_PIPELINE_KEY as PP_KEY
 from . import compat
 from os import getpid
 
@@ -92,15 +92,15 @@ def configure(self, enabled=None, hostname=None, port=None, sampler=None,
         self.enabled = enabled
 
         processing_pipeline = None
-        if settings is not None and PP_KEY in settings:
-            processing_pipeline = settings[PP_KEY]
+        if settings is not None:
+            processing_pipeline = settings.get(PP_KEY)
 
         if hostname is not None or port is not None or processing_pipeline is not None:
             self.writer = AgentWriter(
-                    hostname or self.DEFAULT_HOSTNAME,
-                    port or self.DEFAULT_PORT,
-                    processing_pipeline=processing_pipeline
-            )
+                hostname or self.DEFAULT_HOSTNAME,
+                port or self.DEFAULT_PORT,
+                processing_pipeline=processing_pipeline
+            )
 
         if sampler is not None:
             self.sampler = sampler
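The lookup refactor is safe because dict.get returns None for a missing key, which is exactly the default assigned just above, so the separate `PP_KEY in settings` membership test becomes redundant. A standalone illustration:

    PP_KEY = 'PROCESSING_PIPELINE'

    assert {'PROCESSING_PIPELINE': []}.get(PP_KEY) == []  # key present
    assert {}.get(PP_KEY) is None                         # key absent: no KeyError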

ddtrace/writer.py

Lines changed: 10 additions & 4 deletions
@@ -54,10 +54,11 @@ def _reset_worker(self):
         # ensure we have an active thread working on this queue
         if not self._worker or not self._worker.is_alive():
             self._worker = AsyncWorker(
-                self.api,
-                self._traces,
-                self._services, processing_pipeline=self._processing_pipeline
-            )
+                self.api,
+                self._traces,
+                self._services,
+                processing_pipeline=self._processing_pipeline,
+            )
 
 
 class AsyncWorker(object):
@@ -169,6 +170,11 @@ def _log_error_status(self, result, result_name):
                  getattr(result, "msg", None))
 
     def _apply_processing_pipeline(self, traces):
+        """
+        Here we make each trace go through the processing pipeline configured
+        in the tracer. There is no need for a lock since the traces are owned
+        by the AsyncWorker at that point.
+        """
         if self._processing_pipeline is not None:
             processed_traces = []
             for trace in traces:
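The hunk is truncated at the start of the loop; a plausible completion of the method, consistent with the docstring above and the short-circuit test added below (a sketch, not the verbatim source):

    def _apply_processing_pipeline(self, traces):
        if self._processing_pipeline is not None:
            processed_traces = []
            for trace in traces:
                for processor in self._processing_pipeline:
                    trace = processor.process_trace(trace)
                    if trace is None:
                        break  # trace dropped: later processors never see it
                if trace is not None:
                    processed_traces.append(trace)
            return processed_traces
        return traces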

docs/index.rst

Lines changed: 24 additions & 4 deletions
@@ -312,23 +312,43 @@ Advanced Usage
 Trace Filtering
 ~~~~~~~~~~~~~~~
 
-It is possible to filter or modify traces before they are sent to the agent by configuring the tracer with a processing pipeline. For instance to filter out all traces of incoming requests to a specific url::
+It is possible to filter or modify traces before they are sent to the agent by
+configuring the tracer with a processing pipeline. For instance to filter out
+all traces of incoming requests to a specific url::
 
     processing_pipeline = [FilterRequestsOnUrl(r'http://test\.example\.com')]
     Tracer.configure(settings={'PROCESSING_PIPELINE': processing_pipeline})
 
-All the processors in the processing pipeline will be evaluated sequentially for each trace and the resulting trace will either be sent to the agent or discarded depending on the output of the pipeline.
+All the processors in the processing pipeline will be evaluated sequentially
+for each trace and the resulting trace will either be sent to the agent or
+discarded depending on the output of the pipeline.
 
 **Use the standard processors**
 
-The library comes with a FilterRequestsOnUrl processor that can be used to filter out incoming requests to specific urls:
+The library comes with a FilterRequestsOnUrl processor that can be used to
+filter out incoming requests to specific urls:
 
 .. autoclass:: ddtrace.processors.FilterRequestsOnUrl
    :members:
 
 **Write a custom processor**
 
-Creating your own processors is as simple as implementing a class with a process_trace method and adding it to the processing pipeline parameter of Tracer.configure. process_trace should either return a trace to be fed to the next step of the pipeline or None if the trace should be discarded. (see processors.py for example implementations)
+Creating your own processors is as simple as implementing a class with a
+process_trace method and adding it to the processing pipeline parameter of
+Tracer.configure. process_trace should either return a trace to be fed to the
+next step of the pipeline or None if the trace should be discarded::
+
+    class ProcessorExample(object):
+        def process_trace(self, trace):
+            # write here your logic to return the `trace` or None;
+            # `trace` instance is owned by the thread and you can alter
+            # each single span or the whole trace if needed
+            return trace
+
+    # And then instantiate it with
+    processing_pipeline = [ProcessorExample()]
+    Tracer.configure(settings={'PROCESSING_PIPELINE': processing_pipeline})
+
+(see processors.py for other example implementations)
 
 
 API
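To make the documented contract concrete, here is a runnable processor that both modifies and conditionally drops traces (the class name and tag name are invented for illustration; only the process_trace method is required by the pipeline):

    class ScrubEveryOther(object):
        """Redact a tag on kept traces and drop every second trace."""
        def __init__(self, tag_name):
            self.tag_name = tag_name  # e.g. 'http.auth_token' (hypothetical)
            self._seen = 0

        def process_trace(self, trace):
            self._seen += 1
            if self._seen % 2 == 0:
                return None  # discarded: the rest of the pipeline is skipped
            for span in trace:
                # same Span.set_tag API used by the tests below
                span.set_tag(self.tag_name, '<redacted>')
            return trace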

tests/test_writer.py

Lines changed: 30 additions & 3 deletions
@@ -4,24 +4,36 @@
 from ddtrace.writer import AsyncWorker, Q
 
 class RemoveAllProcessor():
+    def __init__(self):
+        self.processed_traces = 0
+
     def process_trace(self, trace):
+        self.processed_traces += 1
         return None
 
 class KeepAllProcessor():
+    def __init__(self):
+        self.processed_traces = 0
+
     def process_trace(self, trace):
+        self.processed_traces += 1
         return trace
 
 class AddTagProcessor():
     def __init__(self, tag_name):
         self.tag_name = tag_name
+        self.processed_traces = 0
+
     def process_trace(self, trace):
+        self.processed_traces += 1
         for span in trace:
             span.set_tag(self.tag_name, "A value")
         return trace
 
 class DummmyAPI():
     def __init__(self):
         self.traces = []
+
     def send_traces(self, traces):
         for trace in traces:
             self.traces.append(trace)
@@ -37,26 +49,41 @@ def setUp(self):
         self.traces.add([Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j-1 or None) for j in range(7)])
 
     def test_processing_pipeline_keep_all(self):
-        processing_pipeline = [KeepAllProcessor()]
+        processor = KeepAllProcessor()
+        processing_pipeline = [processor]
         worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
         worker.stop()
         worker.join()
         self.assertEqual(len(self.api.traces), N_TRACES)
+        self.assertEqual(processor.processed_traces, N_TRACES)
 
     def test_processing_pipeline_remove_all(self):
-        processing_pipeline = [RemoveAllProcessor()]
+        processor = RemoveAllProcessor()
+        processing_pipeline = [processor]
         worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
         worker.stop()
         worker.join()
         self.assertEqual(len(self.api.traces), 0)
+        self.assertEqual(processor.processed_traces, N_TRACES)
 
     def test_processing_pipeline_add_tag(self):
         tag_name = "Tag"
-        processing_pipeline = [AddTagProcessor(tag_name)]
+        processor = AddTagProcessor(tag_name)
+        processing_pipeline = [processor]
         worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
         worker.stop()
         worker.join()
         self.assertEqual(len(self.api.traces), N_TRACES)
+        self.assertEqual(processor.processed_traces, N_TRACES)
         for trace in self.api.traces:
             for span in trace:
                 self.assertIsNotNone(span.get_tag(tag_name))
+
+    def test_processing_pipeline_short_circuit(self):
+        processor = KeepAllProcessor()
+        processing_pipeline = [RemoveAllProcessor(), processor]
+        worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
+        worker.stop()
+        worker.join()
+        self.assertEqual(len(self.api.traces), 0)
+        self.assertEqual(processor.processed_traces, 0)
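For reference, the fixture around these tests is only partially visible in the hunk; it plausibly looks like the following sketch (the test-case class name and the N_TRACES value are assumptions; only the self.traces.add(...) line appears in the diff):

    N_TRACES = 11  # assumed value; the constant is referenced but not shown here

    class TestAsyncWorkerPipeline(TestCase):  # hypothetical name
        def setUp(self):
            self.api = DummmyAPI()
            self.traces = Q()
            self.services = Q()
            for i in range(N_TRACES):
                self.traces.add([
                    Span(tracer=None, name="name", trace_id=i, span_id=j,
                         parent_id=j-1 or None)
                    for j in range(7)
                ])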
