Skip to content

Commit 9cc5155

Browse files
committed
celery: allow using links instead of child spans for task execution
1 parent b232b9a commit 9cc5155

File tree

3 files changed

+97
-4
lines changed

3 files changed

+97
-4
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
### Added
15+
16+
- `opentelemetry-instrumentation-celery`: Add `use_links` parameter to allow creating span links instead of parent-child relationships between task creation and execution spans.
17+
([#3002](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3002))
18+
1419
### Fixed
1520

1621
### Added

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ def add(x, y):
4747
4848
add.delay(42, 50)
4949
50+
Configuration
51+
-------------
52+
53+
The ``CeleryInstrumentor().instrument()`` method accepts the following arguments:
54+
55+
* ``use_links`` (bool): When ``True``, Celery task execution spans will be linked to the
56+
task creation spans instead of being created as child spans. This provides a looser
57+
coupling between spans in distributed systems. Defaults to ``False`` to maintain
58+
backward compatibility.
59+
5060
Setting up tracing
5161
------------------
5262
@@ -122,6 +132,7 @@ def instrumentation_dependencies(self) -> Collection[str]:
122132

123133
def _instrument(self, **kwargs):
124134
tracer_provider = kwargs.get("tracer_provider")
135+
use_links = kwargs.get("use_links", False)
125136

126137
# pylint: disable=attribute-defined-outside-init
127138
self._tracer = trace.get_tracer(
@@ -130,6 +141,8 @@ def _instrument(self, **kwargs):
130141
tracer_provider,
131142
schema_url="https://opentelemetry.io/schemas/1.11.0",
132143
)
144+
# pylint: disable=attribute-defined-outside-init
145+
self._use_links = use_links
133146

134147
meter_provider = kwargs.get("meter_provider")
135148
meter = get_meter(
@@ -170,14 +183,32 @@ def _trace_prerun(self, *args, **kwargs):
170183
self.update_task_duration_time(task_id)
171184
request = task.request
172185
tracectx = extract(request, getter=celery_getter) or None
173-
token = context_api.attach(tracectx) if tracectx is not None else None
174186

175187
logger.debug("prerun signal start task_id=%s", task_id)
176188

177189
operation_name = f"{_TASK_RUN}/{task.name}"
178-
span = self._tracer.start_span(
179-
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
180-
)
190+
191+
if self._use_links and tracectx is not None:
192+
parent_span_context = trace.get_current_span(
193+
tracectx
194+
).get_span_context()
195+
links = (
196+
[trace.Link(parent_span_context)]
197+
if parent_span_context.is_valid
198+
else None
199+
)
200+
span = self._tracer.start_span(
201+
operation_name, links=links, kind=trace.SpanKind.CONSUMER
202+
)
203+
# Don't attach the context when using links to avoid parent-child relationship
204+
token = None
205+
else:
206+
token = (
207+
context_api.attach(tracectx) if tracectx is not None else None
208+
)
209+
span = self._tracer.start_span(
210+
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
211+
)
181212

182213
activation = trace.use_span(span, end_on_exit=True)
183214
activation.__enter__() # pylint: disable=E1101

instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,63 @@ def _retrieve_context_wrapper_none_token(
222222

223223
unwrap(utils, "retrieve_context")
224224

225+
def test_task_use_links(self):
226+
CeleryInstrumentor().instrument(use_links=True)
227+
228+
result = task_add.delay(1, 2)
229+
230+
timeout = time.time() + 60 * 1 # 1 minute from now
231+
while not result.ready():
232+
if time.time() > timeout:
233+
break
234+
time.sleep(0.05)
235+
236+
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
237+
self.assertEqual(len(spans), 2)
238+
239+
consumer, producer = spans
240+
241+
self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
242+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
243+
self.assertSpanHasAttributes(
244+
consumer,
245+
{
246+
"celery.action": "run",
247+
"celery.state": "SUCCESS",
248+
SpanAttributes.MESSAGING_DESTINATION: "celery",
249+
"celery.task_name": "tests.celery_test_tasks.task_add",
250+
},
251+
)
252+
253+
self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
254+
self.assertEqual(0, len(consumer.events))
255+
256+
self.assertEqual(
257+
producer.name, "apply_async/tests.celery_test_tasks.task_add"
258+
)
259+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
260+
self.assertSpanHasAttributes(
261+
producer,
262+
{
263+
"celery.action": "apply_async",
264+
"celery.task_name": "tests.celery_test_tasks.task_add",
265+
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
266+
SpanAttributes.MESSAGING_DESTINATION: "celery",
267+
},
268+
)
269+
270+
# Verify that consumer span is not a child of producer span when using links
271+
self.assertIsNone(consumer.parent)
272+
self.assertNotEqual(
273+
consumer.context.trace_id, producer.context.trace_id
274+
)
275+
276+
# Verify that consumer span has a link to the producer span
277+
self.assertEqual(len(consumer.links), 1)
278+
link = consumer.links[0]
279+
self.assertEqual(link.context.span_id, producer.context.span_id)
280+
self.assertEqual(link.context.trace_id, producer.context.trace_id)
281+
225282

226283
class TestCelerySignatureTask(TestBase):
227284
def setUp(self):

0 commit comments

Comments
 (0)