@@ -179,7 +179,7 @@ def _parse_progress(line: str):
179179
180180 def _log_accumulated_logs (new_log : str , acc_logs : List [str ], time_logs_sent : float ):
181181 # do not overload broker with messages, we log once every 1sec
182- TIME_BETWEEN_LOGS_S = 1 .0
182+ TIME_BETWEEN_LOGS_S = 2 .0
183183 acc_logs .append (new_log )
184184 now = time .monotonic ()
185185 if (now - time_logs_sent ) > TIME_BETWEEN_LOGS_S :
@@ -329,30 +329,43 @@ def process(self):
329329
330330 log .debug ('DONE Processing Pipeline %s and node %s from container' , self ._task .project_id , self ._task .internal_id )
331331
332- def run (self ):
333- connection = pika .BlockingConnection (self ._pika .parameters )
334332
333+ @contextmanager
334+ def safe_log_channel (self ):
335+ connection = pika .BlockingConnection (self ._pika .parameters )
335336 channel = connection .channel ()
336337 channel .exchange_declare (exchange = self ._pika .log_channel , exchange_type = 'fanout' , auto_delete = True )
338+ try :
339+ yield channel
340+ finally :
341+ connection .close ()
342+
343+ def run (self ):
344+ with self .safe_log_channel () as channel :
345+ msg = "Preprocessing start..."
346+ self ._log (channel , msg )
337347
338- msg = "Preprocessing start..."
339- self ._log (channel , msg )
340348 self .preprocess ()
341- msg = "...preprocessing end"
342- self ._log (channel , msg )
343349
344- msg = "Processing start..."
345- self ._log (channel , msg )
350+ with self .safe_log_channel () as channel :
351+ msg = "...preprocessing end"
352+ self ._log (channel , msg )
353+ msg = "Processing start..."
354+ self ._log (channel , msg )
355+
346356 self .process ()
347- msg = "...processing end"
348- self ._log (channel , msg )
349357
350- msg = "Postprocessing start..."
351- self ._log (channel , msg )
358+ with self .safe_log_channel () as channel :
359+ msg = "...processing end"
360+ self ._log (channel , msg )
361+ msg = "Postprocessing start..."
362+ self ._log (channel , msg )
363+
352364 self .postprocess ()
353- msg = "...postprocessing end"
354- self ._log (channel , msg )
355- connection .close ()
365+
366+ with self .safe_log_channel () as channel :
367+ msg = "...postprocessing end"
368+ self ._log (channel , msg )
356369
357370
358371 def postprocess (self ):
0 commit comments