Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-instrumentation-celery`: Add `use_links` parameter to allow creating span links instead of parent-child relationships between task creation and execution spans.
([#3002](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3002))

### Fixed

- `opentelemetry-instrumentation-dbapi`: fix crash retrieving libpq version when enabling commenter with psycopg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ def add(x, y):

add.delay(42, 50)

Configuration
-------------

The ``CeleryInstrumentor().instrument()`` method accepts the following arguments:

* ``use_links`` (bool): When ``True``, Celery task execution spans will be linked to the
task creation spans instead of being created as child spans. This provides a looser
coupling between spans in distributed systems. Defaults to ``False`` to maintain
backward compatibility.

Setting up tracing
------------------

Expand Down Expand Up @@ -122,6 +132,7 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
use_links = kwargs.get("use_links", False)

# pylint: disable=attribute-defined-outside-init
self._tracer = trace.get_tracer(
Expand All @@ -130,6 +141,8 @@ def _instrument(self, **kwargs):
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
# pylint: disable=attribute-defined-outside-init
self._use_links = use_links

meter_provider = kwargs.get("meter_provider")
meter = get_meter(
Expand Down Expand Up @@ -170,14 +183,32 @@ def _trace_prerun(self, *args, **kwargs):
self.update_task_duration_time(task_id)
request = task.request
tracectx = extract(request, getter=celery_getter) or None
token = context_api.attach(tracectx) if tracectx is not None else None

logger.debug("prerun signal start task_id=%s", task_id)

operation_name = f"{_TASK_RUN}/{task.name}"
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
)

if self._use_links and tracectx is not None:
parent_span_context = trace.get_current_span(
tracectx
).get_span_context()
links = (
[trace.Link(parent_span_context)]
if parent_span_context.is_valid
else None
)
span = self._tracer.start_span(
operation_name, links=links, kind=trace.SpanKind.CONSUMER
)
# Don't attach the context when using links to avoid parent-child relationship
token = None
else:
token = (
context_api.attach(tracectx) if tracectx is not None else None
)
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
)

activation = trace.use_span(span, end_on_exit=True)
activation.__enter__() # pylint: disable=E1101
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,63 @@ def _retrieve_context_wrapper_none_token(

unwrap(utils, "retrieve_context")

def test_task_use_links(self):
CeleryInstrumentor().instrument(use_links=True)

result = task_add.delay(1, 2)

timeout = time.time() + 60 * 1 # 1 minute from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "SUCCESS",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_add",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_add",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)
Comment on lines +266 to +268
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section of code is just duplicated from test_task, the remaining assertions below are the only meaningful changes


# Verify that consumer span is not a child of producer span when using links
self.assertIsNone(consumer.parent)
self.assertNotEqual(
consumer.context.trace_id, producer.context.trace_id
)

# Verify that consumer span has a link to the producer span
self.assertEqual(len(consumer.links), 1)
link = consumer.links[0]
self.assertEqual(link.context.span_id, producer.context.span_id)
self.assertEqual(link.context.trace_id, producer.context.trace_id)


class TestCelerySignatureTask(TestBase):
def setUp(self):
Expand Down