Skip to content

Commit e42b881

Browse files
committed
Renamed processing pipeline into filters
1 parent 2be5500 commit e42b881

File tree

8 files changed

+83
-83
lines changed

8 files changed

+83
-83
lines changed

ddtrace/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
PROCESSING_PIPELINE_KEY = 'PROCESSING_PIPELINE'
1+
FILTERS_KEY = 'FILTERS'

ddtrace/processors.py renamed to ddtrace/filters.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ def __init__(self, regexps):
3737

3838
def process_trace(self, trace):
3939
"""
40-
process_trace is called by the processing pipeline on each trace
41-
before it is sent to the agent, the returned value will be fed to the
42-
next step of the pipeline. If process_trace returns None, the whole
43-
trace is discarded.
40+
When the filter is registered in the tracer, process_trace is called
41+
on each trace before it is sent to the agent, the returned value will
42+
be fed to the next filter in the list. If process_trace returns None,
43+
the whole trace is discarded.
4444
"""
4545
for span in trace:
4646
if span.parent_id is None and span.get_tag(http.URL) is not None:

ddtrace/tracer.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from .sampler import AllSampler
88
from .writer import AgentWriter
99
from .span import Span
10-
from .constants import PROCESSING_PIPELINE_KEY as PP_KEY
10+
from .constants import FILTERS_KEY
1111
from . import compat
1212
from os import getpid
1313

@@ -91,15 +91,15 @@ def configure(self, enabled=None, hostname=None, port=None, sampler=None,
9191
if enabled is not None:
9292
self.enabled = enabled
9393

94-
processing_pipeline = None
94+
filters = None
9595
if settings is not None:
96-
processing_pipeline = settings.get(PP_KEY)
96+
filters = settings.get(FILTERS_KEY)
9797

98-
if hostname is not None or port is not None or processing_pipeline is not None:
98+
if hostname is not None or port is not None or filters is not None:
9999
self.writer = AgentWriter(
100100
hostname or self.DEFAULT_HOSTNAME,
101101
port or self.DEFAULT_PORT,
102-
processing_pipeline=processing_pipeline
102+
filters=filters
103103
)
104104

105105
if sampler is not None:

ddtrace/writer.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222

2323
class AgentWriter(object):
2424

25-
def __init__(self, hostname='localhost', port=8126, processing_pipeline=None):
25+
def __init__(self, hostname='localhost', port=8126, filters=None):
2626
self._pid = None
2727
self._traces = None
2828
self._services = None
2929
self._worker = None
30-
self._processing_pipeline = processing_pipeline
30+
self._filters = filters
3131
self.api = api.API(hostname, port)
3232

3333
def write(self, spans=None, services=None):
@@ -57,19 +57,19 @@ def _reset_worker(self):
5757
self.api,
5858
self._traces,
5959
self._services,
60-
processing_pipeline=self._processing_pipeline,
60+
filters=self._filters,
6161
)
6262

6363

6464
class AsyncWorker(object):
6565

66-
def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT, processing_pipeline=None):
66+
def __init__(self, api, trace_queue, service_queue, shutdown_timeout=DEFAULT_TIMEOUT, filters=None):
6767
self._trace_queue = trace_queue
6868
self._service_queue = service_queue
6969
self._lock = threading.Lock()
7070
self._thread = None
7171
self._shutdown_timeout = shutdown_timeout
72-
self._processing_pipeline = processing_pipeline
72+
self._filters = filters
7373
self._last_error_ts = 0
7474
self.api = api
7575
self.start()
@@ -128,11 +128,11 @@ def _target(self):
128128
traces = self._trace_queue.pop()
129129
if traces:
130130
# Before sending the traces, make them go through the
131-
# processing pipeline
131+
# filters
132132
try:
133-
traces = self._apply_processing_pipeline(traces)
133+
traces = self._apply_filters(traces)
134134
except Exception as err:
135-
log.error("error while processing traces:{0}".format(err))
135+
log.error("error while filtering traces:{0}".format(err))
136136
if traces:
137137
# If we have data, let's try to send it.
138138
try:
@@ -169,22 +169,22 @@ def _log_error_status(self, result, result_name):
169169
getattr(result, "status", None), getattr(result, "reason", None),
170170
getattr(result, "msg", None))
171171

172-
def _apply_processing_pipeline(self, traces):
172+
def _apply_filters(self, traces):
173173
"""
174-
Here we make each trace go through the processing pipeline configured
175-
in the tracer. There is no need for a lock since the traces are owned
176-
by the AsyncWorker at that point.
174+
Here we make each trace go through the filters configured in the
175+
tracer. There is no need for a lock since the traces are owned by the
176+
AsyncWorker at that point.
177177
"""
178-
if self._processing_pipeline is not None:
179-
processed_traces = []
178+
if self._filters is not None:
179+
filtered_traces = []
180180
for trace in traces:
181-
for processor in self._processing_pipeline:
182-
trace = processor.process_trace(trace)
181+
for filtr in self._filters:
182+
trace = filtr.process_trace(trace)
183183
if trace is None:
184184
break
185185
if trace is not None:
186-
processed_traces.append(trace)
187-
return processed_traces
186+
filtered_traces.append(trace)
187+
return filtered_traces
188188
return traces
189189

