1+ # type: ignore
12# Copyright The OpenTelemetry Authors
23#
34# Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,7 +61,7 @@ def add(x, y):
6061"""
6162
6263import logging
63- from timeit import default_timer
64+ import time
6465from typing import Collection , Iterable
6566
6667from billiard import VERSION
@@ -76,6 +77,7 @@ def add(x, y):
7677from opentelemetry .metrics import get_meter
7778from opentelemetry .propagate import extract , inject
7879from opentelemetry .propagators .textmap import Getter
80+ from opentelemetry .semconv ._incubating .metrics import messaging_metrics
7981from opentelemetry .semconv .trace import SpanAttributes
8082from opentelemetry .trace .status import Status , StatusCode
8183
@@ -96,6 +98,12 @@ def add(x, y):
9698_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
9799_TASK_NAME_KEY = "celery.task_name"
98100
101+ # Metric names
102+ _TASK_COUNT_ACTIVE = "messaging.client.active_tasks"
103+ _TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
104+ _TASK_PROCESSING_TIME = messaging_metrics .MESSAGING_PROCESS_DURATION
105+ _TASK_PREFETCH_TIME = "messaging.prefetch.duration"
106+
99107
100108class CeleryGetter (Getter ):
101109 def get (self , carrier , key ):
@@ -113,9 +121,37 @@ def keys(self, carrier):
113121celery_getter = CeleryGetter ()
114122
115123
class TaskDurationTracker:
    """Time named steps of a task and report the durations to metrics.

    Start times are stored per task key and per step. Finishing a step
    removes its start time, records the elapsed seconds on the instrument
    registered under the same name, and drops the task entry once no
    steps remain for it.
    """

    def __init__(self, metrics):
        # Mapping of metric name -> instrument exposing ``record``.
        self.metrics = metrics
        # Mapping of task key -> {step name: perf_counter() at start}.
        self.tracker = {}

    def record_start(self, key, step):
        """Store the current perf-counter reading as the start of ``step``."""
        started_at = time.perf_counter()
        if key not in self.tracker:
            self.tracker[key] = {}
        self.tracker[key][step] = started_at

    def record_finish(self, key, metric_name, attributes):
        """Record the time elapsed since ``record_start`` on ``metric_name``.

        The step name and the metric name are the same string. A missing
        start time or a missing instrument is logged as a warning instead
        of being raised.
        """
        try:
            duration = self._time_elapsed(key, metric_name)
            instrument = self.metrics[metric_name]
            # Clamp so that a negative duration is never reported.
            instrument.record(max(0, duration), attributes=attributes)
        except KeyError:
            logger.warning("Failed to record %s for task %s", metric_name, key)

    def _time_elapsed(self, key, step):
        """Pop the start time of ``step`` and return the seconds since then.

        Raises ``KeyError`` when no start time was stored for ``key``/``step``.
        """
        finished_at = time.perf_counter()
        pending_steps = self.tracker.get(key, {})
        try:
            return finished_at - pending_steps.pop(step)
        finally:
            # Drop the per-task entry once its last pending step is gone.
            if key in self.tracker and not self.tracker[key]:
                del self.tracker[key]
153+
116154class CeleryInstrumentor (BaseInstrumentor ):
117- metrics = None
118- task_id_to_start_time = {}
119155
120156 def instrumentation_dependencies (self ) -> Collection [str ]:
121157 return _instruments
@@ -139,41 +175,60 @@ def _instrument(self, **kwargs):
139175 schema_url = "https://opentelemetry.io/schemas/1.11.0" ,
140176 )
141177
142- self .create_celery_metrics (meter )
178+ self .metrics = _create_celery_worker_metrics (meter )
179+ self .time_tracker = TaskDurationTracker (self .metrics )
143180
181+ signals .task_received .connect (self ._trace_received , weak = False )
144182 signals .task_prerun .connect (self ._trace_prerun , weak = False )
145183 signals .task_postrun .connect (self ._trace_postrun , weak = False )
146- signals .before_task_publish .connect (
147- self ._trace_before_publish , weak = False
148- )
149- signals .after_task_publish .connect (
150- self ._trace_after_publish , weak = False
151- )
184+ signals .before_task_publish .connect (self ._trace_before_publish , weak = False )
185+ signals .after_task_publish .connect (self ._trace_after_publish , weak = False )
152186 signals .task_failure .connect (self ._trace_failure , weak = False )
153187 signals .task_retry .connect (self ._trace_retry , weak = False )
154188
def _uninstrument(self, **kwargs):
    """Disconnect every Celery signal handler, in a fixed order."""
    signal_handlers = (
        (signals.task_received, self._trace_received),
        (signals.task_prerun, self._trace_prerun),
        (signals.task_postrun, self._trace_postrun),
        (signals.before_task_publish, self._trace_before_publish),
        (signals.after_task_publish, self._trace_after_publish),
        (signals.task_failure, self._trace_failure),
        (signals.task_retry, self._trace_retry),
    )
    for signal, handler in signal_handlers:
        signal.disconnect(handler)
162197
def _trace_received(self, *args, **kwargs):
    """
    On received signal, task is prefetched and prefetch timer starts

    Does nothing when no request can be retrieved from the signal kwargs
    or when the request has no task id.
    """

    request = utils.retrieve_task_from_request(kwargs)

    # The prerun handler returns early, without decrementing the prefetched
    # counter, when the task or its id is missing. Skip the increment in
    # the same situation so the counter stays balanced and no start time
    # is stored under a ``None`` key.
    if request is None or request.task_id is None:
        return

    metrics_attributes = utils.get_metrics_attributes_from_request(request)
    self.metrics[_TASK_COUNT_PREFETCHED].add(1, attributes=metrics_attributes)
    self.time_tracker.record_start(request.task_id, _TASK_PREFETCH_TIME)
208+
163209 def _trace_prerun (self , * args , ** kwargs ):
210+ """
211+ On prerun signal, task is no longer prefetched, and execution timer
212+ starts along with the task span
213+ """
214+
164215 task = utils .retrieve_task (kwargs )
165216 task_id = utils .retrieve_task_id (kwargs )
166217
167218 if task is None or task_id is None :
168219 return
169220
170- self .update_task_duration_time (task_id )
221+ metrics_attributes = utils .get_metrics_attributes_from_task (task )
222+ self .metrics [_TASK_COUNT_PREFETCHED ].add (- 1 , attributes = metrics_attributes )
223+ self .time_tracker .record_finish (
224+ task_id , _TASK_PREFETCH_TIME , metrics_attributes
225+ )
226+ self .time_tracker .record_start (task_id , _TASK_PROCESSING_TIME )
227+
171228 request = task .request
172229 tracectx = extract (request , getter = celery_getter ) or None
173230 token = context_api .attach (tracectx ) if tracectx is not None else None
174231
175- logger .debug ("prerun signal start task_id=%s" , task_id )
176-
177232 operation_name = f"{ _TASK_RUN } /{ task .name } "
178233 span = self ._tracer .start_span (
179234 operation_name , context = tracectx , kind = trace .SpanKind .CONSUMER
@@ -183,14 +238,24 @@ def _trace_prerun(self, *args, **kwargs):
183238 activation .__enter__ () # pylint: disable=E1101
184239 utils .attach_context (task , task_id , span , activation , token )
185240
241+ self .metrics [_TASK_COUNT_ACTIVE ].add (1 , attributes = metrics_attributes )
242+
186243 def _trace_postrun (self , * args , ** kwargs ):
244+ """
245+ On postrun signal, task is no longer being executed
246+ """
247+
187248 task = utils .retrieve_task (kwargs )
188249 task_id = utils .retrieve_task_id (kwargs )
189250
190251 if task is None or task_id is None :
191252 return
192253
193- logger .debug ("postrun signal task_id=%s" , task_id )
254+ metrics_attributes = utils .get_metrics_attributes_from_task (task )
255+ self .metrics [_TASK_COUNT_ACTIVE ].add (- 1 , attributes = metrics_attributes )
256+ self .time_tracker .record_finish (
257+ task_id , _TASK_PROCESSING_TIME , metrics_attributes
258+ )
194259
195260 # retrieve and finish the Span
196261 ctx = utils .retrieve_context (task , task_id )
@@ -210,10 +275,8 @@ def _trace_postrun(self, *args, **kwargs):
210275
211276 activation .__exit__ (None , None , None )
212277 utils .detach_context (task , task_id )
213- self .update_task_duration_time (task_id )
214- labels = {"task" : task .name , "worker" : task .request .hostname }
215- self ._record_histograms (task_id , labels )
216- # if the process sending the task is not instrumented
278+
279+ # If the process sending the task is not instrumented,
217280 # there's no incoming context and no token to detach
218281 if token is not None :
219282 context_api .detach (token )
@@ -345,29 +408,30 @@ def _trace_retry(*args, **kwargs):
345408 # something that isn't an `Exception`
346409 span .set_attribute (_TASK_RETRY_REASON_KEY , str (reason ))
347410
348- def update_task_duration_time (self , task_id ):
349- cur_time = default_timer ()
350- task_duration_time_until_now = (
351- cur_time - self .task_id_to_start_time [task_id ]
352- if task_id in self .task_id_to_start_time
353- else cur_time
354- )
355- self .task_id_to_start_time [task_id ] = task_duration_time_until_now
356-
357- def _record_histograms (self , task_id , metric_attributes ):
358- if task_id is None :
359- return
360411
361- self .metrics ["flower.task.runtime.seconds" ].record (
362- self .task_id_to_start_time .get (task_id ),
363- attributes = metric_attributes ,
364- )
365-
366- def create_celery_metrics (self , meter ) -> None :
367- self .metrics = {
368- "flower.task.runtime.seconds" : meter .create_histogram (
369- name = "flower.task.runtime.seconds" ,
370- unit = "seconds" ,
371- description = "The time it took to run the task." ,
372- )
373- }
def _create_celery_worker_metrics(meter) -> dict:
    """Create the worker metric instruments, keyed by metric name.

    Args:
        meter: Object providing ``create_up_down_counter`` and
            ``create_histogram`` (presumably an OpenTelemetry ``Meter``).

    Returns:
        A dict mapping each metric-name constant to its instrument: two
        up-down counters (active and prefetched tasks) and two histograms
        (prefetch time and processing time, unit ``s``).
    """
    return {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }
0 commit comments