Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/license-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ jobs:
with:
requirements: "backend/requirements-all.txt"
fail: "Copyleft,Other,Error"
exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|deepchecks.*)'
# psycopg2 is LGPL 2
exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|termcolor.*3\.0\.1|deepchecks.*)' # psycopg2 is LGPL 2
# category_encoders is BSD https://github.com/scikit-learn-contrib/category_encoders/tree/master?tab=BSD-3-Clause-1-ov-file
# attrs is MIT https://github.com/python-attrs/attrs/blob/main/LICENSE
# referencing is MIT https://github.com/python-jsonschema/referencing?tab=MIT-1-ov-file
Expand All @@ -64,6 +63,7 @@ jobs:
# torchvision is BSD https://github.com/pytorch/vision/blob/main/LICENSE
# torchaudio is BSD https://github.com/pytorch/audio/blob/main/LICENSE
# terminado is BSD https://github.com/jupyter/terminado/blob/main/LICENSE
# termcolor is MIT https://github.com/termcolor/termcolor/blob/main/COPYING.txt
# orderedmultidict is freely distributed https://github.com/gruns/orderedmultidict/blob/master/LICENSE.md
- name: Print report
if: ${{ always() }}
Expand Down
20 changes: 5 additions & 15 deletions backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"""Contains alert scheduling logic."""
import asyncio
import datetime
import logging.handlers
import logging
import typing as t
from time import perf_counter

Expand All @@ -28,11 +28,11 @@
from deepchecks_monitoring.bgtasks.mixpanel_system_state_event import MixpanelSystemStateEvent
from deepchecks_monitoring.bgtasks.model_data_ingestion_alerter import ModelDataIngestionAlerter
from deepchecks_monitoring.bgtasks.model_version_cache_invalidation import ModelVersionCacheInvalidation
# from deepchecks_monitoring.bgtasks.model_version_topic_delete import ModelVersionTopicDeletionWorker
from deepchecks_monitoring.config import DatabaseSettings, RedisSettings
from deepchecks_monitoring.config import DatabaseSettings
from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE
from deepchecks_monitoring.monitoring_utils import configure_logger
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
from deepchecks_monitoring.utils.redis_util import init_async_redis

try:
from deepchecks_monitoring import ee
Expand Down Expand Up @@ -136,7 +136,7 @@ async def move_tasks_to_queue(self, session) -> int:
return 0


class WorkerSettings(DatabaseSettings, RedisSettings):
class WorkerSettings(DatabaseSettings):
"""Worker settings."""

logfile: t.Optional[str] = None
Expand All @@ -152,16 +152,6 @@ class Config:
env_file_encoding = 'utf-8'


async def init_async_redis(redis_uri):
    """Initialize redis connection.

    Tries a cluster client first and falls back to a single-node client
    when the target is not a Redis cluster deployment.
    """
    try:
        # Ping verifies the URI actually points at a working cluster.
        redis = RedisCluster.from_url(redis_uri)
        await redis.ping()
        return redis
    except redis_exceptions.RedisClusterException:
        # Not a cluster - use a plain single-node client instead.
        return Redis.from_url(redis_uri)


def execute_worker():
"""Execute worker."""

Expand Down Expand Up @@ -195,7 +185,7 @@ async def main():

async with ResourcesProvider(settings) as rp:
async with anyio.create_task_group() as g:
async_redis = await init_async_redis(rp.redis_settings.redis_uri)
async_redis = await init_async_redis(rp.redis_settings)
worker = tasks_queuer.TasksQueuer(rp, async_redis, workers, logger, settings.queuer_run_interval)
g.start_soon(worker.run)

Expand Down
18 changes: 4 additions & 14 deletions backend/deepchecks_monitoring/bgtasks/tasks_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@
# ----------------------------------------------------------------------------
#
"""Contains alert scheduling logic."""
import logging.handlers
import logging
import typing as t

import anyio
import pendulum as pdl
import uvloop
from redis.asyncio import Redis, RedisCluster
from redis.exceptions import LockNotOwnedError, RedisClusterException
from redis.exceptions import LockNotOwnedError
from sqlalchemy import select

