Skip to content

Commit ecc6c2d

Browse files
committed
Additional metrics exported from Celery workers
1 parent e6869cd commit ecc6c2d

File tree

5 files changed

+510
-108
lines changed

5 files changed

+510
-108
lines changed

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 107 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# type: ignore
12
# Copyright The OpenTelemetry Authors
23
#
34
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,7 +61,7 @@ def add(x, y):
6061
"""
6162

6263
import logging
63-
from timeit import default_timer
64+
import time
6465
from typing import Collection, Iterable
6566

6667
from billiard import VERSION
@@ -76,6 +77,7 @@ def add(x, y):
7677
from opentelemetry.metrics import get_meter
7778
from opentelemetry.propagate import extract, inject
7879
from opentelemetry.propagators.textmap import Getter
80+
from opentelemetry.semconv._incubating.metrics import messaging_metrics
7981
from opentelemetry.semconv.trace import SpanAttributes
8082
from opentelemetry.trace.status import Status, StatusCode
8183

@@ -96,6 +98,12 @@ def add(x, y):
9698
_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
9799
_TASK_NAME_KEY = "celery.task_name"
98100

101+
# Metric names
102+
_TASK_COUNT_ACTIVE = "messaging.client.active_tasks"
103+
_TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
104+
_TASK_PROCESSING_TIME = messaging_metrics.MESSAGING_PROCESS_DURATION
105+
_TASK_PREFETCH_TIME = "messaging.prefetch.duration"
106+
99107

100108
class CeleryGetter(Getter):
101109
def get(self, carrier, key):
@@ -113,10 +121,36 @@ def keys(self, carrier):
113121
celery_getter = CeleryGetter()
114122

115123

116-
class CeleryInstrumentor(BaseInstrumentor):
117-
metrics = None
118-
task_id_to_start_time = {}
124+
class TaskDurationTracker:
    """Track per-task start timestamps and record elapsed durations as metrics.

    Timestamps come from ``time.perf_counter()`` and are keyed by
    ``(key, step)`` where *key* is typically a task id and *step* a metric name.
    """

    def __init__(self, metrics):
        # Mapping of metric name -> instrument used to record durations.
        self.metrics = metrics
        # key -> {step: start timestamp}; entries are removed once consumed.
        self.tracker = {}

    def record_start(self, key, step):
        """Remember when *step* began for *key*."""
        per_key = self.tracker.setdefault(key, {})
        per_key[step] = time.perf_counter()

    def record_finish(self, key, metric_name, attributes):
        """Record the elapsed time for *metric_name* on *key*.

        Logs a warning instead of raising when no matching start was seen
        (or the metric is unknown).
        """
        try:
            elapsed = self._time_elapsed(key, metric_name)
            # Clamp to zero so clock anomalies never record negative durations.
            self.metrics[metric_name].record(max(0, elapsed), attributes=attributes)
        except KeyError:
            logger.warning("Failed to record %s for task %s", metric_name, key)

    def _time_elapsed(self, key, step):
        """Pop the start timestamp for ``(key, step)`` and return elapsed seconds.

        Raises KeyError when no start timestamp was recorded.
        """
        finished_at = time.perf_counter()
        try:
            started_at = self.tracker.get(key, {}).pop(step)
            return finished_at - started_at
        finally:
            # Cleanup: drop the per-key dict once its last pending step is gone.
            if key in self.tracker and not self.tracker[key]:
                del self.tracker[key]
151+
119152

153+
class CeleryInstrumentor(BaseInstrumentor):
120154
def instrumentation_dependencies(self) -> Collection[str]:
    # Package specifiers this instrumentation applies to (module-level
    # _instruments, defined elsewhere in this file).
    return _instruments
122156

@@ -139,8 +173,10 @@ def _instrument(self, **kwargs):
139173
schema_url="https://opentelemetry.io/schemas/1.11.0",
140174
)
141175

142-
self.create_celery_metrics(meter)
176+
self.metrics = _create_celery_worker_metrics(meter)
177+
self.time_tracker = TaskDurationTracker(self.metrics)
143178

179+
signals.task_received.connect(self._trace_received, weak=False)
144180
signals.task_prerun.connect(self._trace_prerun, weak=False)
145181
signals.task_postrun.connect(self._trace_postrun, weak=False)
146182
signals.before_task_publish.connect(
@@ -153,27 +189,52 @@ def _instrument(self, **kwargs):
153189
signals.task_retry.connect(self._trace_retry, weak=False)
154190

155191
def _uninstrument(self, **kwargs):
    """Disconnect every Celery signal handler connected by ``_instrument``."""
    connections = (
        (signals.task_received, self._trace_received),
        (signals.task_prerun, self._trace_prerun),
        (signals.task_postrun, self._trace_postrun),
        (signals.before_task_publish, self._trace_before_publish),
        (signals.after_task_publish, self._trace_after_publish),
        (signals.task_failure, self._trace_failure),
        (signals.task_retry, self._trace_retry),
    )
    for signal, handler in connections:
        signal.disconnect(handler)
200+
def _trace_received(self, *args, **kwargs):
    """
    On task_received signal, the task has been prefetched by the worker:
    increment the prefetched-task counter and start the prefetch timer.
    """
    request = utils.retrieve_request(kwargs)
    # retrieve_request returns None when the signal carried no request;
    # bail out instead of raising AttributeError below (mirrors the
    # None guard in _trace_prerun).
    if request is None:
        return

    metrics_attributes = utils.get_metrics_attributes_from_request(request)
    self.metrics[_TASK_COUNT_PREFETCHED].add(
        1, attributes=metrics_attributes
    )
    self.time_tracker.record_start(request.task_id, _TASK_PREFETCH_TIME)
212+
163213
def _trace_prerun(self, *args, **kwargs):
214+
"""
215+
On prerun signal, task is no longer prefetched, and execution timer
216+
starts along with the task span
217+
"""
218+
164219
task = utils.retrieve_task(kwargs)
165220
task_id = utils.retrieve_task_id(kwargs)
166221

167222
if task is None or task_id is None:
168223
return
169224

170-
self.update_task_duration_time(task_id)
225+
metrics_attributes = utils.get_metrics_attributes_from_task(task)
226+
self.metrics[_TASK_COUNT_PREFETCHED].add(
227+
-1, attributes=metrics_attributes
228+
)
229+
self.time_tracker.record_finish(
230+
task_id, _TASK_PREFETCH_TIME, metrics_attributes
231+
)
232+
self.time_tracker.record_start(task_id, _TASK_PROCESSING_TIME)
233+
171234
request = task.request
172235
tracectx = extract(request, getter=celery_getter) or None
173236
token = context_api.attach(tracectx) if tracectx is not None else None
174237

175-
logger.debug("prerun signal start task_id=%s", task_id)
176-
177238
operation_name = f"{_TASK_RUN}/{task.name}"
178239
span = self._tracer.start_span(
179240
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
@@ -183,14 +244,24 @@ def _trace_prerun(self, *args, **kwargs):
183244
activation.__enter__() # pylint: disable=E1101
184245
utils.attach_context(task, task_id, span, activation, token)
185246

247+
self.metrics[_TASK_COUNT_ACTIVE].add(1, attributes=metrics_attributes)
248+
186249
def _trace_postrun(self, *args, **kwargs):
250+
"""
251+
On postrun signal, task is no longer being executed
252+
"""
253+
187254
task = utils.retrieve_task(kwargs)
188255
task_id = utils.retrieve_task_id(kwargs)
189256

190257
if task is None or task_id is None:
191258
return
192259

193-
logger.debug("postrun signal task_id=%s", task_id)
260+
metrics_attributes = utils.get_metrics_attributes_from_task(task)
261+
self.metrics[_TASK_COUNT_ACTIVE].add(-1, attributes=metrics_attributes)
262+
self.time_tracker.record_finish(
263+
task_id, _TASK_PROCESSING_TIME, metrics_attributes
264+
)
194265

195266
# retrieve and finish the Span
196267
ctx = utils.retrieve_context(task, task_id)
@@ -210,10 +281,8 @@ def _trace_postrun(self, *args, **kwargs):
210281

211282
activation.__exit__(None, None, None)
212283
utils.detach_context(task, task_id)
213-
self.update_task_duration_time(task_id)
214-
labels = {"task": task.name, "worker": task.request.hostname}
215-
self._record_histograms(task_id, labels)
216-
# if the process sending the task is not instrumented
284+
285+
# If the process sending the task is not instrumented,
217286
# there's no incoming context and no token to detach
218287
if token is not None:
219288
context_api.detach(token)
@@ -345,29 +414,29 @@ def _trace_retry(*args, **kwargs):
345414
# something that isn't an `Exception`
346415
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
347416

348-
def update_task_duration_time(self, task_id):
349-
cur_time = default_timer()
350-
task_duration_time_until_now = (
351-
cur_time - self.task_id_to_start_time[task_id]
352-
if task_id in self.task_id_to_start_time
353-
else cur_time
354-
)
355-
self.task_id_to_start_time[task_id] = task_duration_time_until_now
356-
357-
def _record_histograms(self, task_id, metric_attributes):
358-
if task_id is None:
359-
return
360417

361-
self.metrics["flower.task.runtime.seconds"].record(
362-
self.task_id_to_start_time.get(task_id),
363-
attributes=metric_attributes,
364-
)
365-
366-
def create_celery_metrics(self, meter) -> None:
367-
self.metrics = {
368-
"flower.task.runtime.seconds": meter.create_histogram(
369-
name="flower.task.runtime.seconds",
370-
unit="seconds",
371-
description="The time it took to run the task.",
372-
)
373-
}
418+
def _create_celery_worker_metrics(meter) -> dict:
    """Create the worker-side metric instruments, keyed by metric name.

    Args:
        meter: OpenTelemetry meter used to create the instruments.

    Returns:
        dict mapping metric name -> instrument: up/down counters for the
        active and prefetched task counts, histograms for the prefetch and
        processing durations.

    Note: the original annotated the return type as ``None`` although it
    returns a dict; the annotation is corrected here.
    """
    metrics = {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }

    return metrics

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# type: ignore
2+
13
# Copyright The OpenTelemetry Authors
24
#
35
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,6 +22,10 @@
2022
from celery import registry # pylint: disable=no-name-in-module
2123
from celery.app.task import Task
2224

25+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
26+
MESSAGING_CLIENT_ID,
27+
MESSAGING_OPERATION_NAME,
28+
)
2329
from opentelemetry.semconv.trace import SpanAttributes
2430
from opentelemetry.trace import Span
2531

@@ -217,6 +223,14 @@ def retrieve_task_id(kwargs):
217223
return task_id
218224

219225

226+
def retrieve_request(kwargs):
    """Fetch the ``request`` object passed with a Celery signal.

    Returns the request, or None (after a debug log) when the signal
    arguments did not include one.
    """
    request = kwargs.get("request")
    if request is not None:
        return request
    logger.debug("Unable to retrieve the request from signal arguments")
    return request
233+
220234
def retrieve_task_id_from_request(kwargs):
221235
# retry signal does not include task_id as argument so use request argument
222236
request = kwargs.get("request")
@@ -250,3 +264,17 @@ def retrieve_reason(kwargs):
250264
if not reason:
251265
logger.debug("Unable to retrieve the retry reason")
252266
return reason
267+
268+
269+
def get_metrics_attributes_from_request(request):
    """Build the common metric attributes from a received task *request*."""
    attributes = {}
    attributes[MESSAGING_OPERATION_NAME] = request.task.name
    attributes[MESSAGING_CLIENT_ID] = request.hostname
    return attributes
274+
275+
276+
def get_metrics_attributes_from_task(task):
    """Build the common metric attributes from an executing *task*."""
    attributes = {}
    attributes[MESSAGING_OPERATION_NAME] = task.name
    attributes[MESSAGING_CLIENT_ID] = task.request.hostname
    return attributes

instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# type: ignore
2+
13
# Copyright The OpenTelemetry Authors
24
#
35
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,14 +14,18 @@
1214
# See the License for the specific language governing permissions and
1315
# limitations under the License.
1416

17+
import time
18+
1519
from celery import Celery
1620

1721
from opentelemetry import baggage
1822

1923

2024
class Config:
    # Celery settings for the test app: RPC result backend, and a worker
    # stripped of gossip/heartbeat/mingle so test workers start fast and
    # produce no background chatter.
    result_backend = "rpc://"
    without_gossip = True
    without_heartbeat = True
    without_mingle = True
2329

2430

2531
app = Celery(broker="memory:///")
@@ -31,8 +37,14 @@ class CustomError(Exception):
3137

3238

3339
@app.task
def task_add(x=1, y=2):
    # Trivial task used by the instrumentation tests; defaults let it be
    # invoked with no arguments.
    return x + y
42+
43+
44+
@app.task
def task_sleep(sleep_time):
    # Sleeps for the requested number of seconds so tests can observe a
    # non-trivial task processing duration; always returns 1.
    time.sleep(sleep_time)
    return 1
3648

3749

3850
@app.task

instrumentation/opentelemetry-instrumentation-celery/tests/test_duplicate.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,3 @@ def test_duplicate_instrumentaion(self):
2626
CeleryInstrumentor().uninstrument()
2727
self.assertIsNotNone(first.metrics)
2828
self.assertIsNotNone(second.metrics)
29-
self.assertEqual(first.task_id_to_start_time, {})
30-
self.assertEqual(second.task_id_to_start_time, {})

0 commit comments

Comments
 (0)