1+ # type: ignore 
12# Copyright The OpenTelemetry Authors 
23# 
34# Licensed under the Apache License, Version 2.0 (the "License"); 
@@ -60,7 +61,7 @@ def add(x, y):
6061""" 
6162
6263import  logging 
63- from   timeit   import  default_timer 
64+ import  time 
6465from  typing  import  Collection , Iterable 
6566
6667from  billiard  import  VERSION 
@@ -76,6 +77,7 @@ def add(x, y):
7677from  opentelemetry .metrics  import  get_meter 
7778from  opentelemetry .propagate  import  extract , inject 
7879from  opentelemetry .propagators .textmap  import  Getter 
80+ from  opentelemetry .semconv ._incubating .metrics  import  messaging_metrics 
7981from  opentelemetry .semconv .trace  import  SpanAttributes 
8082from  opentelemetry .trace .status  import  Status , StatusCode 
8183
@@ -96,6 +98,12 @@ def add(x, y):
9698_TASK_REVOKED_TERMINATED_SIGNAL_KEY  =  "celery.terminated.signal" 
9799_TASK_NAME_KEY  =  "celery.task_name" 
98100
# Metric names
# Worker-side instrument names keyed into the dict returned by
# _create_celery_worker_metrics.
_TASK_COUNT_ACTIVE = "messaging.client.active_tasks"  # up-down counter: tasks currently executing
_TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"  # up-down counter: tasks received but not yet started
# Histogram name taken from the incubating messaging semantic conventions.
_TASK_PROCESSING_TIME = messaging_metrics.MESSAGING_PROCESS_DURATION
_TASK_PREFETCH_TIME = "messaging.prefetch.duration"  # histogram: time a task spent prefetched

99107
100108class  CeleryGetter (Getter ):
101109    def  get (self , carrier , key ):
@@ -113,10 +121,36 @@ def keys(self, carrier):
113121celery_getter  =  CeleryGetter ()
114122
115123
116- class  CeleryInstrumentor (BaseInstrumentor ):
117-     metrics  =  None 
118-     task_id_to_start_time  =  {}
124+ class  TaskDurationTracker :
125+     def  __init__ (self , metrics ):
126+         self .metrics  =  metrics 
127+         self .tracker  =  {}
128+ 
129+     def  record_start (self , key , step ):
130+         self .tracker .setdefault (key , {})[step ] =  time .perf_counter ()
131+ 
132+     def  record_finish (self , key , metric_name , attributes ):
133+         try :
134+             time_elapsed  =  self ._time_elapsed (key , metric_name )
135+             self .metrics [metric_name ].record (
136+                 max (0 , time_elapsed ), attributes = attributes 
137+             )
138+         except  KeyError :
139+             logger .warning ("Failed to record %s for task %s" , metric_name , key )
140+ 
141+     def  _time_elapsed (self , key , step ):
142+         end_time  =  time .perf_counter ()
143+         try :
144+             start_time  =  self .tracker .get (key , {}).pop (step )
145+             time_elapsed  =  end_time  -  start_time 
146+             return  time_elapsed 
147+         finally :
148+             # Cleanup operation 
149+             if  key  in  self .tracker  and  not  self .tracker .get (key ):
150+                 self .tracker .pop (key )
151+ 
119152
class CeleryInstrumentor(BaseInstrumentor):
    def instrumentation_dependencies(self) -> Collection[str]:
        # Package specifiers that must be present for this instrumentation
        # to apply (module-level _instruments tuple, defined with the other
        # package constants).
        return _instruments
122156
@@ -139,8 +173,10 @@ def _instrument(self, **kwargs):
139173            schema_url = "https://opentelemetry.io/schemas/1.11.0" ,
140174        )
141175
142-         self .create_celery_metrics (meter )
176+         self .metrics  =  _create_celery_worker_metrics (meter )
177+         self .time_tracker  =  TaskDurationTracker (self .metrics )
143178
179+         signals .task_received .connect (self ._trace_received , weak = False )
144180        signals .task_prerun .connect (self ._trace_prerun , weak = False )
145181        signals .task_postrun .connect (self ._trace_postrun , weak = False )
146182        signals .before_task_publish .connect (
@@ -153,27 +189,52 @@ def _instrument(self, **kwargs):
153189        signals .task_retry .connect (self ._trace_retry , weak = False )
154190
155191    def  _uninstrument (self , ** kwargs ):
192+         signals .task_received .disconnect (self ._trace_received )
156193        signals .task_prerun .disconnect (self ._trace_prerun )
157194        signals .task_postrun .disconnect (self ._trace_postrun )
158195        signals .before_task_publish .disconnect (self ._trace_before_publish )
159196        signals .after_task_publish .disconnect (self ._trace_after_publish )
160197        signals .task_failure .disconnect (self ._trace_failure )
161198        signals .task_retry .disconnect (self ._trace_retry )
162199
200+     def  _trace_received (self , * args , ** kwargs ):
201+         """ 
202+         On prerun signal, task is prefetched and prefetch timer starts 
203+         """ 
204+ 
205+         request  =  utils .retrieve_request (kwargs )
206+ 
207+         metrics_attributes  =  utils .get_metrics_attributes_from_request (request )
208+         self .metrics [_TASK_COUNT_PREFETCHED ].add (
209+             1 , attributes = metrics_attributes 
210+         )
211+         self .time_tracker .record_start (request .task_id , _TASK_PREFETCH_TIME )
212+ 
163213    def  _trace_prerun (self , * args , ** kwargs ):
214+         """ 
215+         On prerun signal, task is no longer prefetched, and execution timer 
216+         starts along with the task span 
217+         """ 
218+ 
164219        task  =  utils .retrieve_task (kwargs )
165220        task_id  =  utils .retrieve_task_id (kwargs )
166221
167222        if  task  is  None  or  task_id  is  None :
168223            return 
169224
170-         self .update_task_duration_time (task_id )
225+         metrics_attributes  =  utils .get_metrics_attributes_from_task (task )
226+         self .metrics [_TASK_COUNT_PREFETCHED ].add (
227+             - 1 , attributes = metrics_attributes 
228+         )
229+         self .time_tracker .record_finish (
230+             task_id , _TASK_PREFETCH_TIME , metrics_attributes 
231+         )
232+         self .time_tracker .record_start (task_id , _TASK_PROCESSING_TIME )
233+ 
171234        request  =  task .request 
172235        tracectx  =  extract (request , getter = celery_getter ) or  None 
173236        token  =  context_api .attach (tracectx ) if  tracectx  is  not   None  else  None 
174237
175-         logger .debug ("prerun signal start task_id=%s" , task_id )
176- 
177238        operation_name  =  f"{ _TASK_RUN }  /{ task .name }  " 
178239        span  =  self ._tracer .start_span (
179240            operation_name , context = tracectx , kind = trace .SpanKind .CONSUMER 
@@ -183,14 +244,24 @@ def _trace_prerun(self, *args, **kwargs):
183244        activation .__enter__ ()  # pylint: disable=E1101 
184245        utils .attach_context (task , task_id , span , activation , token )
185246
247+         self .metrics [_TASK_COUNT_ACTIVE ].add (1 , attributes = metrics_attributes )
248+ 
186249    def  _trace_postrun (self , * args , ** kwargs ):
250+         """ 
251+         On postrun signal, task is no longer being executed 
252+         """ 
253+ 
187254        task  =  utils .retrieve_task (kwargs )
188255        task_id  =  utils .retrieve_task_id (kwargs )
189256
190257        if  task  is  None  or  task_id  is  None :
191258            return 
192259
193-         logger .debug ("postrun signal task_id=%s" , task_id )
260+         metrics_attributes  =  utils .get_metrics_attributes_from_task (task )
261+         self .metrics [_TASK_COUNT_ACTIVE ].add (- 1 , attributes = metrics_attributes )
262+         self .time_tracker .record_finish (
263+             task_id , _TASK_PROCESSING_TIME , metrics_attributes 
264+         )
194265
195266        # retrieve and finish the Span 
196267        ctx  =  utils .retrieve_context (task , task_id )
@@ -210,10 +281,8 @@ def _trace_postrun(self, *args, **kwargs):
210281
211282        activation .__exit__ (None , None , None )
212283        utils .detach_context (task , task_id )
213-         self .update_task_duration_time (task_id )
214-         labels  =  {"task" : task .name , "worker" : task .request .hostname }
215-         self ._record_histograms (task_id , labels )
216-         # if the process sending the task is not instrumented 
284+ 
285+         # If the process sending the task is not instrumented, 
217286        # there's no incoming context and no token to detach 
218287        if  token  is  not   None :
219288            context_api .detach (token )
@@ -345,29 +414,29 @@ def _trace_retry(*args, **kwargs):
345414        # something that isn't an `Exception` 
346415        span .set_attribute (_TASK_RETRY_REASON_KEY , str (reason ))
347416
348-     def  update_task_duration_time (self , task_id ):
349-         cur_time  =  default_timer ()
350-         task_duration_time_until_now  =  (
351-             cur_time  -  self .task_id_to_start_time [task_id ]
352-             if  task_id  in  self .task_id_to_start_time 
353-             else  cur_time 
354-         )
355-         self .task_id_to_start_time [task_id ] =  task_duration_time_until_now 
356- 
357-     def  _record_histograms (self , task_id , metric_attributes ):
358-         if  task_id  is  None :
359-             return 
360417
361-         self .metrics ["flower.task.runtime.seconds" ].record (
362-             self .task_id_to_start_time .get (task_id ),
363-             attributes = metric_attributes ,
364-         )
365- 
366-     def  create_celery_metrics (self , meter ) ->  None :
367-         self .metrics  =  {
368-             "flower.task.runtime.seconds" : meter .create_histogram (
369-                 name = "flower.task.runtime.seconds" ,
370-                 unit = "seconds" ,
371-                 description = "The time it took to run the task." ,
372-             )
373-         }
def _create_celery_worker_metrics(meter) -> dict:
    """Create the worker-side metric instruments.

    Args:
        meter: OpenTelemetry Meter used to create the instruments.

    Returns:
        A dict mapping metric name -> instrument, consumed by
        CeleryInstrumentor and TaskDurationTracker.
    """
    # NOTE: the original annotated this as `-> None` although it returns the
    # metrics dict; the annotation is corrected here.
    metrics = {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }

    return metrics
0 commit comments