@@ -60,7 +60,7 @@ def add(x, y):
6060""" 
6161
6262import  logging 
63- from   timeit   import  default_timer 
63+ import  time 
6464from  typing  import  Collection , Iterable 
6565
6666from  billiard  import  VERSION 
@@ -76,6 +76,7 @@ def add(x, y):
7676from  opentelemetry .metrics  import  get_meter 
7777from  opentelemetry .propagate  import  extract , inject 
7878from  opentelemetry .propagators .textmap  import  Getter 
79+ from  opentelemetry .semconv ._incubating .metrics  import  messaging_metrics 
7980from  opentelemetry .semconv .trace  import  SpanAttributes 
8081from  opentelemetry .trace .status  import  Status , StatusCode 
8182
@@ -96,6 +97,12 @@ def add(x, y):
9697_TASK_REVOKED_TERMINATED_SIGNAL_KEY  =  "celery.terminated.signal" 
9798_TASK_NAME_KEY  =  "celery.task_name" 
9899
100+ # Metric names 
101+ _TASK_COUNT_ACTIVE  =  "messaging.client.active_tasks" 
102+ _TASK_COUNT_PREFETCHED  =  "messaging.client.prefetched_tasks" 
103+ _TASK_PROCESSING_TIME  =  messaging_metrics .MESSAGING_PROCESS_DURATION 
104+ _TASK_PREFETCH_TIME  =  "messaging.prefetch.duration" 
105+ 
99106
100107class  CeleryGetter (Getter ):
101108    def  get (self , carrier , key ):
@@ -113,10 +120,36 @@ def keys(self, carrier):
113120celery_getter  =  CeleryGetter ()
114121
115122
116- class  CeleryInstrumentor (BaseInstrumentor ):
117-     metrics  =  None 
118-     task_id_to_start_time  =  {}
class TaskDurationTracker:
    """Track per-task start timestamps and record elapsed durations as metrics.

    Each task (identified by a key, typically the task id) may have several
    concurrently-timed steps (e.g. prefetch time, processing time); the start
    timestamp of each step is kept until the matching finish is recorded.
    """

    def __init__(self, metrics):
        # Mapping of metric name -> instrument, used by record_finish.
        self.metrics = metrics
        # Mapping of task key -> {step name -> perf_counter() start time}.
        self.tracker = {}

    def record_start(self, key, step):
        """Stamp the start time of *step* for the task identified by *key*."""
        steps = self.tracker.setdefault(key, {})
        steps[step] = time.perf_counter()

    def record_finish(self, key, metric_name, attributes):
        """Record the elapsed time of *metric_name* for *key* on its histogram.

        Logs a warning instead of raising when no matching start was stamped
        (or the metric is unknown).
        """
        try:
            elapsed = self._time_elapsed(key, metric_name)
            instrument = self.metrics[metric_name]
            # Clamp to zero so clock oddities never produce a negative duration.
            instrument.record(max(0, elapsed), attributes=attributes)
        except KeyError:
            logger.warning("Failed to record %s for task %s", metric_name, key)

    def _time_elapsed(self, key, step):
        """Pop the start time of (*key*, *step*) and return the elapsed seconds.

        Raises KeyError when no start was recorded for this key/step pair.
        """
        now = time.perf_counter()
        try:
            started = self.tracker.get(key, {}).pop(step)
            return now - started
        finally:
            # Drop the per-task dict once its last pending step has been popped.
            if key in self.tracker and not self.tracker[key]:
                del self.tracker[key]
119151
152+ class  CeleryInstrumentor (BaseInstrumentor ):
120153    def  instrumentation_dependencies (self ) ->  Collection [str ]:
121154        return  _instruments 
122155
@@ -139,8 +172,10 @@ def _instrument(self, **kwargs):
139172            schema_url = "https://opentelemetry.io/schemas/1.11.0" ,
140173        )
141174
142-         self .create_celery_metrics (meter )
175+         self .metrics  =  _create_celery_worker_metrics (meter )
176+         self .time_tracker  =  TaskDurationTracker (self .metrics )
143177
178+         signals .task_received .connect (self ._trace_received , weak = False )
144179        signals .task_prerun .connect (self ._trace_prerun , weak = False )
145180        signals .task_postrun .connect (self ._trace_postrun , weak = False )
146181        signals .before_task_publish .connect (
@@ -153,27 +188,52 @@ def _instrument(self, **kwargs):
153188        signals .task_retry .connect (self ._trace_retry , weak = False )
154189
155190    def  _uninstrument (self , ** kwargs ):
191+         signals .task_received .disconnect (self ._trace_received )
156192        signals .task_prerun .disconnect (self ._trace_prerun )
157193        signals .task_postrun .disconnect (self ._trace_postrun )
158194        signals .before_task_publish .disconnect (self ._trace_before_publish )
159195        signals .after_task_publish .disconnect (self ._trace_after_publish )
160196        signals .task_failure .disconnect (self ._trace_failure )
161197        signals .task_retry .disconnect (self ._trace_retry )
162198
199+     def  _trace_received (self , * args , ** kwargs ):
200+         """ 
201+         On prerun signal, task is prefetched and prefetch timer starts 
202+         """ 
203+ 
204+         request  =  utils .retrieve_request (kwargs )
205+ 
206+         metrics_attributes  =  utils .get_metrics_attributes_from_request (request )
207+         self .metrics [_TASK_COUNT_PREFETCHED ].add (
208+             1 , attributes = metrics_attributes 
209+         )
210+         self .time_tracker .record_start (request .task_id , _TASK_PREFETCH_TIME )
211+ 
163212    def  _trace_prerun (self , * args , ** kwargs ):
213+         """ 
214+         On prerun signal, task is no longer prefetched, and execution timer 
215+         starts along with the task span 
216+         """ 
217+ 
164218        task  =  utils .retrieve_task (kwargs )
165219        task_id  =  utils .retrieve_task_id (kwargs )
166220
167221        if  task  is  None  or  task_id  is  None :
168222            return 
169223
170-         self .update_task_duration_time (task_id )
224+         metrics_attributes  =  utils .get_metrics_attributes_from_task (task )
225+         self .metrics [_TASK_COUNT_PREFETCHED ].add (
226+             - 1 , attributes = metrics_attributes 
227+         )
228+         self .time_tracker .record_finish (
229+             task_id , _TASK_PREFETCH_TIME , metrics_attributes 
230+         )
231+         self .time_tracker .record_start (task_id , _TASK_PROCESSING_TIME )
232+ 
171233        request  =  task .request 
172234        tracectx  =  extract (request , getter = celery_getter ) or  None 
173235        token  =  context_api .attach (tracectx ) if  tracectx  is  not None  else  None 
174236
175-         logger .debug ("prerun signal start task_id=%s" , task_id )
176- 
177237        operation_name  =  f"{ _TASK_RUN } { task .name }  
178238        span  =  self ._tracer .start_span (
179239            operation_name , context = tracectx , kind = trace .SpanKind .CONSUMER 
@@ -183,14 +243,24 @@ def _trace_prerun(self, *args, **kwargs):
183243        activation .__enter__ ()  # pylint: disable=E1101 
184244        utils .attach_context (task , task_id , span , activation , token )
185245
246+         self .metrics [_TASK_COUNT_ACTIVE ].add (1 , attributes = metrics_attributes )
247+ 
186248    def  _trace_postrun (self , * args , ** kwargs ):
249+         """ 
250+         On postrun signal, task is no longer being executed 
251+         """ 
252+ 
187253        task  =  utils .retrieve_task (kwargs )
188254        task_id  =  utils .retrieve_task_id (kwargs )
189255
190256        if  task  is  None  or  task_id  is  None :
191257            return 
192258
193-         logger .debug ("postrun signal task_id=%s" , task_id )
259+         metrics_attributes  =  utils .get_metrics_attributes_from_task (task )
260+         self .metrics [_TASK_COUNT_ACTIVE ].add (- 1 , attributes = metrics_attributes )
261+         self .time_tracker .record_finish (
262+             task_id , _TASK_PROCESSING_TIME , metrics_attributes 
263+         )
194264
195265        # retrieve and finish the Span 
196266        ctx  =  utils .retrieve_context (task , task_id )
@@ -210,10 +280,8 @@ def _trace_postrun(self, *args, **kwargs):
210280
211281        activation .__exit__ (None , None , None )
212282        utils .detach_context (task , task_id )
213-         self .update_task_duration_time (task_id )
214-         labels  =  {"task" : task .name , "worker" : task .request .hostname }
215-         self ._record_histograms (task_id , labels )
216-         # if the process sending the task is not instrumented 
283+ 
284+         # If the process sending the task is not instrumented, 
217285        # there's no incoming context and no token to detach 
218286        if  token  is  not None :
219287            context_api .detach (token )
@@ -345,29 +413,29 @@ def _trace_retry(*args, **kwargs):
345413        # something that isn't an `Exception` 
346414        span .set_attribute (_TASK_RETRY_REASON_KEY , str (reason ))
347415
348-     def  update_task_duration_time (self , task_id ):
349-         cur_time  =  default_timer ()
350-         task_duration_time_until_now  =  (
351-             cur_time  -  self .task_id_to_start_time [task_id ]
352-             if  task_id  in  self .task_id_to_start_time 
353-             else  cur_time 
354-         )
355-         self .task_id_to_start_time [task_id ] =  task_duration_time_until_now 
356- 
357-     def  _record_histograms (self , task_id , metric_attributes ):
358-         if  task_id  is  None :
359-             return 
360416
361-         self .metrics ["flower.task.runtime.seconds" ].record (
362-             self .task_id_to_start_time .get (task_id ),
363-             attributes = metric_attributes ,
364-         )
365- 
366-     def  create_celery_metrics (self , meter ) ->  None :
367-         self .metrics  =  {
368-             "flower.task.runtime.seconds" : meter .create_histogram (
369-                 name = "flower.task.runtime.seconds" ,
370-                 unit = "seconds" ,
371-                 description = "The time it took to run the task." ,
372-             )
373-         }
def _create_celery_worker_metrics(meter) -> dict:
    """Create the worker-side metric instruments, keyed by metric name.

    Args:
        meter: an OpenTelemetry Meter used to create the instruments.

    Returns:
        dict mapping metric name -> instrument: up/down counters for the
        active and prefetched task counts, and histograms for the prefetch
        and processing durations (in seconds).
    """
    metrics = {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }

    return metrics
0 commit comments