Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-instrumentation-celery`: Add `use_links` parameter to allow creating span links instead of parent-child relationships between task creation and execution spans.
([#3002](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3002))

### Fixed

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ def add(x, y):

add.delay(42, 50)

Configuration
-------------

The ``CeleryInstrumentor().instrument()`` method accepts the following arguments:

* ``use_links`` (bool): When ``True``, Celery task execution spans will be linked to the
task creation spans instead of being created as child spans. This provides a looser
coupling between spans in distributed systems. Defaults to ``False`` to maintain
backward compatibility.

Setting up tracing
------------------

Expand Down Expand Up @@ -122,6 +132,7 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
use_links = kwargs.get("use_links", False)

# pylint: disable=attribute-defined-outside-init
self._tracer = trace.get_tracer(
Expand All @@ -130,6 +141,8 @@ def _instrument(self, **kwargs):
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
# pylint: disable=attribute-defined-outside-init
self._use_links = use_links

meter_provider = kwargs.get("meter_provider")
meter = get_meter(
Expand Down Expand Up @@ -170,14 +183,32 @@ def _trace_prerun(self, *args, **kwargs):
self.update_task_duration_time(task_id)
request = task.request
tracectx = extract(request, getter=celery_getter) or None
token = context_api.attach(tracectx) if tracectx is not None else None

logger.debug("prerun signal start task_id=%s", task_id)

operation_name = f"{_TASK_RUN}/{task.name}"
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
)

if self._use_links and tracectx is not None:
parent_span_context = trace.get_current_span(
tracectx
).get_span_context()
links = (
[trace.Link(parent_span_context)]
if parent_span_context.is_valid
else None
)
span = self._tracer.start_span(
operation_name, links=links, kind=trace.SpanKind.CONSUMER
)
# Don't attach the context when using links to avoid parent-child relationship
token = None
else:
token = (
context_api.attach(tracectx) if tracectx is not None else None
)
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
)

activation = trace.use_span(span, end_on_exit=True)
activation.__enter__() # pylint: disable=E1101
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,63 @@ def _retrieve_context_wrapper_none_token(

unwrap(utils, "retrieve_context")

def test_task_use_links(self):
CeleryInstrumentor().instrument(use_links=True)

result = task_add.delay(1, 2)

timeout = time.time() + 60 * 1 # 1 minute from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "SUCCESS",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_add",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_add",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)
Comment on lines +266 to +268
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section of code is just duplicated from test_task, the remaining assertions below are the only meaningful changes


# Verify that consumer span is not a child of producer span when using links
self.assertIsNone(consumer.parent)
self.assertNotEqual(
consumer.context.trace_id, producer.context.trace_id
)

# Verify that consumer span has a link to the producer span
self.assertEqual(len(consumer.links), 1)
link = consumer.links[0]
self.assertEqual(link.context.span_id, producer.context.span_id)
self.assertEqual(link.context.trace_id, producer.context.trace_id)


class TestCelerySignatureTask(TestBase):
def setUp(self):
Expand Down