@@ -60,7 +60,7 @@ def add(x, y):
6060"""
6161
6262import logging
63- from timeit import default_timer
63+ import time
6464from typing import Collection , Iterable
6565
6666from billiard import VERSION
@@ -76,6 +76,7 @@ def add(x, y):
7676from opentelemetry .metrics import get_meter
7777from opentelemetry .propagate import extract , inject
7878from opentelemetry .propagators .textmap import Getter
79+ from opentelemetry .semconv ._incubating .metrics import messaging_metrics
7980from opentelemetry .semconv .trace import SpanAttributes
8081from opentelemetry .trace .status import Status , StatusCode
8182
@@ -96,6 +97,12 @@ def add(x, y):
9697_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
9798_TASK_NAME_KEY = "celery.task_name"
9899
100+ # Metric names
101+ _TASK_COUNT_ACTIVE = "messaging.client.active_tasks"
102+ _TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
103+ _TASK_PROCESSING_TIME = messaging_metrics .MESSAGING_PROCESS_DURATION
104+ _TASK_PREFETCH_TIME = "messaging.prefetch.duration"
105+
99106
100107class CeleryGetter (Getter ):
101108 def get (self , carrier , key ):
@@ -113,10 +120,36 @@ def keys(self, carrier):
113120celery_getter = CeleryGetter ()
114121
115122
116- class CeleryInstrumentor (BaseInstrumentor ):
117- metrics = None
118- task_id_to_start_time = {}
class TaskDurationTracker:
    """Tracks per-task start timestamps and records elapsed durations.

    Timestamps live in a nested dict keyed first by task key, then by
    step/metric name, so a single task may run several timers at once
    (e.g. prefetch time and processing time).
    """

    def __init__(self, metrics):
        # Mapping of metric name -> instrument used to record durations.
        self.metrics = metrics
        # key -> {step: time.perf_counter() value taken at record_start}
        self.tracker = {}

    def record_start(self, key, step):
        """Remember the start time of *step* for task *key*."""
        per_task = self.tracker.setdefault(key, {})
        per_task[step] = time.perf_counter()

    def record_finish(self, key, metric_name, attributes):
        """Record the elapsed time for *metric_name* on task *key*.

        Logs a warning instead of raising when no matching start time
        (or no matching instrument) exists.
        """
        try:
            elapsed = self._time_elapsed(key, metric_name)
            # Clamp at zero so clock anomalies never record negatives.
            self.metrics[metric_name].record(max(0, elapsed), attributes=attributes)
        except KeyError:
            logger.warning("Failed to record %s for task %s", metric_name, key)

    def _time_elapsed(self, key, step):
        """Pop the start time for (*key*, *step*) and return seconds elapsed.

        Raises KeyError when no start time was recorded. The per-task dict
        is pruned once its last pending step has been consumed, whether or
        not the lookup succeeded.
        """
        finish = time.perf_counter()
        try:
            return finish - self.tracker.get(key, {}).pop(step)
        finally:
            # Cleanup: drop the task entry once it holds no more steps.
            if key in self.tracker and not self.tracker.get(key):
                self.tracker.pop(key)
150+
119151
152+ class CeleryInstrumentor (BaseInstrumentor ):
120153 def instrumentation_dependencies (self ) -> Collection [str ]:
121154 return _instruments
122155
@@ -139,8 +172,10 @@ def _instrument(self, **kwargs):
139172 schema_url = "https://opentelemetry.io/schemas/1.11.0" ,
140173 )
141174
142- self .create_celery_metrics (meter )
175+ self .metrics = _create_celery_worker_metrics (meter )
176+ self .time_tracker = TaskDurationTracker (self .metrics )
143177
178+ signals .task_received .connect (self ._trace_received , weak = False )
144179 signals .task_prerun .connect (self ._trace_prerun , weak = False )
145180 signals .task_postrun .connect (self ._trace_postrun , weak = False )
146181 signals .before_task_publish .connect (
@@ -153,27 +188,52 @@ def _instrument(self, **kwargs):
153188 signals .task_retry .connect (self ._trace_retry , weak = False )
154189
155190 def _uninstrument (self , ** kwargs ):
191+ signals .task_received .disconnect (self ._trace_received )
156192 signals .task_prerun .disconnect (self ._trace_prerun )
157193 signals .task_postrun .disconnect (self ._trace_postrun )
158194 signals .before_task_publish .disconnect (self ._trace_before_publish )
159195 signals .after_task_publish .disconnect (self ._trace_after_publish )
160196 signals .task_failure .disconnect (self ._trace_failure )
161197 signals .task_retry .disconnect (self ._trace_retry )
162198
199+ def _trace_received (self , * args , ** kwargs ):
200+ """
201+ On prerun signal, task is prefetched and prefetch timer starts
202+ """
203+
204+ request = utils .retrieve_request (kwargs )
205+
206+ metrics_attributes = utils .get_metrics_attributes_from_request (request )
207+ self .metrics [_TASK_COUNT_PREFETCHED ].add (
208+ 1 , attributes = metrics_attributes
209+ )
210+ self .time_tracker .record_start (request .task_id , _TASK_PREFETCH_TIME )
211+
163212 def _trace_prerun (self , * args , ** kwargs ):
213+ """
214+ On prerun signal, task is no longer prefetched, and execution timer
215+ starts along with the task span
216+ """
217+
164218 task = utils .retrieve_task (kwargs )
165219 task_id = utils .retrieve_task_id (kwargs )
166220
167221 if task is None or task_id is None :
168222 return
169223
170- self .update_task_duration_time (task_id )
224+ metrics_attributes = utils .get_metrics_attributes_from_task (task )
225+ self .metrics [_TASK_COUNT_PREFETCHED ].add (
226+ - 1 , attributes = metrics_attributes
227+ )
228+ self .time_tracker .record_finish (
229+ task_id , _TASK_PREFETCH_TIME , metrics_attributes
230+ )
231+ self .time_tracker .record_start (task_id , _TASK_PROCESSING_TIME )
232+
171233 request = task .request
172234 tracectx = extract (request , getter = celery_getter ) or None
173235 token = context_api .attach (tracectx ) if tracectx is not None else None
174236
175- logger .debug ("prerun signal start task_id=%s" , task_id )
176-
177237 operation_name = f"{ _TASK_RUN } /{ task .name } "
178238 span = self ._tracer .start_span (
179239 operation_name , context = tracectx , kind = trace .SpanKind .CONSUMER
@@ -183,14 +243,24 @@ def _trace_prerun(self, *args, **kwargs):
183243 activation .__enter__ () # pylint: disable=E1101
184244 utils .attach_context (task , task_id , span , activation , token )
185245
246+ self .metrics [_TASK_COUNT_ACTIVE ].add (1 , attributes = metrics_attributes )
247+
186248 def _trace_postrun (self , * args , ** kwargs ):
249+ """
250+ On postrun signal, task is no longer being executed
251+ """
252+
187253 task = utils .retrieve_task (kwargs )
188254 task_id = utils .retrieve_task_id (kwargs )
189255
190256 if task is None or task_id is None :
191257 return
192258
193- logger .debug ("postrun signal task_id=%s" , task_id )
259+ metrics_attributes = utils .get_metrics_attributes_from_task (task )
260+ self .metrics [_TASK_COUNT_ACTIVE ].add (- 1 , attributes = metrics_attributes )
261+ self .time_tracker .record_finish (
262+ task_id , _TASK_PROCESSING_TIME , metrics_attributes
263+ )
194264
195265 # retrieve and finish the Span
196266 ctx = utils .retrieve_context (task , task_id )
@@ -210,10 +280,8 @@ def _trace_postrun(self, *args, **kwargs):
210280
211281 activation .__exit__ (None , None , None )
212282 utils .detach_context (task , task_id )
213- self .update_task_duration_time (task_id )
214- labels = {"task" : task .name , "worker" : task .request .hostname }
215- self ._record_histograms (task_id , labels )
216- # if the process sending the task is not instrumented
283+
284+ # If the process sending the task is not instrumented,
217285 # there's no incoming context and no token to detach
218286 if token is not None :
219287 context_api .detach (token )
@@ -345,29 +413,29 @@ def _trace_retry(*args, **kwargs):
345413 # something that isn't an `Exception`
346414 span .set_attribute (_TASK_RETRY_REASON_KEY , str (reason ))
347415
348- def update_task_duration_time (self , task_id ):
349- cur_time = default_timer ()
350- task_duration_time_until_now = (
351- cur_time - self .task_id_to_start_time [task_id ]
352- if task_id in self .task_id_to_start_time
353- else cur_time
354- )
355- self .task_id_to_start_time [task_id ] = task_duration_time_until_now
356-
357- def _record_histograms (self , task_id , metric_attributes ):
358- if task_id is None :
359- return
360416
361- self .metrics ["flower.task.runtime.seconds" ].record (
362- self .task_id_to_start_time .get (task_id ),
363- attributes = metric_attributes ,
364- )
365-
366- def create_celery_metrics (self , meter ) -> None :
367- self .metrics = {
368- "flower.task.runtime.seconds" : meter .create_histogram (
369- name = "flower.task.runtime.seconds" ,
370- unit = "seconds" ,
371- description = "The time it took to run the task." ,
372- )
373- }
def _create_celery_worker_metrics(meter) -> dict:
    """Create the worker metric instruments, keyed by metric name.

    Args:
        meter: OpenTelemetry Meter used to create the instruments.

    Returns:
        dict mapping metric name -> instrument: up/down counters for the
        active and prefetched task counts, histograms for the prefetch and
        processing durations.
    """
    metrics = {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }

    return metrics
0 commit comments