Skip to content

Commit da6abc9

Browse files
author
mriamah
committed
Add test for raised exception in celery instrumentation using new semantic conventions
1 parent cba8dfa commit da6abc9

File tree

1 file changed

+79
-2
lines changed
  • instrumentation/opentelemetry-instrumentation-celery/tests

1 file changed

+79
-2
lines changed

instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
)
2626
from opentelemetry.instrumentation.celery import CeleryInstrumentor, utils
2727
from opentelemetry.instrumentation.utils import unwrap
28-
from opentelemetry.semconv._incubating.attributes import messaging_attributes
28+
from opentelemetry.semconv._incubating.attributes import (
29+
exception_attributes,
30+
messaging_attributes,
31+
)
2932
from opentelemetry.semconv.trace import SpanAttributes
3033
from opentelemetry.test.test_base import TestBase
3134
from opentelemetry.trace import SpanKind, StatusCode
@@ -36,7 +39,7 @@
3639
class TestCeleryInstrumentation(TestBase):
3740
def setUp(self):
3841
super().setUp()
39-
_OpenTelemetrySemanticConventionStability._initialized = False
42+
_OpenTelemetrySemanticConventionStability._initialized = False # to have consistent behavior accross tests since this attribute ensures that initialization happens only once
4043
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
4144
self._thread = threading.Thread(target=self._worker.start)
4245
self._thread.daemon = True
@@ -286,6 +289,80 @@ def test_task_new_sem_conv(self):
286289
consumer.context.trace_id, producer.context.trace_id
287290
)
288291

292+
def test_task_raises_new_sem_conv(self):
293+
with mock.patch.dict(
294+
"os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "messaging"}
295+
):
296+
CeleryInstrumentor().instrument()
297+
298+
result = task_raises.delay()
299+
300+
timeout = time.time() + 60 * 1 # 1 minutes from now
301+
while not result.ready():
302+
if time.time() > timeout:
303+
break
304+
time.sleep(0.05)
305+
306+
spans = self.sorted_spans(
307+
self.memory_exporter.get_finished_spans()
308+
)
309+
self.assertEqual(len(spans), 2)
310+
311+
consumer, producer = spans
312+
313+
self.assertEqual(
314+
consumer.name, "run/tests.celery_test_tasks.task_raises"
315+
)
316+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
317+
self.assertSpanHasAttributes(
318+
consumer,
319+
{
320+
"celery.action": "run",
321+
"celery.state": "FAILURE",
322+
messaging_attributes.MESSAGING_DESTINATION_NAME: "celery",
323+
"celery.task_name": "tests.celery_test_tasks.task_raises",
324+
},
325+
)
326+
327+
self.assertEqual(consumer.status.status_code, StatusCode.ERROR)
328+
329+
self.assertEqual(1, len(consumer.events))
330+
event = consumer.events[0]
331+
332+
self.assertIn(
333+
exception_attributes.EXCEPTION_STACKTRACE, event.attributes
334+
)
335+
336+
self.assertEqual(
337+
"tests.celery_test_tasks.CustomError",
338+
event.attributes[exception_attributes.EXCEPTION_TYPE],
339+
)
340+
341+
self.assertEqual(
342+
event.attributes[exception_attributes.EXCEPTION_MESSAGE],
343+
"The task failed!",
344+
)
345+
346+
self.assertEqual(
347+
producer.name,
348+
"apply_async/tests.celery_test_tasks.task_raises",
349+
)
350+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
351+
self.assertSpanHasAttributes(
352+
producer,
353+
{
354+
"celery.action": "apply_async",
355+
"celery.task_name": "tests.celery_test_tasks.task_raises",
356+
messaging_attributes.MESSAGING_DESTINATION_NAME: "celery",
357+
},
358+
)
359+
360+
self.assertNotEqual(consumer.parent, producer.context)
361+
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
362+
self.assertEqual(
363+
consumer.context.trace_id, producer.context.trace_id
364+
)
365+
289366
def test_task_both_sem_conv(self):
290367
with mock.patch.dict(
291368
"os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "messaging/dup"}

0 commit comments

Comments
 (0)