diff --git a/.github/workflows/license-check.yml b/.github/workflows/license-check.yml index ff5f912bd..55ea35481 100644 --- a/.github/workflows/license-check.yml +++ b/.github/workflows/license-check.yml @@ -50,8 +50,7 @@ jobs: with: requirements: "backend/requirements-all.txt" fail: "Copyleft,Other,Error" - exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|deepchecks.*)' - # psycopg2 is LGPL 2 + exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|termcolor.*3\.0\.1|deepchecks.*)' # psycopg2 is LGPL 2 # category_encoders is BSD https://github.com/scikit-learn-contrib/category_encoders/tree/master?tab=BSD-3-Clause-1-ov-file # attrs is MIT https://github.com/python-attrs/attrs/blob/main/LICENSE # referencing is MIT https://github.com/python-jsonschema/referencing?tab=MIT-1-ov-file @@ -64,6 +63,7 @@ jobs: # torchvision is BSD https://github.com/pytorch/vision/blob/main/LICENSE # torchaudio is BSD https://github.com/pytorch/audio/blob/main/LICENSE # terminado is BSD https://github.com/jupyter/terminado/blob/main/LICENSE + # termcolor is MIT https://github.com/termcolor/termcolor/blob/main/COPYING.txt # orderedmultidict is freeley distributed https://github.com/gruns/orderedmultidict/blob/master/LICENSE.md - name: Print report if: ${{ always() }} diff --git a/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py b/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py index 0f1a6f225..c25d8ab1d 100644 --- a/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py +++ b/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py @@ -11,7 +11,7 @@ """Contains alert scheduling logic.""" import asyncio import datetime -import logging.handlers +import logging import typing as t from time import perf_counter @@ -28,11 +28,11 @@ from deepchecks_monitoring.bgtasks.mixpanel_system_state_event import MixpanelSystemStateEvent from deepchecks_monitoring.bgtasks.model_data_ingestion_alerter import ModelDataIngestionAlerter from deepchecks_monitoring.bgtasks.model_version_cache_invalidation import ModelVersionCacheInvalidation -# from deepchecks_monitoring.bgtasks.model_version_topic_delete import ModelVersionTopicDeletionWorker -from deepchecks_monitoring.config import DatabaseSettings, RedisSettings +from deepchecks_monitoring.config import DatabaseSettings from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE from deepchecks_monitoring.monitoring_utils import configure_logger from deepchecks_monitoring.public_models.task import BackgroundWorker, Task +from deepchecks_monitoring.utils.redis_util import init_async_redis try: from deepchecks_monitoring import ee @@ -136,7 +136,7 @@ async def move_tasks_to_queue(self, session) -> int: return 0 -class WorkerSettings(DatabaseSettings, RedisSettings): +class WorkerSettings(DatabaseSettings): """Worker settings.""" logfile: t.Optional[str] = None @@ -152,16 +152,6 @@ class Config: env_file_encoding = 'utf-8' -async def init_async_redis(redis_uri): - """Initialize redis connection.""" - try: - redis = RedisCluster.from_url(redis_uri) - await redis.ping() - return redis - except redis_exceptions.RedisClusterException: - return Redis.from_url(redis_uri) - - def execute_worker(): """Execute worker.""" @@ -195,7 +185,7 @@ async def main(): async with ResourcesProvider(settings) as rp: async with anyio.create_task_group() as g: - async_redis = await init_async_redis(rp.redis_settings.redis_uri) + async_redis = await init_async_redis(rp.redis_settings) worker = tasks_queuer.TasksQueuer(rp, async_redis, workers, logger, settings.queuer_run_interval) g.start_soon(worker.run) diff --git a/backend/deepchecks_monitoring/bgtasks/tasks_runner.py b/backend/deepchecks_monitoring/bgtasks/tasks_runner.py index 2cf7bff91..5e9a8f8c0 100644 --- a/backend/deepchecks_monitoring/bgtasks/tasks_runner.py +++ b/backend/deepchecks_monitoring/bgtasks/tasks_runner.py @@ -9,14 +9,14 @@ # ---------------------------------------------------------------------------- # """Contains alert scheduling logic.""" -import logging.handlers +import logging import typing as t import anyio import pendulum as pdl import uvloop from redis.asyncio import Redis, RedisCluster -from redis.exceptions import LockNotOwnedError, RedisClusterException +from redis.exceptions import LockNotOwnedError from sqlalchemy import select from deepchecks_monitoring.bgtasks.alert_task import AlertsTask @@ -24,11 +24,11 @@ from deepchecks_monitoring.bgtasks.mixpanel_system_state_event import MixpanelSystemStateEvent from deepchecks_monitoring.bgtasks.model_data_ingestion_alerter import ModelDataIngestionAlerter from deepchecks_monitoring.bgtasks.model_version_cache_invalidation import ModelVersionCacheInvalidation -# from deepchecks_monitoring.bgtasks.model_version_topic_delete import ModelVersionTopicDeletionWorker from deepchecks_monitoring.config import Settings from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE, TASK_RUNNER_LOCK from deepchecks_monitoring.monitoring_utils import configure_logger from deepchecks_monitoring.public_models.task import BackgroundWorker, Task +from deepchecks_monitoring.utils.redis_util import init_async_redis try: from deepchecks_monitoring import ee @@ -160,16 +160,6 @@ class WorkerSettings(BaseWorkerSettings, Settings): pass -async def init_async_redis(redis_uri): - """Initialize redis connection.""" - try: - redis = RedisCluster.from_url(redis_uri) - await redis.ping() - return redis - except RedisClusterException: - return Redis.from_url(redis_uri) - - def execute_worker(): """Execute worker.""" @@ -189,7 +179,7 @@ async def main(): from deepchecks_monitoring.bgtasks import tasks_runner # pylint: disable=import-outside-toplevel async with ResourcesProvider(settings) as rp: - async_redis = await init_async_redis(rp.redis_settings.redis_uri) + async_redis = await init_async_redis(rp.redis_settings) workers = [ ModelVersionCacheInvalidation(), diff --git a/backend/deepchecks_monitoring/config.py b/backend/deepchecks_monitoring/config.py index d7c3a99d3..c443664de 100644 --- a/backend/deepchecks_monitoring/config.py +++ b/backend/deepchecks_monitoring/config.py @@ -137,6 +137,12 @@ class RedisSettings(BaseDeepchecksSettings): """Redis settings.""" redis_uri: t.Optional[RedisDsn] = None + decode_responses: bool = True + socket_connect_timeout: int = 5 + socket_timeout: int = 5 + socket_keepalive: bool = True + retry_attempts: int = 6 + cluster_error_retry_attempts: int = 2 class Settings( diff --git a/backend/deepchecks_monitoring/resources.py b/backend/deepchecks_monitoring/resources.py index 88919e50f..b3a1f62fa 100644 --- a/backend/deepchecks_monitoring/resources.py +++ b/backend/deepchecks_monitoring/resources.py @@ -14,6 +14,7 @@ from contextlib import asynccontextmanager, contextmanager import httpx +import redis.exceptions as redis_exceptions import tenacity from aiokafka import AIOKafkaProducer from authlib.integrations.starlette_client import OAuth @@ -22,7 +23,6 @@ from kafka.errors import KafkaError, TopicAlreadyExistsError from redis.client import Redis from redis.cluster import RedisCluster -from redis.exceptions import RedisClusterException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from sqlalchemy.future.engine import Engine, create_engine @@ -39,6 +39,7 @@ from deepchecks_monitoring.utils import database from deepchecks_monitoring.utils.mixpanel import BaseEvent as BaseMixpanelEvent from deepchecks_monitoring.utils.mixpanel import MixpanelEventReporter +from deepchecks_monitoring.utils.redis_util import create_settings_dict __all__ = ["ResourcesProvider"] @@ -291,10 +292,15 @@ def get_kafka_admin(self) -> t.Generator[KafkaAdminClient, None, None]: def redis_client(self) -> t.Optional[Redis]: """Return redis client if redis defined, else None.""" if self._redis_client is None and self.redis_settings.redis_uri: + settings = create_settings_dict(self.redis_settings) try: - self._redis_client = RedisCluster.from_url(self.redis_settings.redis_uri) - except RedisClusterException: - self._redis_client = Redis.from_url(self.redis_settings.redis_uri) + self._redis_client = RedisCluster.from_url( + cluster_error_retry_attempts=self.redis_settings.cluster_error_retry_attempts, + **settings + ) + except redis_exceptions.RedisClusterException: + self._redis_client = Redis.from_url(**settings) + return self._redis_client @property diff --git a/backend/deepchecks_monitoring/schema_models/column_type.py b/backend/deepchecks_monitoring/schema_models/column_type.py index fe18bd561..52e0a0432 100644 --- a/backend/deepchecks_monitoring/schema_models/column_type.py +++ b/backend/deepchecks_monitoring/schema_models/column_type.py @@ -62,6 +62,9 @@ def to_sqlalchemy_type(self): } return types_map[self] + def is_indexed(self): + return self in {ColumnType.NUMERIC, ColumnType.INTEGER, ColumnType.BIGINT, ColumnType.DATETIME} + def to_json_schema_type(self, nullable=False, min_items: int = None, max_items: int = None): """Return the json type of the column type.""" types_map = { @@ -170,7 +173,7 @@ def column_types_to_table_columns(column_types: t.Dict[str, ColumnType], primary sa.Column( name, data_type.to_sqlalchemy_type(), - index=True, + index=data_type.is_indexed(), primary_key=(name == primary_key) ) for name, data_type in column_types.items() diff --git a/backend/deepchecks_monitoring/utils/redis_util.py b/backend/deepchecks_monitoring/utils/redis_util.py new file mode 100644 index 000000000..23be25e1f --- /dev/null +++ b/backend/deepchecks_monitoring/utils/redis_util.py @@ -0,0 +1,34 @@ +from redis.asyncio import Redis as AsyncRedis +from redis.asyncio import RedisCluster as AsyncRedisCluster +from redis.backoff import ExponentialBackoff +from redis.exceptions import RedisClusterException +from redis.retry import Retry + +from deepchecks_monitoring.config import RedisSettings + + +def create_settings_dict(redis_settings: RedisSettings): + """Create redis settings param dict""" + + return dict( + url=redis_settings.redis_uri, + decode_responses=redis_settings.decode_responses, + socket_connect_timeout=redis_settings.socket_connect_timeout, + socket_timeout=redis_settings.socket_timeout, + socket_keepalive=redis_settings.socket_keepalive, + retry=Retry(ExponentialBackoff(), redis_settings.retry_attempts), + ) + + +async def init_async_redis(redis_settings: RedisSettings): + """Initialize redis connection.""" + settings = create_settings_dict(redis_settings) + try: + redis = AsyncRedisCluster.from_url( + cluster_error_retry_attempts=redis_settings.cluster_error_retry_attempts, + **settings + ) + await redis.ping() + return redis + except RedisClusterException: + return AsyncRedis.from_url(**settings) diff --git a/backend/dev_utils/run_task_directly.py b/backend/dev_utils/run_task_directly.py index 48021888b..c6db48336 100644 --- a/backend/dev_utils/run_task_directly.py +++ b/backend/dev_utils/run_task_directly.py @@ -12,29 +12,20 @@ import asyncio import dotenv -from redis.asyncio import Redis, RedisCluster -from redis.exceptions import LockNotOwnedError, RedisClusterException +from redis.exceptions import LockNotOwnedError from sqlalchemy import create_engine, select from deepchecks_monitoring.ee.bgtasks import ObjectStorageIngestor from deepchecks_monitoring.ee.resources import ResourcesProvider from deepchecks_monitoring.logic.keys import TASK_RUNNER_LOCK from deepchecks_monitoring.public_models import Task +from deepchecks_monitoring.utils.redis_util import init_async_redis # Task class you want to run TASK_CLASS = ObjectStorageIngestor # The task name you want to run (need to be exists in DB, we take the last one ordered by id desc) BG_WORKER_TASK = 'object_storage_ingestion' -async def init_async_redis(redis_uri): - """Initialize redis connection.""" - try: - redis = RedisCluster.from_url(redis_uri) - await redis.ping() - return redis - except RedisClusterException: - return Redis.from_url(redis_uri) - async def run_it(): if path := dotenv.find_dotenv(usecwd=True): dotenv.load_dotenv(dotenv_path=path) @@ -49,7 +40,7 @@ async def run_it(): async with rp.create_async_database_session() as session: try: - async_redis = await init_async_redis(rp.redis_settings.redis_uri) + async_redis = await init_async_redis(rp.redis_settings) lock_name = TASK_RUNNER_LOCK.format(1) # By default, allow task 5 minutes before removes lock to allow another run. Inside the task itself we can