190190

docs/index.rst

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -313,45 +313,45 @@ Trace Filtering
313313
~~~~~~~~~~~~~~~
314314

315315
It is possible to filter or modify traces before they are sent to the agent by
316-
configuring the tracer with a processing pipeline. For instance to filter out
316+
configuring the tracer with a list of filters. For instance, to filter out
317317
all traces of incoming requests to a specific url::
318318

319319
Tracer.configure(settings={
320-
'PROCESSING_PIPELINE': [
320+
'FILTERS': [
321321
FilterRequestsOnUrl(r'http://test\.example\.com'),
322322
],
323323
})
324324

325-
All the processors in the processing pipeline will be evaluated sequentially
325+
All the filters in the filters list will be evaluated sequentially
326326
for each trace and the resulting trace will either be sent to the agent or
327-
discarded depending on the output of the pipeline.
327+
discarded depending on the output.
328328

329-
**Use the standard processors**
329+
**Use the standard filters**
330330

331-
The library comes with a FilterRequestsOnUrl processor that can be used to
331+
The library comes with a FilterRequestsOnUrl filter that can be used to
332332
filter out incoming requests to specific urls:
333333

334-
.. autoclass:: ddtrace.processors.FilterRequestsOnUrl
334+
.. autoclass:: ddtrace.filters.FilterRequestsOnUrl
335335
:members:
336336

337-
**Write a custom processor**
337+
**Write a custom filter**
338338

339-
Creating your own processors is as simple as implementing a class with a
340-
process_trace method and adding it to the processing pipeline parameter of
339+
Creating your own filters is as simple as implementing a class with a
340+
process_trace method and adding it to the filters parameter of
341341
Tracer.configure. process_trace should either return a trace to be fed to the
342342
next filter in the list or None if the trace should be discarded::
343343

344-
class ProcessorExample(object):
344+
class FilterExample(object):
345345
def process_trace(self, trace):
346346
# write here your logic to return the `trace` or None;
347347
# `trace` instance is owned by the thread and you can alter
348348
# each single span or the whole trace if needed
349349

350350
# And then instantiate it with
351-
processing_pipeline = [ProcessorExample()]
352-
Tracer.configure(settings={'PROCESSING_PIPELINE': processing_pipeline})
351+
filters = [FilterExample()]
352+
Tracer.configure(settings={'FILTERS': filters})
353353

354-
(see processors.py for other example implementations)
354+
(see filters.py for other example implementations)
355355

356356

357357
API
Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,34 @@
11
from unittest import TestCase
22

3-
from ddtrace.processors import FilterRequestsOnUrl
3+
from ddtrace.filters import FilterRequestsOnUrl
44
from ddtrace.span import Span
55
from ddtrace.ext.http import URL
66

77
class FilterRequestOnUrlTests(TestCase):
88
def test_is_match(self):
99
span = Span(name='Name', tracer=None)
1010
span.set_tag(URL, r'http://example.com')
11-
processor = FilterRequestsOnUrl('http://examp.*.com')
12-
trace = processor.process_trace([span])
11+
filtr = FilterRequestsOnUrl('http://examp.*.com')
12+
trace = filtr.process_trace([span])
1313
self.assertIsNone(trace)
1414

1515
def test_is_not_match(self):
1616
span = Span(name='Name', tracer=None)
1717
span.set_tag(URL, r'http://anotherexample.com')
18-
processor = FilterRequestsOnUrl('http://examp.*.com')
19-
trace = processor.process_trace([span])
18+
filtr = FilterRequestsOnUrl('http://examp.*.com')
19+
trace = filtr.process_trace([span])
2020
self.assertIsNotNone(trace)
2121

2222
def test_list_match(self):
2323
span = Span(name='Name', tracer=None)
2424
span.set_tag(URL, r'http://anotherdomain.example.com')
25-
processor = FilterRequestsOnUrl(['http://domain\.example\.com', 'http://anotherdomain\.example\.com'])
26-
trace = processor.process_trace([span])
25+
filtr = FilterRequestsOnUrl(['http://domain\.example\.com', 'http://anotherdomain\.example\.com'])
26+
trace = filtr.process_trace([span])
2727
self.assertIsNone(trace)
2828

2929
def test_list_no_match(self):
3030
span = Span(name='Name', tracer=None)
3131
span.set_tag(URL, r'http://cooldomain.example.com')
32-
processor = FilterRequestsOnUrl(['http://domain\.example\.com', 'http://anotherdomain\.example\.com'])
33-
trace = processor.process_trace([span])
32+
filtr = FilterRequestsOnUrl(['http://domain\.example\.com', 'http://anotherdomain\.example\.com'])
33+
trace = filtr.process_trace([span])
3434
self.assertIsNotNone(trace)

tests/test_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010

1111
from ddtrace.api import API
1212
from ddtrace.ext import http
13-
from ddtrace.processors import FilterRequestsOnUrl
14-
from ddtrace.constants import PROCESSING_PIPELINE_KEY as PP_KEY
13+
from ddtrace.filters import FilterRequestsOnUrl
14+
from ddtrace.constants import FILTERS_KEY
1515
from ddtrace.span import Span
1616
from ddtrace.tracer import Tracer
1717
from ddtrace.encoding import JSONEncoder, MsgpackEncoder, get_encoder
@@ -204,7 +204,7 @@ def test_worker_http_error_logging(self):
204204
in logged_errors[0])
205205

