Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
488fa2b
Wrap the Redis client with a retry mechanism to improve disaster recovery
alex-zaikman Mar 27, 2025
9585ad3
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
9d35a43
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
300c114
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
9fd9b2b
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
d9e9597
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
762e0f0
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
8b3a55d
add-redis-connection-proxy
JKL98ISR Mar 27, 2025
37b8c6f
fix-test
JKL98ISR Apr 2, 2025
439d952
upgrade-redis
JKL98ISR Apr 2, 2025
6c67a73
upgrade-redis
JKL98ISR Apr 2, 2025
162739b
upgrade-redis
JKL98ISR Apr 2, 2025
4f67441
upgrade-redis
JKL98ISR Apr 2, 2025
bb7938b
Merge branch 'main' of https://github.com/deepchecks/monitoring into …
JKL98ISR Apr 2, 2025
ff78fe0
upgrade-redis
JKL98ISR Apr 2, 2025
fc74f55
upgrade-redis
JKL98ISR Apr 2, 2025
f9c3d43
upgrade-redis
JKL98ISR Apr 2, 2025
1f3d02f
isort
JKL98ISR Apr 2, 2025
3e69188
upgrade-redis
JKL98ISR Apr 3, 2025
302c5d6
upgrade-redis
JKL98ISR Apr 3, 2025
a61dfbe
upgrade-redis
JKL98ISR Apr 3, 2025
5061536
upgrade-redis
JKL98ISR Apr 3, 2025
66d80ab
upgrade-redis
JKL98ISR Apr 3, 2025
4b20d9f
upgrade-redis
JKL98ISR Apr 3, 2025
24fb0aa
Update backend/tests/unittests/test_monitor_alert_rules_executor.py
JKL98ISR Apr 3, 2025
902ee18
upgrade-redis
JKL98ISR Apr 3, 2025
ef7c23c
Merge branch 'alex/mon-2671-add-redis-connection-proxy' of https://gi…
JKL98ISR Apr 3, 2025
918fa58
upgrade-redis
JKL98ISR Apr 3, 2025
03b0a3e
upgrade-redis
JKL98ISR Apr 3, 2025
46c1b49
upgrade-redis
JKL98ISR Apr 3, 2025
abf834e
upgrade-redis
JKL98ISR Apr 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/license-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
with:
requirements: "backend/requirements-all.txt"
fail: "Copyleft,Other,Error"
exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|deepchecks.*)'
exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|termcolor.*3\.0\.1|deepchecks.*)'
# psycopg2 is LGPL 2
# category_encoders is BSD https://github.com/scikit-learn-contrib/category_encoders/tree/master?tab=BSD-3-Clause-1-ov-file
# attrs is MIT https://github.com/python-attrs/attrs/blob/main/LICENSE
Expand All @@ -64,6 +64,7 @@ jobs:
# torchvision is BSD https://github.com/pytorch/vision/blob/main/LICENSE
# torchaudio is BSD https://github.com/pytorch/audio/blob/main/LICENSE
# terminado is BSD https://github.com/jupyter/terminado/blob/main/LICENSE
# termcolor is MIT https://github.com/termcolor/termcolor/blob/main/COPYING.txt
# orderedmultidict is freely distributed https://github.com/gruns/orderedmultidict/blob/master/LICENSE.md
- name: Print report
if: ${{ always() }}
Expand Down
8 changes: 6 additions & 2 deletions backend/deepchecks_monitoring/api/v1/data_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ async def log_data_batch(
minute_rate = resources_provider.get_features_control(user).rows_per_minute

# Atomically getting the count and increasing in order to avoid race conditions
curr_count = resources_provider.cache_functions.get_and_incr_user_rate_count(user, time, len(data))
async with resources_provider.cache_functions() as cache_functions:
curr_count = await cache_functions.get_and_incr_user_rate_count(user, time, len(data))
remains = minute_rate - curr_count

# Remains can be negative because we don't check the limit before incrementing
Expand Down Expand Up @@ -140,7 +141,10 @@ async def log_labels(
minute_rate = resources_provider.get_features_control(user).rows_per_minute

# Atomically getting the count and increasing in order to avoid race conditions
curr_count = resources_provider.cache_functions.get_and_incr_user_rate_count(user, time, len(data), is_label=True)
async with resources_provider.cache_functions() as cache_functions:
curr_count = await cache_functions.get_and_incr_user_rate_count(
user, time, len(data), is_label=True
)
remains = minute_rate - curr_count

# Remains can be negative because we don't check the limit before incrementing
Expand Down
30 changes: 15 additions & 15 deletions backend/deepchecks_monitoring/api/v1/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
from deepchecks_monitoring.api.v1.alert_rule import AlertRuleSchema
from deepchecks_monitoring.api.v1.check import CheckResultSchema, CheckSchema
from deepchecks_monitoring.config import Settings, Tags
from deepchecks_monitoring.dependencies import AsyncSessionDep, CacheFunctionsDep, ResourcesProviderDep, SettingsDep
from deepchecks_monitoring.logic.cache_functions import CacheFunctions
from deepchecks_monitoring.dependencies import AsyncSessionDep, ResourcesProviderDep, SettingsDep
from deepchecks_monitoring.logic.check_logic import CheckNotebookSchema, MonitorOptions, run_check_per_window_in_range
from deepchecks_monitoring.monitoring_utils import (DataFilterList, ExtendedAsyncSession, IdResponse,
MonitorCheckConfSchema, fetch_or_404, field_length)
Expand Down Expand Up @@ -155,7 +154,6 @@ async def update_monitor(
monitor_id: int,
body: MonitorUpdateSchema,
session: AsyncSession = AsyncSessionDep,
cache_funcs: CacheFunctions = CacheFunctionsDep,
user: User = Depends(CurrentActiveUser()),
resources_provider: ResourcesProvider = ResourcesProviderDep,
):
Expand Down Expand Up @@ -220,7 +218,8 @@ async def update_monitor(
)

# Delete cache
cache_funcs.clear_monitor_cache(user.organization_id, monitor_id)
async with resources_provider.cache_functions() as cache_funcs:
await cache_funcs.clear_monitor_cache(user.organization_id, monitor_id)
update_dict["updated_by"] = user.id
await Monitor.update(session, monitor_id, update_dict)
return Response(status_code=status.HTTP_200_OK)
Expand All @@ -231,12 +230,13 @@ async def delete_monitor(
monitor_id: int,
monitor: Monitor = Depends(Monitor.get_object_from_http_request),
session: AsyncSession = AsyncSessionDep,
cache_funcs: CacheFunctions = CacheFunctionsDep,
resources_provider: ResourcesProvider = ResourcesProviderDep,
user: User = Depends(CurrentActiveUser())
):
"""Delete monitor by id."""
await session.delete(monitor)
cache_funcs.clear_monitor_cache(user.organization_id, monitor_id)
async with resources_provider.cache_functions() as cache_funcs:
await cache_funcs.clear_monitor_cache(user.organization_id, monitor_id)
return Response(status_code=status.HTTP_200_OK)


Expand Down Expand Up @@ -280,7 +280,6 @@ async def run_monitor_lookback(
body: MonitorRunSchema,
monitor: Monitor = Depends(Monitor.get_object_from_http_request),
session: AsyncSession = AsyncSessionDep,
cache_funcs: CacheFunctions = CacheFunctionsDep,
user: User = Depends(CurrentActiveUser()),
resources_provider: ResourcesProvider = ResourcesProviderDep,
):
Expand Down Expand Up @@ -326,11 +325,12 @@ async def run_monitor_lookback(
organization_id=t.cast(int, user.organization_id)
)

return await run_check_per_window_in_range(
monitor.check_id,
session,
options,
monitor_id=monitor_id,
cache_funcs=cache_funcs,
organization_id=user.organization_id,
)
async with resources_provider.cache_functions() as cache_funcs:
return await run_check_per_window_in_range(
monitor.check_id,
session,
options,
monitor_id=monitor_id,
cache_funcs=cache_funcs,
organization_id=user.organization_id,
)
22 changes: 12 additions & 10 deletions backend/deepchecks_monitoring/bgtasks/alert_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,14 @@ async def execute_monitor(
# First looking for results in cache if already calculated
cache_results = {}
model_versions_without_cache = []
for model_version in model_versions:
cache_result = resources_provider.cache_functions.get_monitor_cache(
organization_id, model_version.id, monitor_id, start_time, end_time)
if cache_result.found:
cache_results[model_version] = cache_result.value
else:
model_versions_without_cache.append(model_version)
async with resources_provider.cache_functions() as cache_functions:
for model_version in model_versions:
cache_result = await cache_functions.get_monitor_cache(
organization_id, model_version.id, monitor_id, start_time, end_time)
if cache_result.found:
cache_results[model_version] = cache_result.value
else:
model_versions_without_cache.append(model_version)
logger.debug('Cache result: %s', cache_results)

# For model versions without result in cache running calculation
Expand All @@ -181,9 +182,10 @@ async def execute_monitor(

result_per_version = reduce_check_window(result_per_version, options)
# Save to cache
for version, result in result_per_version.items():
resources_provider.cache_functions.set_monitor_cache(
organization_id, version.id, monitor_id, start_time, end_time, result)
async with resources_provider.cache_functions() as cache_functions:
for version, result in result_per_version.items():
await cache_functions.set_monitor_cache(
organization_id, version.id, monitor_id, start_time, end_time, result)

logger.debug('Check execution result: %s', result_per_version)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,38 +48,38 @@ async def run(self, task: 'Task', session: AsyncSession, resources_provider, loc
self.logger.info({'message': 'starting job', 'worker name': str(type(self)),
'task': task.id, 'model version': model_version_id, 'org_id': org_id})

redis = resources_provider.redis_client
invalidation_set_key = get_invalidation_set_key(org_id, model_version_id)

# Query all timestamps
entries = redis.zrange(invalidation_set_key, start=0, end=-1, withscores=True)
if entries:
# Sort timestamps for faster search
invalidation_ts = sorted([int(x[0]) for x in entries])
max_score = max((x[1] for x in entries))

# Iterate all monitors cache keys and check timestamps overlap
monitor_pattern = build_monitor_cache_key(org_id, model_version_id, None, None, None)
keys_to_delete = []
for monitor_cache_key in redis.scan_iter(match=monitor_pattern):
splitted = monitor_cache_key.split(b':')
start_ts, end_ts = int(splitted[4]), int(splitted[5])
# Get first timestamp equal or larger than start_ts
index = bisect.bisect_left(invalidation_ts, start_ts)
# If index is equal to list length, then all timestamps are smaller than start_ts
if index == len(invalidation_ts):
continue
if start_ts <= invalidation_ts[index] < end_ts:
keys_to_delete.append(monitor_cache_key)

pipe = redis.pipeline()
for key in keys_to_delete:
async with resources_provider.get_redis_client() as redis:
invalidation_set_key = get_invalidation_set_key(org_id, model_version_id)

# Query all timestamps
entries = await redis.zrange(invalidation_set_key, start=0, end=-1, withscores=True)
if entries:
# Sort timestamps for faster search
invalidation_ts = sorted([int(x[0]) for x in entries])
max_score = max((x[1] for x in entries))

# Iterate all monitors cache keys and check timestamps overlap
monitor_pattern = build_monitor_cache_key(org_id, model_version_id, None, None, None)
keys_to_delete = []
async for monitor_cache_key in redis.scan_iter(match=monitor_pattern):
splitted = monitor_cache_key.split(b':')
start_ts, end_ts = int(splitted[4]), int(splitted[5])
# Get first timestamp equal or larger than start_ts
index = bisect.bisect_left(invalidation_ts, start_ts)
# If index is equal to list length, then all timestamps are smaller than start_ts
if index == len(invalidation_ts):
continue
if start_ts <= invalidation_ts[index] < end_ts:
keys_to_delete.append(monitor_cache_key)

pipe = redis.pipeline()
# Delete all cache keys - must do in separate deletes since RedisCluster does not support multi-delete
pipe.delete(key)
# Delete all invalidation timestamps by range. if timestamps were updated while running,
# then their score should be larger than max_score, and they won't be deleted
pipe.zremrangebyscore(invalidation_set_key, min=0, max=max_score)
pipe.execute()
for key in keys_to_delete:
await pipe.delete(key)
# Delete all invalidation timestamps by range. if timestamps were updated while running,
# then their score should be larger than max_score, and they won't be deleted
await pipe.zremrangebyscore(invalidation_set_key, min=0, max=max_score)
await pipe.execute()
self.logger.info({'message': 'finished job', 'worker name': str(type(self)),
'task': task.id, 'model version': model_version_id, 'org_id': org_id})

Expand Down
16 changes: 4 additions & 12 deletions backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE
from deepchecks_monitoring.monitoring_utils import configure_logger
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
from deepchecks_monitoring.utils.redis_proxy import RedisProxy

try:
from deepchecks_monitoring import ee
Expand All @@ -50,7 +51,7 @@ class TasksQueuer:
def __init__(
self,
resource_provider: ResourcesProvider,
redis_client: RedisCluster | Redis,
redis_client: RedisCluster | Redis | RedisProxy,
workers: t.List[BackgroundWorker],
logger: logging.Logger,
run_interval: int,
Expand Down Expand Up @@ -152,16 +153,6 @@ class Config:
env_file_encoding = 'utf-8'


async def init_async_redis(redis_uri):
"""Initialize redis connection."""
try:
redis = RedisCluster.from_url(redis_uri)
await redis.ping()
return redis
except redis_exceptions.RedisClusterException:
return Redis.from_url(redis_uri)


def execute_worker():
"""Execute worker."""

Expand Down Expand Up @@ -195,7 +186,8 @@ async def main():

async with ResourcesProvider(settings) as rp:
async with anyio.create_task_group() as g:
async_redis = await init_async_redis(rp.redis_settings.redis_uri)
async_redis = RedisProxy(rp.redis_settings)
await async_redis.init_conn_async()
worker = tasks_queuer.TasksQueuer(rp, async_redis, workers, logger, settings.queuer_run_interval)
g.start_soon(worker.run)

Expand Down
16 changes: 4 additions & 12 deletions backend/deepchecks_monitoring/bgtasks/tasks_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pendulum as pdl
import uvloop
from redis.asyncio import Redis, RedisCluster
from redis.exceptions import LockNotOwnedError, RedisClusterException
from redis.exceptions import LockNotOwnedError
from sqlalchemy import select

from deepchecks_monitoring.bgtasks.alert_task import AlertsTask
Expand All @@ -29,6 +29,7 @@
from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE, TASK_RUNNER_LOCK
from deepchecks_monitoring.monitoring_utils import configure_logger
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
from deepchecks_monitoring.utils.redis_proxy import RedisProxy

try:
from deepchecks_monitoring import ee
Expand Down Expand Up @@ -160,16 +161,6 @@ class WorkerSettings(BaseWorkerSettings, Settings):
pass


async def init_async_redis(redis_uri):
"""Initialize redis connection."""
try:
redis = RedisCluster.from_url(redis_uri)
await redis.ping()
return redis
except RedisClusterException:
return Redis.from_url(redis_uri)


def execute_worker():
"""Execute worker."""

Expand All @@ -189,7 +180,8 @@ async def main():
from deepchecks_monitoring.bgtasks import tasks_runner # pylint: disable=import-outside-toplevel

async with ResourcesProvider(settings) as rp:
async_redis = await init_async_redis(rp.redis_settings.redis_uri)
async_redis = RedisProxy(rp.redis_settings)
await async_redis.init_conn_async()

workers = [
ModelVersionCacheInvalidation(),
Expand Down
2 changes: 2 additions & 0 deletions backend/deepchecks_monitoring/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class RedisSettings(BaseDeepchecksSettings):
"""Redis settings."""

redis_uri: t.Optional[RedisDsn] = None
stop_after_retries: int = 3 # Number of retries before giving up
wait_between_retries: int = 3 # Time to wait between retries


class Settings(
Expand Down
7 changes: 0 additions & 7 deletions backend/deepchecks_monitoring/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"limit_request_size",
"SettingsDep",
"DataIngestionDep",
"CacheFunctionsDep",
"ResourcesProviderDep"
]

Expand Down Expand Up @@ -61,11 +60,6 @@ def get_data_ingestion_backend(request: fastapi.Request):
return state.data_ingestion_backend


def get_cache_functions(request: fastapi.Request):
state = request.app.state
return state.resources_provider.cache_functions


def get_host(request: fastapi.Request) -> str:
settings = request.app.state.settings
return settings.host
Expand All @@ -78,7 +72,6 @@ def get_resources_provider(request: fastapi.Request) -> "ResourcesProvider":
AsyncSessionDep = fastapi.Depends(get_async_session)
SettingsDep = fastapi.Depends(get_settings)
DataIngestionDep = fastapi.Depends(get_data_ingestion_backend)
CacheFunctionsDep = fastapi.Depends(get_cache_functions)
ResourcesProviderDep = fastapi.Depends(get_resources_provider)


Expand Down
Loading
Loading