Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion backend/deepchecks_monitoring/bgtasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"""Contains alert scheduling logic."""
import asyncio
import logging
import logging.handlers
import typing as t
from collections import defaultdict
from contextlib import asynccontextmanager
Expand Down
2 changes: 1 addition & 1 deletion backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async def main():

async with ResourcesProvider(settings) as rp:
async with anyio.create_task_group() as g:
async_redis = await init_async_redis(rp.redis_settings)
async_redis = await init_async_redis()
worker = tasks_queuer.TasksQueuer(rp, async_redis, workers, logger, settings.queuer_run_interval)
g.start_soon(worker.run)

Expand Down
13 changes: 6 additions & 7 deletions backend/deepchecks_monitoring/bgtasks/tasks_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pendulum as pdl
import uvloop
from redis.asyncio import Redis, RedisCluster
from redis.exceptions import LockNotOwnedError
from redis.exceptions import LockNotOwnedError, TimeoutError
from sqlalchemy import select

from deepchecks_monitoring.bgtasks.alert_task import AlertsTask
Expand Down Expand Up @@ -77,15 +77,14 @@ async def run(self):
raise

async def wait_for_task(self, timeout=120):
    """Block until a task id is available on the global Redis queue.

    Parameters
    ----------
    timeout : int, default 120
        Seconds to block in BZPOPMIN before giving up.

    Returns
    -------
    tuple[int, int] | None
        ``(task_id, queued_timestamp)`` when a task was popped, or ``None``
        when the wait ended without a task.
    """
    try:
        task_entry = await self.redis.bzpopmin(GLOBAL_TASK_QUEUE, timeout=timeout)
    except TimeoutError:
        # Socket-level timeout raised by the redis client.
        self.logger.info('Got from redis queue task_id none')
        return
    # redis-py's bzpopmin returns None when the blocking wait expires
    # without popping a member — it does NOT raise TimeoutError for that
    # case — so this None check must stay; without it task_entry[1] below
    # raises TypeError on every empty-queue timeout.
    if task_entry is None:
        self.logger.info('Got from redis queue task_id none')
        return
    # Return value from redis is (redis key, member, score). int() accepts
    # both bytes and str, so no explicit decode() is needed here.
    task_id = int(task_entry[1])
    queued_timestamp: int = task_entry[2]
    return task_id, queued_timestamp

Expand Down Expand Up @@ -179,7 +178,7 @@ async def main():
from deepchecks_monitoring.bgtasks import tasks_runner # pylint: disable=import-outside-toplevel

async with ResourcesProvider(settings) as rp:
async_redis = await init_async_redis(rp.redis_settings)
async_redis = await init_async_redis()

workers = [
ModelVersionCacheInvalidation(),
Expand Down
3 changes: 2 additions & 1 deletion backend/deepchecks_monitoring/utils/redis_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def create_settings_dict(redis_settings: RedisSettings):
)


async def init_async_redis(redis_settings: RedisSettings):
async def init_async_redis(redis_settings: RedisSettings | None = None):
"""Initialize redis connection."""
redis_settings = redis_settings or RedisSettings()
settings = create_settings_dict(redis_settings)
try:
redis = AsyncRedisCluster.from_url(
Expand Down
2 changes: 1 addition & 1 deletion backend/dev_utils/run_task_directly.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def run_it():
async with rp.create_async_database_session() as session:

try:
async_redis = await init_async_redis(rp.redis_settings)
async_redis = await init_async_redis()

lock_name = TASK_RUNNER_LOCK.format(1)
# By default, allow task 5 minutes before removes lock to allow another run. Inside the task itself we can
Expand Down
Loading