Skip to content

Commit 2952888

Browse files
sciyoshibrettlangdon
authored andcommitted
[celery] Don't mark expected failures as errors (#820)
1 parent d96223a commit 2952888

File tree

2 files changed

+60
-7
lines changed

2 files changed

+60
-7
lines changed

ddtrace/contrib/celery/signals.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,7 @@
55
from celery import registry
66

77
from . import constants as c
8-
from .utils import (
9-
tags_from_context,
10-
retrieve_task_id,
11-
attach_span,
12-
detach_span,
13-
retrieve_span,
14-
)
8+
from .utils import tags_from_context, retrieve_task_id, attach_span, detach_span, retrieve_span
159

1610
log = logging.getLogger(__name__)
1711
SPAN_TYPE = 'worker'
@@ -129,6 +123,8 @@ def trace_failure(*args, **kwargs):
129123
ex = kwargs.get('einfo')
130124
if ex is None:
131125
return
126+
if hasattr(task, 'throws') and isinstance(ex.exception, task.throws):
127+
return
132128
span.set_exc_info(ex.type, ex.exception, ex.tb)
133129

134130

tests/contrib/celery/test_integration.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,15 @@
1010
from tests.opentracer.utils import init_tracer
1111

1212

13+
class MyException(Exception):
14+
pass
15+
16+
1317
class CeleryIntegrationTask(CeleryBaseTestCase):
1418
"""Ensures that the tracer works properly with a real Celery application
1519
without breaking the Application or Task API.
1620
"""
21+
1722
def test_concurrent_delays(self):
1823
# it should create one trace for each delayed execution
1924
@self.app.task
@@ -196,6 +201,28 @@ def fn_exception():
196201
ok_('Traceback (most recent call last)' in span.get_tag('error.stack'))
197202
ok_('Task class is failing' in span.get_tag('error.stack'))
198203

204+
def test_fn_exception_expected(self):
205+
# it should catch exceptions in task functions
206+
@self.app.task(throws=(MyException,))
207+
def fn_exception():
208+
raise MyException('Task class is failing')
209+
210+
t = fn_exception.apply()
211+
ok_(t.failed())
212+
ok_('Task class is failing' in t.traceback)
213+
214+
traces = self.tracer.writer.pop_traces()
215+
eq_(1, len(traces))
216+
eq_(1, len(traces[0]))
217+
span = traces[0][0]
218+
eq_(span.name, 'celery.run')
219+
eq_(span.resource, 'tests.contrib.celery.test_integration.fn_exception')
220+
eq_(span.service, 'celery-worker')
221+
eq_(span.get_tag('celery.id'), t.task_id)
222+
eq_(span.get_tag('celery.action'), 'run')
223+
eq_(span.get_tag('celery.state'), 'FAILURE')
224+
eq_(span.error, 0)
225+
199226
def test_fn_retry_exception(self):
200227
# it should not catch retry exceptions in task functions
201228
@self.app.task
@@ -282,6 +309,36 @@ def run(self):
282309
ok_('Traceback (most recent call last)' in span.get_tag('error.stack'))
283310
ok_('Task class is failing' in span.get_tag('error.stack'))
284311

312+
def test_class_task_exception_expected(self):
313+
# it should catch exceptions in class based tasks
314+
class BaseTask(self.app.Task):
315+
throws = (MyException,)
316+
317+
def run(self):
318+
raise MyException('Task class is failing')
319+
320+
t = BaseTask()
321+
# register the Task class if it's available (required in Celery 4.0+)
322+
register_task = getattr(self.app, 'register_task', None)
323+
if register_task is not None:
324+
register_task(t)
325+
326+
r = t.apply()
327+
ok_(r.failed())
328+
ok_('Task class is failing' in r.traceback)
329+
330+
traces = self.tracer.writer.pop_traces()
331+
eq_(1, len(traces))
332+
eq_(1, len(traces[0]))
333+
span = traces[0][0]
334+
eq_(span.name, 'celery.run')
335+
eq_(span.resource, 'tests.contrib.celery.test_integration.BaseTask')
336+
eq_(span.service, 'celery-worker')
337+
eq_(span.get_tag('celery.id'), r.task_id)
338+
eq_(span.get_tag('celery.action'), 'run')
339+
eq_(span.get_tag('celery.state'), 'FAILURE')
340+
eq_(span.error, 0)
341+
285342
def test_shared_task(self):
286343
# Ensure Django Shared Task are supported
287344
@celery.shared_task

0 commit comments

Comments
 (0)