206206
def test_worker_filter_request(self):
207-
self.tracer.configure(settings={PP_KEY: [FilterRequestsOnUrl(r'http://example\.com/health')]})
207+
self.tracer.configure(settings={FILTERS_KEY: [FilterRequestsOnUrl(r'http://example\.com/health')]})
208208
# spy the send() method
209209
self.api = self.tracer.writer.api
210210
self.api._put = mock.Mock(self.api._put, wraps=self.api._put)

tests/test_writer.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,29 @@
33
from ddtrace.span import Span
44
from ddtrace.writer import AsyncWorker, Q
55

6-
class RemoveAllProcessor():
6+
class RemoveAllFilter():
77
def __init__(self):
8-
self.processed_traces = 0
8+
self.filtered_traces = 0
99

1010
def process_trace(self, trace):
11-
self.processed_traces += 1
11+
self.filtered_traces += 1
1212
return None
1313

14-
class KeepAllProcessor():
14+
class KeepAllFilter():
1515
def __init__(self):
16-
self.processed_traces = 0
16+
self.filtered_traces = 0
1717

1818
def process_trace(self, trace):
19-
self.processed_traces += 1
19+
self.filtered_traces += 1
2020
return trace
2121

22-
class AddTagProcessor():
22+
class AddTagFilter():
2323
def __init__(self, tag_name):
2424
self.tag_name = tag_name
25-
self.processed_traces = 0
25+
self.filtered_traces = 0
2626

2727
def process_trace(self, trace):
28-
self.processed_traces += 1
28+
self.filtered_traces += 1
2929
for span in trace:
3030
span.set_tag(self.tag_name, "A value")
3131
return trace
@@ -48,42 +48,42 @@ def setUp(self):
4848
for i in range(N_TRACES):
4949
self.traces.add([Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j-1 or None) for j in range(7)])
5050

51-
def test_processing_pipeline_keep_all(self):
52-
processor = KeepAllProcessor()
53-
processing_pipeline = [processor]
54-
worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
51+
def test_filters_keep_all(self):
52+
filtr = KeepAllFilter()
53+
filters = [filtr]
54+
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
5555
worker.stop()
5656
worker.join()
5757
self.assertEqual(len(self.api.traces), N_TRACES)
58-
self.assertEqual(processor.processed_traces, N_TRACES)
58+
self.assertEqual(filtr.filtered_traces, N_TRACES)
5959

60-
def test_processing_pipeline_remove_all(self):
61-
processor = RemoveAllProcessor()
62-
processing_pipeline = [processor]
63-
worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
60+
def test_filters_remove_all(self):
61+
filtr = RemoveAllFilter()
62+
filters = [filtr]
63+
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
6464
worker.stop()
6565
worker.join()
6666
self.assertEqual(len(self.api.traces), 0)
67-
self.assertEqual(processor.processed_traces, N_TRACES)
67+
self.assertEqual(filtr.filtered_traces, N_TRACES)
6868

69-
def test_processing_pipeline_add_tag(self):
69+
def test_filters_add_tag(self):
7070
tag_name = "Tag"
71-
processor = AddTagProcessor(tag_name)
72-
processing_pipeline = [processor]
73-
worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
71+
filtr = AddTagFilter(tag_name)
72+
filters = [filtr]
73+
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
7474
worker.stop()
7575
worker.join()
7676
self.assertEqual(len(self.api.traces), N_TRACES)
77-
self.assertEqual(processor.processed_traces, N_TRACES)
77+
self.assertEqual(filtr.filtered_traces, N_TRACES)
7878
for trace in self.api.traces:
7979
for span in trace:
8080
self.assertIsNotNone(span.get_tag(tag_name))
8181

82-
def test_processing_pipeline_short_circuit(self):
83-
processor = KeepAllProcessor()
84-
processing_pipeline = [RemoveAllProcessor(), processor]
85-
worker = AsyncWorker(self.api, self.traces, self.services, processing_pipeline=processing_pipeline)
82+
def test_filters_short_circuit(self):
83+
filtr = KeepAllFilter()
84+
filters = [RemoveAllFilter(), filtr]
85+
worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
8686
worker.stop()
8787
worker.join()
8888
self.assertEqual(len(self.api.traces), 0)
89-
self.assertEqual(processor.processed_traces, 0)
89+
self.assertEqual(filtr.filtered_traces, 0)

0 commit comments

Comments
 (0)