diff --git a/CHANGELOG.md b/CHANGELOG.md index b63232109b..aa801604ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- `opentelemetry-instrumentation-celery`: Add `use_links` parameter to allow creating span links instead of parent-child relationships between task creation and execution spans. + ([#3002](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3002)) + ### Fixed ### Added diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 908f158507..dd87f021a9 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -47,6 +47,16 @@ def add(x, y): add.delay(42, 50) +Configuration +------------- + +The ``CeleryInstrumentor().instrument()`` method accepts the following arguments: + +* ``use_links`` (bool): When ``True``, Celery task execution spans will be linked to the + task creation spans instead of being created as child spans. This gives a looser + coupling between the two spans: the execution span is not parented to the creation span and only references it through a span link. Defaults to ``False`` to maintain + backward compatibility. 
+ Setting up tracing ------------------ @@ -122,6 +132,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") + use_links = kwargs.get("use_links", False) # pylint: disable=attribute-defined-outside-init self._tracer = trace.get_tracer( @@ -130,6 +141,8 @@ def _instrument(self, **kwargs): tracer_provider, schema_url="https://opentelemetry.io/schemas/1.11.0", ) + # pylint: disable=attribute-defined-outside-init + self._use_links = use_links meter_provider = kwargs.get("meter_provider") meter = get_meter( @@ -170,14 +183,32 @@ def _trace_prerun(self, *args, **kwargs): self.update_task_duration_time(task_id) request = task.request tracectx = extract(request, getter=celery_getter) or None - token = context_api.attach(tracectx) if tracectx is not None else None logger.debug("prerun signal start task_id=%s", task_id) operation_name = f"{_TASK_RUN}/{task.name}" - span = self._tracer.start_span( - operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER - ) + + if self._use_links and tracectx is not None: + parent_span_context = trace.get_current_span( + tracectx + ).get_span_context() + links = ( + [trace.Link(parent_span_context)] + if parent_span_context.is_valid + else None + ) + span = self._tracer.start_span( + operation_name, links=links, kind=trace.SpanKind.CONSUMER + ) + # Don't attach the context when using links to avoid parent-child relationship + token = None + else: + token = ( + context_api.attach(tracectx) if tracectx is not None else None + ) + span = self._tracer.start_span( + operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER + ) activation = trace.use_span(span, end_on_exit=True) activation.__enter__() # pylint: disable=E1101 diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index c68b1bc758..b94e1fa270 100644 --- 
a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -222,6 +222,63 @@ def _retrieve_context_wrapper_none_token( unwrap(utils, "retrieve_context") + def test_task_use_links(self): + CeleryInstrumentor().instrument(use_links=True) + + result = task_add.delay(1, 2) + + timeout = time.time() + 60 * 1 # 1 minute from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + SpanAttributes.MESSAGING_DESTINATION: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.UNSET) + self.assertEqual(0, len(consumer.events)) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", + SpanAttributes.MESSAGING_DESTINATION: "celery", + }, + ) + + # Verify that consumer span is not a child of producer span when using links + self.assertIsNone(consumer.parent) + self.assertNotEqual( + consumer.context.trace_id, producer.context.trace_id + ) + + # Verify that consumer span has a link to the producer span + self.assertEqual(len(consumer.links), 1) + link = consumer.links[0] + self.assertEqual(link.context.span_id, producer.context.span_id) + self.assertEqual(link.context.trace_id, producer.context.trace_id) + class 
TestCelerySignatureTask(TestBase): def setUp(self):