Skip to content

Commit 8873e2b

Browse files
committed
Fix race condition in RMQ Monitor
1 parent aaf8977 commit 8873e2b

File tree

1 file changed

+21
-17
lines changed

1 file changed

+21
-17
lines changed

lithops/monitor.py

Lines changed: 21 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -169,6 +169,7 @@ def __init__(
169169

170170
self.rabbit_amqp_url = config.get('amqp_url')
171171
self.queue = f'lithops-{self.executor_id}'
172+
self.tag = None
172173
self._create_resources()
173174

174175
def _create_resources(self):
@@ -189,6 +190,8 @@ def _delete_resources(self):
189190
"""
190191
connection = pika.BlockingConnection(self.pikaparams)
191192
channel = connection.channel()
193+
if self.tag:
194+
channel.basic_cancel(self.tag)
192195
channel.queue_delete(queue=self.queue)
193196
channel.close()
194197
connection.close()
@@ -241,9 +244,7 @@ def _generate_tokens(self, call_status):
241244
self.token_bucket_q.put('#')
242245

243246
def run(self):
244-
logger.debug(f'ExecutorID {self.executor_id} | Starting RabbitMQ job monitor')
245-
prevoius_log = None
246-
log_time = 0
247+
logger.debug(f'ExecutorID {self.executor_id} | Starting RabbitMQ job monitor')
247248
SLEEP_TIME = 2
248249

249250
channel = self.connection.channel()
@@ -261,21 +262,24 @@ def callback(ch, method, properties, body):
261262
if self._all_ready() or not self.should_run:
262263
ch.stop_consuming()
263264
ch.close()
264-
self._print_status_log()
265-
logger.debug(f'ExecutorID {self.executor_id} | RabbitMQ job monitor finished')
266-
267-
channel.basic_consume(self.queue, callback, auto_ack=True)
268-
threading.Thread(target=channel.start_consuming, daemon=True).start()
269-
270-
while not self._all_ready():
271-
# Format call_ids running, pending and done
272-
prevoius_log, log_time = self._print_status_log(previous_log=prevoius_log, log_time=log_time)
273-
self._future_timeout_checker(self.futures)
274-
time.sleep(SLEEP_TIME)
275-
log_time += SLEEP_TIME
276265

277-
if not self.should_run:
278-
break
266+
def manage_timeouts():
267+
prevoius_log = None
268+
log_time = 0
269+
while self.should_run and not self._all_ready():
270+
# Format call_ids running, pending and done
271+
prevoius_log, log_time = self._print_status_log(previous_log=prevoius_log, log_time=log_time)
272+
self._future_timeout_checker(self.futures)
273+
time.sleep(SLEEP_TIME)
274+
log_time += SLEEP_TIME
275+
276+
threading.Thread(target=manage_timeouts, daemon=True).start()
277+
278+
self.tag = channel.basic_consume(self.queue, callback, auto_ack=True)
279+
channel.start_consuming()
280+
self.tag = None
281+
self._print_status_log()
282+
logger.debug(f'ExecutorID {self.executor_id} | RabbitMQ job monitor finished')
279283

280284

281285
class StorageMonitor(Monitor):

0 commit comments

Comments
 (0)