1414
1515import anyio
1616import pendulum as pdl
17+ import redis .exceptions as redis_exceptions
1718import uvloop
1819from redis .asyncio import Redis , RedisCluster
19- from redis .exceptions import LockNotOwnedError
2020from sqlalchemy import select
2121
2222from deepchecks_monitoring .bgtasks .alert_task import AlertsTask
@@ -77,17 +77,20 @@ async def run(self):
7777 raise
7878
7979 async def wait_for_task (self , timeout = 120 ):
80- task_entry = await self .redis .bzpopmin (GLOBAL_TASK_QUEUE , timeout = timeout )
81-
82- # If timeout is not 0 we might get return value of None
83- if task_entry is None :
84- self .logger .info ('Got from redis queue task_id none' )
80+ try :
81+ task_entry = await self .redis .bzpopmin (GLOBAL_TASK_QUEUE , timeout = timeout )
82+ except redis_exceptions .TimeoutError :
83+ self .logger .info ('Got timeout from redis queue polling task_id ' )
8584 return
8685 else :
87- # Return value from redis is (redis key, value, score)
88- task_id = int (task_entry [1 ].decode ())
89- queued_timestamp : int = task_entry [2 ]
90- return task_id , queued_timestamp
86+ if task_entry is None :
87+ self .logger .info ('Got from redis queue task_id none' )
88+ return
89+ else :
90+ # Return value from redis is (redis key, value, score)
91+ task_id = int (task_entry [1 ].decode ())
92+ queued_timestamp : int = task_entry [2 ]
93+ return task_id , queued_timestamp
9194
9295 async def run_single_task (self , task_id , session , queued_timestamp ):
9396 """Run single task."""
@@ -111,7 +114,7 @@ async def run_single_task(self, task_id, session, queued_timestamp):
111114
112115 try :
113116 await lock .release ()
114- except LockNotOwnedError :
117+ except redis_exceptions . LockNotOwnedError :
115118 self .logger .error (f'Failed to release lock for task id: { task_id } . probably task run for longer than '
116119 f'maximum time for the lock' )
117120
@@ -179,7 +182,7 @@ async def main():
179182 from deepchecks_monitoring .bgtasks import tasks_runner # pylint: disable=import-outside-toplevel
180183
181184 async with ResourcesProvider (settings ) as rp :
182- async_redis = await init_async_redis (rp . redis_settings )
185+ async_redis = await init_async_redis ()
183186
184187 workers = [
185188 ModelVersionCacheInvalidation (),
0 commit comments