
Commit 9d7b57e ("Test commit"), parent e6869cd

9 files changed: +519, -159 lines


instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 116 additions & 56 deletions
@@ -1,3 +1,4 @@
+# type: ignore
 # Copyright The OpenTelemetry Authors
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,7 +61,7 @@ def add(x, y):
 """
 
 import logging
-from timeit import default_timer
+import time
 from typing import Collection, Iterable
 
 from billiard import VERSION
@@ -76,6 +77,7 @@ def add(x, y):
 from opentelemetry.metrics import get_meter
 from opentelemetry.propagate import extract, inject
 from opentelemetry.propagators.textmap import Getter
+from opentelemetry.semconv._incubating.metrics import messaging_metrics
 from opentelemetry.semconv.trace import SpanAttributes
 from opentelemetry.trace.status import Status, StatusCode
 
@@ -96,6 +98,12 @@ def add(x, y):
 _TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
 _TASK_NAME_KEY = "celery.task_name"
 
+# Metric names
+_TASK_COUNT_ACTIVE = "messaging.client.active_tasks"
+_TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
+_TASK_PROCESSING_TIME = messaging_metrics.MESSAGING_PROCESS_DURATION
+_TASK_PREFETCH_TIME = "messaging.prefetch.duration"
+
 
 class CeleryGetter(Getter):
     def get(self, carrier, key):
@@ -113,9 +121,37 @@ def keys(self, carrier):
 celery_getter = CeleryGetter()
 
 
+class TaskDurationTracker:
+    def __init__(self, metrics):
+        self.metrics = metrics
+        self.tracker = {}
+
+    def record_start(self, key, step):
+        self.tracker.setdefault(key, {})[step] = time.perf_counter()
+
+    def record_finish(self, key, metric_name, attributes):
+
+        try:
+            time_elapsed = self._time_elapsed(key, metric_name)
+            self.metrics[metric_name].record(
+                max(0, time_elapsed), attributes=attributes
+            )
+        except KeyError:
+            logger.warning("Failed to record %s for task %s", metric_name, key)
+
+    def _time_elapsed(self, key, step):
+        end_time = time.perf_counter()
+        try:
+            start_time = self.tracker.get(key, {}).pop(step)
+            time_elapsed = end_time - start_time
+            return time_elapsed
+        finally:
+            # Cleanup operation
+            if key in self.tracker and not self.tracker.get(key):
+                self.tracker.pop(key)
+
+
 class CeleryInstrumentor(BaseInstrumentor):
-    metrics = None
-    task_id_to_start_time = {}
 
     def instrumentation_dependencies(self) -> Collection[str]:
         return _instruments
@@ -139,60 +175,89 @@ def _instrument(self, **kwargs):
             schema_url="https://opentelemetry.io/schemas/1.11.0",
         )
 
-        self.create_celery_metrics(meter)
+        self.metrics = _create_celery_worker_metrics(meter)
+        self.time_tracker = TaskDurationTracker(self.metrics)
 
+        signals.task_received.connect(self._trace_received, weak=False)
         signals.task_prerun.connect(self._trace_prerun, weak=False)
         signals.task_postrun.connect(self._trace_postrun, weak=False)
-        signals.before_task_publish.connect(
-            self._trace_before_publish, weak=False
-        )
-        signals.after_task_publish.connect(
-            self._trace_after_publish, weak=False
-        )
+        signals.before_task_publish.connect(self._trace_before_publish, weak=False)
+        signals.after_task_publish.connect(self._trace_after_publish, weak=False)
         signals.task_failure.connect(self._trace_failure, weak=False)
         signals.task_retry.connect(self._trace_retry, weak=False)
 
     def _uninstrument(self, **kwargs):
+        signals.task_received.disconnect(self._trace_received)
         signals.task_prerun.disconnect(self._trace_prerun)
         signals.task_postrun.disconnect(self._trace_postrun)
         signals.before_task_publish.disconnect(self._trace_before_publish)
         signals.after_task_publish.disconnect(self._trace_after_publish)
         signals.task_failure.disconnect(self._trace_failure)
         signals.task_retry.disconnect(self._trace_retry)
 
+    def _trace_received(self, *args, **kwargs):
+        """
+        On task_received signal, the task has been prefetched and the prefetch timer starts
+        """
+
+        request = utils.retrieve_task_from_request(kwargs)
+
+        metrics_attributes = utils.get_metrics_attributes_from_request(request)
+        self.metrics[_TASK_COUNT_PREFETCHED].add(1, attributes=metrics_attributes)
+        self.time_tracker.record_start(request.task_id, _TASK_PREFETCH_TIME)
+
     def _trace_prerun(self, *args, **kwargs):
+        """
+        On prerun signal, the task is no longer prefetched, and the execution timer
+        starts along with the task span
+        """
+
         task = utils.retrieve_task(kwargs)
         task_id = utils.retrieve_task_id(kwargs)
 
         if task is None or task_id is None:
             return
 
-        self.update_task_duration_time(task_id)
+        metrics_attributes = utils.get_metrics_attributes_from_task(task)
+        self.metrics[_TASK_COUNT_PREFETCHED].add(-1, attributes=metrics_attributes)
+        self.time_tracker.record_finish(
+            task_id, _TASK_PREFETCH_TIME, metrics_attributes
+        )
+        self.time_tracker.record_start(task_id, _TASK_PROCESSING_TIME)
+
         request = task.request
         tracectx = extract(request, getter=celery_getter) or None
         token = context_api.attach(tracectx) if tracectx is not None else None
 
-        logger.debug("prerun signal start task_id=%s", task_id)
-
         operation_name = f"{_TASK_RUN}/{task.name}"
         span = self._tracer.start_span(
             operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
         )
 
         activation = trace.use_span(span, end_on_exit=True)
-        activation.__enter__()  # pylint: disable=E1101
+        activation.__enter__()
         utils.attach_context(task, task_id, span, activation, token)
 
+        self.metrics[_TASK_COUNT_ACTIVE].add(1, attributes=metrics_attributes)
+
     def _trace_postrun(self, *args, **kwargs):
+        """
+        On postrun signal, the task is no longer being executed
+        """
+
         task = utils.retrieve_task(kwargs)
         task_id = utils.retrieve_task_id(kwargs)
 
         if task is None or task_id is None:
             return
 
-        logger.debug("postrun signal task_id=%s", task_id)
+        metrics_attributes = utils.get_metrics_attributes_from_task(task)
+        self.metrics[_TASK_COUNT_ACTIVE].add(-1, attributes=metrics_attributes)
+        self.time_tracker.record_finish(
+            task_id, _TASK_PROCESSING_TIME, metrics_attributes
+        )
 
-        # retrieve and finish the Span
+        # Retrieve and finish the Span
        ctx = utils.retrieve_context(task, task_id)
 
         if ctx is None:
@@ -201,7 +266,7 @@ def _trace_postrun(self, *args, **kwargs):
 
         span, activation, token = ctx
 
-        # request context tags
+        # Request context tags
         if span.is_recording():
             span.set_attribute(_TASK_TAG_KEY, _TASK_RUN)
             utils.set_attributes_from_context(span, kwargs)
@@ -210,10 +275,8 @@
 
         activation.__exit__(None, None, None)
         utils.detach_context(task, task_id)
-        self.update_task_duration_time(task_id)
-        labels = {"task": task.name, "worker": task.request.hostname}
-        self._record_histograms(task_id, labels)
-        # if the process sending the task is not instrumented
+
+        # If the process sending the task is not instrumented,
         # there's no incoming context and no token to detach
         if token is not None:
             context_api.detach(token)
@@ -226,18 +289,16 @@ def _trace_before_publish(self, *args, **kwargs):
             return
 
         if task is None:
-            # task is an anonymous task send using send_task or using canvas workflow
+            # task is an anonymous task sent using send_task or using canvas workflow
             # Signatures() to send to a task not in the current processes dependency
             # tree
             task_name = kwargs.get("sender", "unknown")
         else:
             task_name = task.name
         operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}"
-        span = self._tracer.start_span(
-            operation_name, kind=trace.SpanKind.PRODUCER
-        )
+        span = self._tracer.start_span(operation_name, kind=trace.SpanKind.PRODUCER)
 
-        # apply some attributes here because most of the data is not available
+        # Apply some attributes here because most of the data is not available
         if span.is_recording():
             span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
             span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
@@ -247,9 +308,7 @@
         activation = trace.use_span(span, end_on_exit=True)
         activation.__enter__()  # pylint: disable=E1101
 
-        utils.attach_context(
-            task, task_id, span, activation, None, is_publish=True
-        )
+        utils.attach_context(task, task_id, span, activation, None, is_publish=True)
 
         headers = kwargs.get("headers")
         if headers:
@@ -263,7 +322,7 @@ def _trace_after_publish(*args, **kwargs):
         if task is None or task_id is None:
             return
 
-        # retrieve and finish the Span
+        # Retrieve and finish the Span
         ctx = utils.retrieve_context(task, task_id, is_publish=True)
 
         if ctx is None:
@@ -272,7 +331,7 @@ def _trace_after_publish(*args, **kwargs):
 
         _, activation, _ = ctx
 
-        activation.__exit__(None, None, None)  # pylint: disable=E1101
+        activation.__exit__(None, None, None)
         utils.detach_context(task, task_id, is_publish=True)
 
     @staticmethod
@@ -345,29 +404,30 @@ def _trace_retry(*args, **kwargs):
         # something that isn't an `Exception`
         span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
 
-    def update_task_duration_time(self, task_id):
-        cur_time = default_timer()
-        task_duration_time_until_now = (
-            cur_time - self.task_id_to_start_time[task_id]
-            if task_id in self.task_id_to_start_time
-            else cur_time
-        )
-        self.task_id_to_start_time[task_id] = task_duration_time_until_now
-
-    def _record_histograms(self, task_id, metric_attributes):
-        if task_id is None:
-            return
-
-        self.metrics["flower.task.runtime.seconds"].record(
-            self.task_id_to_start_time.get(task_id),
-            attributes=metric_attributes,
-        )
 
-    def create_celery_metrics(self, meter) -> None:
-        self.metrics = {
-            "flower.task.runtime.seconds": meter.create_histogram(
-                name="flower.task.runtime.seconds",
-                unit="seconds",
-                description="The time it took to run the task.",
-            )
-        }
+def _create_celery_worker_metrics(meter) -> dict:
+
+    metrics = {
+        _TASK_PROCESSING_TIME: meter.create_histogram(
+            name=_TASK_PROCESSING_TIME,
+            unit="s",
+            description="The time it took to run the task.",
+        ),
+        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
+            name=_TASK_COUNT_ACTIVE,
+            unit="{message}",
+            description="Number of tasks currently being executed by the worker",
+        ),
+        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
+            name=_TASK_COUNT_PREFETCHED,
+            unit="{message}",
+            description="Number of tasks prefetched by the worker",
+        ),
+        _TASK_PREFETCH_TIME: meter.create_histogram(
+            name=_TASK_PREFETCH_TIME,
+            unit="s",
+            description="The time the task spent in prefetch mode",
+        ),
+    }
+
+    return metrics
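
For context on how the new worker metrics surface: the histograms and up/down counters created by _create_celery_worker_metrics only export data if a MeterProvider is configured in each worker process. The following is a minimal sketch, not part of this commit, that wires CeleryInstrumentor into a worker via the worker_process_init signal; the console exporter and the 10-second export interval are assumptions made for illustration.

# worker_init.py -- hypothetical wiring, assuming the OpenTelemetry SDK is installed
from celery.signals import worker_process_init

from opentelemetry import metrics
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)


@worker_process_init.connect(weak=False)
def init_celery_instrumentation(*args, **kwargs):
    # Give the meter obtained in _instrument() a provider to record against
    reader = PeriodicExportingMetricReader(
        ConsoleMetricExporter(), export_interval_millis=10_000
    )
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

    # Connects the task_received/task_prerun/task_postrun handlers (plus the
    # publish, failure and retry handlers) shown in the diff above
    CeleryInstrumentor().instrument()

With this in place, messaging.client.prefetched_tasks rises on task_received and falls on task_prerun, messaging.client.active_tasks tracks tasks between prerun and postrun, and the prefetch and processing duration histograms are recorded by TaskDurationTracker.record_finish.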

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py

Lines changed: 30 additions & 3 deletions
@@ -1,3 +1,5 @@
+# type: ignore
+
 # Copyright The OpenTelemetry Authors
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,6 +22,10 @@
 from celery import registry  # pylint: disable=no-name-in-module
 from celery.app.task import Task
 
+from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
+    MESSAGING_CLIENT_ID,
+    MESSAGING_OPERATION_NAME,
+)
 from opentelemetry.semconv.trace import SpanAttributes
 from opentelemetry.trace import Span
 
@@ -91,9 +97,7 @@ def set_attributes_from_context(span, context):
            routing_key = value.get("routing_key")
 
            if routing_key is not None:
-                span.set_attribute(
-                    SpanAttributes.MESSAGING_DESTINATION, routing_key
-                )
+                span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, routing_key)
 
            value = str(value)
 
@@ -217,6 +221,15 @@ def retrieve_task_id(kwargs):
     return task_id
 
 
+def retrieve_task_from_request(kwargs):
+    # the task_received signal provides the request rather than the task, so use the request argument
+    request = kwargs.get("request")
+    if request is None:
+        logger.debug("Unable to retrieve the request from signal arguments")
+
+    return request
+
+
 def retrieve_task_id_from_request(kwargs):
     # retry signal does not include task_id as argument so use request argument
     request = kwargs.get("request")
@@ -250,3 +263,17 @@ def retrieve_reason(kwargs):
     if not reason:
         logger.debug("Unable to retrieve the retry reason")
     return reason
+
+
+def get_metrics_attributes_from_request(request):
+    return {
+        MESSAGING_OPERATION_NAME: request.task.name,
+        MESSAGING_CLIENT_ID: request.hostname,
+    }
+
+
+def get_metrics_attributes_from_task(task):
+    return {
+        MESSAGING_OPERATION_NAME: task.name,
+        MESSAGING_CLIENT_ID: task.request.hostname,
+    }
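
The two helpers above tag each metric data point with the task name and the worker hostname, using the incubating messaging semantic-convention attribute constants imported at the top of the file. A rough sketch of the shape they produce, using a stand-in object since real Celery Task instances expose .name and .request.hostname inside the worker:

# Illustration only; SimpleNamespace stands in for a Celery Task object
from types import SimpleNamespace

from opentelemetry.instrumentation.celery.utils import (
    get_metrics_attributes_from_task,
)

fake_task = SimpleNamespace(
    name="tasks.add",
    request=SimpleNamespace(hostname="celery@worker-1"),
)

# Expected shape: {MESSAGING_OPERATION_NAME: "tasks.add", MESSAGING_CLIENT_ID: "celery@worker-1"}
print(get_metrics_attributes_from_task(fake_task))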

instrumentation/opentelemetry-instrumentation-celery/test-requirements-0.txt

Lines changed: 2 additions & 1 deletion
@@ -2,7 +2,7 @@ amqp==5.2.0
 asgiref==3.8.1
 backports.zoneinfo==0.2.1
 billiard==4.2.0
-celery==5.3.6
+celery[redis]==5.3.6
 click==8.1.7
 click-didyoumean==0.3.0
 click-plugins==1.1.1
@@ -24,5 +24,6 @@ vine==5.1.0
 wcwidth==0.2.13
 wrapt==1.16.0
 zipp==3.19.2
+sqlalchemy
 -e opentelemetry-instrumentation
 -e instrumentation/opentelemetry-instrumentation-celery
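
The celery[redis] extra pulls in the Redis client so a worker under test can talk to a Redis broker. A hypothetical app module for such a worker, assuming a local Redis on the default port (not part of this commit):

# tasks.py -- example app; broker and backend URLs are assumptions
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)


@app.task
def add(x, y):
    return x + y

Start the worker with celery -A tasks worker --loglevel=info and enqueue work with add.delay(1, 2) from another process; an instrumented worker then emits the spans and metrics added in this commit.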
