@@ -207,25 +207,34 @@ def _notify_listeners(self, callback_name):
207207 callback (self )
208208
209209 def _safe_call (self , fn , * args , ** kwargs ):
210+ class update_timestamps :
211+ '''Context manager to set the start and finish timestamps.'''
212+
213+ def __init__ (self , obj ):
214+ self .obj = obj
215+
216+ def __enter__ (self ):
217+ if fn .__name__ != 'poll' :
218+ cs = self .obj ._current_stage
219+ self .obj ._timestamps [f'{ cs } _start' ] = time .time ()
220+
221+ def __exit__ (self , exc_type , exc_value , traceback ):
222+ cs = self .obj ._current_stage
223+ self .obj ._timestamps [f'{ cs } _finish' ] = time .time ()
224+ self .obj ._timestamps ['pipeline_end' ] = time .time ()
225+
210226 if fn .__name__ != 'poll' :
211227 self ._current_stage = fn .__name__
212- self ._timestamps [f'{ self ._current_stage } _start' ] = time .time ()
213228
214229 try :
215230 with logging .logging_context (self .check ) as logger :
216- logger .debug ('entering stage: %s' % self ._current_stage )
217- ret = fn (* args , ** kwargs )
218- self ._timestamps [f'{ self ._current_stage } _finish' ] = time .time ()
219- self ._timestamps ['pipeline_end' ] = time .time ()
220- return ret
231+ logger .debug (f'entering stage: { self ._current_stage } ' )
232+ with update_timestamps (self ):
233+ return fn (* args , ** kwargs )
221234 except ABORT_REASONS :
222- self ._timestamps [f'{ self ._current_stage } _finish' ] = time .time ()
223- self ._timestamps ['pipeline_end' ] = time .time ()
224235 self .fail ()
225236 raise
226237 except BaseException as e :
227- self ._timestamps [f'{ self ._current_stage } _finish' ] = time .time ()
228- self ._timestamps ['pipeline_end' ] = time .time ()
229238 self .fail ()
230239 raise TaskExit from e
231240
0 commit comments