from deepchecks_monitoring.bgtasks.alert_task import AlertsTask
from deepchecks_monitoring.bgtasks.delete_db_table_task import DeleteDbTableTask
from deepchecks_monitoring.bgtasks.mixpanel_system_state_event import MixpanelSystemStateEvent
from deepchecks_monitoring.bgtasks.model_data_ingestion_alerter import ModelDataIngestionAlerter
from deepchecks_monitoring.bgtasks.model_version_cache_invalidation import ModelVersionCacheInvalidation
# from deepchecks_monitoring.bgtasks.model_version_topic_delete import ModelVersionTopicDeletionWorker
from deepchecks_monitoring.config import Settings
from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE, TASK_RUNNER_LOCK
from deepchecks_monitoring.monitoring_utils import configure_logger
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
from deepchecks_monitoring.utils.redis_util import init_async_redis

try:
from deepchecks_monitoring import ee
Expand Down Expand Up @@ -160,16 +160,6 @@ class WorkerSettings(BaseWorkerSettings, Settings):
pass


async def init_async_redis(redis_uri):
    """Initialize redis connection.

    Tries a cluster client first and falls back to a single-node client
    when the target is not a Redis cluster deployment.
    """
    try:
        # Ping verifies the URI actually points at a working cluster.
        redis = RedisCluster.from_url(redis_uri)
        await redis.ping()
        return redis
    except RedisClusterException:
        # Not a cluster - use a plain single-node client instead.
        return Redis.from_url(redis_uri)


def execute_worker():
"""Execute worker."""

Expand All @@ -189,7 +179,7 @@ async def main():
from deepchecks_monitoring.bgtasks import tasks_runner # pylint: disable=import-outside-toplevel

async with ResourcesProvider(settings) as rp:
async_redis = await init_async_redis(rp.redis_settings.redis_uri)
async_redis = await init_async_redis(rp.redis_settings)

