Skip to content

Commit c5efd75

Browse files
authored
adding timeout loop for heartbeat and messages (#860)
* adding timeout loop for heartbeat and messages * formatting * fixing messages
1 parent edfbf73 commit c5efd75

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

backend/heartbeat_listener.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import os
55
from datetime import datetime
6+
import time
67

78
from aio_pika import connect_robust
89
from aio_pika.abc import AbstractIncomingMessage
@@ -17,6 +18,9 @@
1718
logger = logging.getLogger(__name__)
1819
logger.setLevel(logging.INFO)
1920

21+
timeout = 5 * 60 # five minute timeout
22+
time_ran = 0
23+
2024

2125
async def callback(message: AbstractIncomingMessage):
2226
"""This method receives messages from RabbitMQ and processes them.
@@ -118,4 +122,14 @@ async def listen_for_heartbeats():
118122

119123

120124
if __name__ == "__main__":
121-
asyncio.run(listen_for_heartbeats())
125+
start = datetime.now()
126+
while time_ran < timeout:
127+
try:
128+
asyncio.run(listen_for_heartbeats())
129+
except Exception as e:
130+
logger.info(f" Heartbeat listner failed, retry in 10 seconds...")
131+
time.sleep(10)
132+
current_time = datetime.now()
133+
current_seconds = (current_time - start).total_seconds()
134+
time_ran += current_seconds
135+
logger.info(f" Heartbeat listener could not connect to rabbitmq.")

backend/message_listener.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66
import string
77
from datetime import datetime
8+
import time
89

910
from aio_pika import connect_robust
1011
from aio_pika.abc import AbstractIncomingMessage
@@ -22,6 +23,9 @@
2223
logger = logging.getLogger(__name__)
2324
logger.setLevel(logging.INFO)
2425

26+
timeout = 5 * 60 # five minute timeout
27+
time_ran = 0
28+
2529

2630
def parse_message_status(msg):
2731
"""Determine if the message corresponds to start/middle/end of job if possible. See pyclowder.utils.StatusMessage."""
@@ -219,4 +223,14 @@ async def listen_for_messages():
219223

220224

221225
if __name__ == "__main__":
222-
asyncio.run(listen_for_messages())
226+
start = datetime.now()
227+
while time_ran < timeout:
228+
try:
229+
asyncio.run(listen_for_messages())
230+
except Exception as e:
231+
logger.info(f" Message listener failed, retry in 10 seconds...")
232+
time.sleep(10)
233+
current_time = datetime.now()
234+
current_seconds = (current_time - start).total_seconds()
235+
time_ran += current_seconds
236+
logger.info(f"Message listener could not connect to rabbitmq. Timeout.")

0 commit comments

Comments
 (0)