workers = [
ModelVersionCacheInvalidation(),
Expand Down
6 changes: 6 additions & 0 deletions backend/deepchecks_monitoring/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ class RedisSettings(BaseDeepchecksSettings):
"""Redis settings."""

redis_uri: t.Optional[RedisDsn] = None
decode_responses: bool = True
socket_connect_timeout: int = 5
socket_timeout: int = 5
socket_keepalive: bool = True
retry_attempts: int = 6
cluster_error_retry_attempts: int = 2


class Settings(
Expand Down
14 changes: 10 additions & 4 deletions backend/deepchecks_monitoring/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from contextlib import asynccontextmanager, contextmanager

import httpx
import redis.exceptions as redis_exceptions
import tenacity
from aiokafka import AIOKafkaProducer
from authlib.integrations.starlette_client import OAuth
Expand All @@ -22,7 +23,6 @@
from kafka.errors import KafkaError, TopicAlreadyExistsError
from redis.client import Redis
from redis.cluster import RedisCluster
from redis.exceptions import RedisClusterException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.future.engine import Engine, create_engine
Expand All @@ -39,6 +39,7 @@
from deepchecks_monitoring.utils import database
from deepchecks_monitoring.utils.mixpanel import BaseEvent as BaseMixpanelEvent
from deepchecks_monitoring.utils.mixpanel import MixpanelEventReporter
from deepchecks_monitoring.utils.redis_util import create_settings_dict

__all__ = ["ResourcesProvider"]

Expand Down Expand Up @@ -291,10 +292,15 @@ def get_kafka_admin(self) -> t.Generator[KafkaAdminClient, None, None]:
def redis_client(self) -> t.Optional[Redis]:
"""Return redis client if redis defined, else None."""
if self._redis_client is None and self.redis_settings.redis_uri:
settings = create_settings_dict(self.redis_settings)
try:
self._redis_client = RedisCluster.from_url(self.redis_settings.redis_uri)
except RedisClusterException:
self._redis_client = Redis.from_url(self.redis_settings.redis_uri)
self._redis_client = RedisCluster.from_url(
cluster_error_retry_attempts=self.redis_settings.cluster_error_retry_attempts,
**settings
)
except redis_exceptions.RedisClusterException:
self._redis_client = Redis.from_url(**settings)

return self._redis_client

@property
Expand Down
5 changes: 4 additions & 1 deletion backend/deepchecks_monitoring/schema_models/column_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def to_sqlalchemy_type(self):
}
return types_map[self]

def is_indexed(self):
    """Return whether columns of this type should get a database index (numeric and datetime types only)."""
    return self in {ColumnType.NUMERIC, ColumnType.INTEGER, ColumnType.BIGINT, ColumnType.DATETIME}

def to_json_schema_type(self, nullable=False, min_items: int = None, max_items: int = None):
"""Return the json type of the column type."""
types_map = {
Expand Down Expand Up @@ -170,7 +173,7 @@ def column_types_to_table_columns(column_types: t.Dict[str, ColumnType], primary
sa.Column(
name,
data_type.to_sqlalchemy_type(),
index=True,
index=data_type.is_indexed(),
primary_key=(name == primary_key)
)
for name, data_type in column_types.items()
Expand Down
34 changes: 34 additions & 0 deletions backend/deepchecks_monitoring/utils/redis_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from redis.asyncio import Redis as AsyncRedis
from redis.asyncio import RedisCluster as AsyncRedisCluster
from redis.backoff import ExponentialBackoff
from redis.exceptions import RedisClusterException
from redis.retry import Retry

from deepchecks_monitoring.config import RedisSettings


def create_settings_dict(redis_settings: RedisSettings):
    """Build the keyword-argument dict shared by the sync and async redis clients.

    Maps the fields of ``RedisSettings`` onto the ``from_url`` parameters,
    attaching an exponential-backoff retry policy.
    """
    retry_policy = Retry(ExponentialBackoff(), redis_settings.retry_attempts)
    return {
        "url": redis_settings.redis_uri,
        "decode_responses": redis_settings.decode_responses,
        "socket_connect_timeout": redis_settings.socket_connect_timeout,
        "socket_timeout": redis_settings.socket_timeout,
        "socket_keepalive": redis_settings.socket_keepalive,
        "retry": retry_policy,
    }


async def init_async_redis(redis_settings: RedisSettings):
    """Initialize an async redis connection, preferring a cluster client.

    Falls back to a single-node async client when the endpoint is not a
    Redis cluster.
    """
    kwargs = create_settings_dict(redis_settings)
    try:
        cluster = AsyncRedisCluster.from_url(
            cluster_error_retry_attempts=redis_settings.cluster_error_retry_attempts,
            **kwargs
        )
        # A ping confirms the endpoint really is a cluster before committing to it.
        await cluster.ping()
        return cluster
    except RedisClusterException:
        # Single-node deployment - use the plain async client.
        return AsyncRedis.from_url(**kwargs)
15 changes: 3 additions & 12 deletions backend/dev_utils/run_task_directly.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,20 @@
import asyncio

import dotenv
from redis.asyncio import Redis, RedisCluster
from redis.exceptions import LockNotOwnedError, RedisClusterException
from redis.exceptions import LockNotOwnedError
from sqlalchemy import create_engine, select

from deepchecks_monitoring.ee.bgtasks import ObjectStorageIngestor
from deepchecks_monitoring.ee.resources import ResourcesProvider
from deepchecks_monitoring.logic.keys import TASK_RUNNER_LOCK
from deepchecks_monitoring.public_models import Task
from deepchecks_monitoring.utils.redis_util import init_async_redis

# Task class you want to run
TASK_CLASS = ObjectStorageIngestor
# The task name you want to run (need to be exists in DB, we take the last one ordered by id desc)
BG_WORKER_TASK = 'object_storage_ingestion'

async def init_async_redis(redis_uri):
    """Initialize redis connection.

    Tries a cluster client first and falls back to a single-node client
    when the target is not a Redis cluster deployment.
    """
    try:
        # Ping verifies the URI actually points at a working cluster.
        redis = RedisCluster.from_url(redis_uri)
        await redis.ping()
        return redis
    except RedisClusterException:
        # Not a cluster - use a plain single-node client instead.
        return Redis.from_url(redis_uri)

async def run_it():
if path := dotenv.find_dotenv(usecwd=True):
dotenv.load_dotenv(dotenv_path=path)
Expand All @@ -49,7 +40,7 @@ async def run_it():
async with rp.create_async_database_session() as session:

try:
async_redis = await init_async_redis(rp.redis_settings.redis_uri)
async_redis = await init_async_redis(rp.redis_settings)

lock_name = TASK_RUNNER_LOCK.format(1)
# By default, allow task 5 minutes before removes lock to allow another run. Inside the task itself we can
Expand Down
Loading