
Commit bf21e64

update-redis-settings (#391)
1 parent 96d7ada commit bf21e64

8 files changed: +68, -48 lines changed

.github/workflows/license-check.yml

Lines changed: 2 additions & 2 deletions
@@ -50,8 +50,7 @@ jobs:
        with:
          requirements: "backend/requirements-all.txt"
          fail: "Copyleft,Other,Error"
-         exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|deepchecks.*)'
-         # psycopg2 is LGPL 2
+         exclude: '(category_encoders.*2\.7\..*|attrs.*25\.3\..*|referencing.*0\.36\..*|envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2025\.1\.31|tqdm.*4\.67\..*|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|termcolor.*3\.0\.1|deepchecks.*)' # psycopg2 is LGPL 2
          # category_encoders is BSD https://github.com/scikit-learn-contrib/category_encoders/tree/master?tab=BSD-3-Clause-1-ov-file
          # attrs is MIT https://github.com/python-attrs/attrs/blob/main/LICENSE
          # referencing is MIT https://github.com/python-jsonschema/referencing?tab=MIT-1-ov-file
@@ -64,6 +63,7 @@ jobs:
          # torchvision is BSD https://github.com/pytorch/vision/blob/main/LICENSE
          # torchaudio is BSD https://github.com/pytorch/audio/blob/main/LICENSE
          # terminado is BSD https://github.com/jupyter/terminado/blob/main/LICENSE
+         # termcolor is MIT https://github.com/termcolor/termcolor/blob/main/COPYING.txt
          # orderedmultidict is freeley distributed https://github.com/gruns/orderedmultidict/blob/master/LICENSE.md
      - name: Print report
        if: ${{ always() }}

backend/deepchecks_monitoring/bgtasks/tasks_queuer.py

Lines changed: 5 additions & 15 deletions
@@ -11,7 +11,7 @@
 """Contains alert scheduling logic."""
 import asyncio
 import datetime
-import logging.handlers
+import logging
 import typing as t
 from time import perf_counter

@@ -28,11 +28,11 @@
 from deepchecks_monitoring.bgtasks.mixpanel_system_state_event import MixpanelSystemStateEvent
 from deepchecks_monitoring.bgtasks.model_data_ingestion_alerter import ModelDataIngestionAlerter
 from deepchecks_monitoring.bgtasks.model_version_cache_invalidation import ModelVersionCacheInvalidation
-# from deepchecks_monitoring.bgtasks.model_version_topic_delete import ModelVersionTopicDeletionWorker
-from deepchecks_monitoring.config import DatabaseSettings, RedisSettings
+from deepchecks_monitoring.config import DatabaseSettings
 from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE
 from deepchecks_monitoring.monitoring_utils import configure_logger
 from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
+from deepchecks_monitoring.utils.redis_util import init_async_redis

 try:
     from deepchecks_monitoring import ee
@@ -136,7 +136,7 @@ async def move_tasks_to_queue(self, session) -> int:
         return 0


-class WorkerSettings(DatabaseSettings, RedisSettings):
+class WorkerSettings(DatabaseSettings):
     """Worker settings."""

     logfile: t.Optional[str] = None
@@ -152,16 +152,6 @@ class Config:
         env_file_encoding = 'utf-8'


-async def init_async_redis(redis_uri):
-    """Initialize redis connection."""
-    try:
-        redis = RedisCluster.from_url(redis_uri)
-        await redis.ping()
-        return redis
-    except redis_exceptions.RedisClusterException:
-        return Redis.from_url(redis_uri)
-
-
 def execute_worker():
     """Execute worker."""

@@ -195,7 +185,7 @@ async def main():

     async with ResourcesProvider(settings) as rp:
         async with anyio.create_task_group() as g:
-            async_redis = await init_async_redis(rp.redis_settings.redis_uri)
+            async_redis = await init_async_redis(rp.redis_settings)
             worker = tasks_queuer.TasksQueuer(rp, async_redis, workers, logger, settings.queuer_run_interval)
             g.start_soon(worker.run)

backend/deepchecks_monitoring/bgtasks/tasks_runner.py

Lines changed: 4 additions & 14 deletions
@@ -9,26 +9,26 @@
 # ----------------------------------------------------------------------------
 #
 """Contains alert scheduling logic."""
-import logging.handlers
+import logging
 import typing as t

 import anyio
 import pendulum as pdl
 import uvloop
 from redis.asyncio import Redis, RedisCluster
-from redis.exceptions import LockNotOwnedError, RedisClusterException
+from redis.exceptions import LockNotOwnedError
 from sqlalchemy import select

 from deepchecks_monitoring.bgtasks.alert_task import AlertsTask
 from deepchecks_monitoring.bgtasks.delete_db_table_task import DeleteDbTableTask
 from deepchecks_monitoring.bgtasks.mixpanel_system_state_event import MixpanelSystemStateEvent
 from deepchecks_monitoring.bgtasks.model_data_ingestion_alerter import ModelDataIngestionAlerter
 from deepchecks_monitoring.bgtasks.model_version_cache_invalidation import ModelVersionCacheInvalidation
-# from deepchecks_monitoring.bgtasks.model_version_topic_delete import ModelVersionTopicDeletionWorker
 from deepchecks_monitoring.config import Settings
 from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE, TASK_RUNNER_LOCK
 from deepchecks_monitoring.monitoring_utils import configure_logger
 from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
+from deepchecks_monitoring.utils.redis_util import init_async_redis

 try:
     from deepchecks_monitoring import ee
@@ -160,16 +160,6 @@ class WorkerSettings(BaseWorkerSettings, Settings):
     pass


-async def init_async_redis(redis_uri):
-    """Initialize redis connection."""
-    try:
-        redis = RedisCluster.from_url(redis_uri)
-        await redis.ping()
-        return redis
-    except RedisClusterException:
-        return Redis.from_url(redis_uri)
-
-
 def execute_worker():
     """Execute worker."""

@@ -189,7 +179,7 @@ async def main():
     from deepchecks_monitoring.bgtasks import tasks_runner  # pylint: disable=import-outside-toplevel

     async with ResourcesProvider(settings) as rp:
-        async_redis = await init_async_redis(rp.redis_settings.redis_uri)
+        async_redis = await init_async_redis(rp.redis_settings)

         workers = [
             ModelVersionCacheInvalidation(),

backend/deepchecks_monitoring/config.py

Lines changed: 6 additions & 0 deletions
@@ -137,6 +137,12 @@ class RedisSettings(BaseDeepchecksSettings):
     """Redis settings."""

     redis_uri: t.Optional[RedisDsn] = None
+    decode_responses: bool = True
+    socket_connect_timeout: int = 5
+    socket_timeout: int = 5
+    socket_keepalive: bool = True
+    retry_attempts: int = 6
+    cluster_error_retry_attempts: int = 2


 class Settings(
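
Note: the new fields only declare connection defaults; they are consumed by the new redis_util helper further down. A minimal sketch of overriding them, assuming RedisSettings behaves like a regular pydantic settings model (keyword construction) and that the base settings class adds no required fields — neither is shown in this diff; the URI is illustrative:

from deepchecks_monitoring.config import RedisSettings

# Illustrative values: redis_uri and socket_timeout are overridden explicitly,
# the remaining fields keep the defaults introduced in this commit.
settings = RedisSettings(redis_uri="redis://localhost:6379/0", socket_timeout=10)

print(settings.socket_timeout)                # 10
print(settings.retry_attempts)                # 6 (default)
print(settings.cluster_error_retry_attempts)  # 2 (default)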

backend/deepchecks_monitoring/resources.py

Lines changed: 10 additions & 4 deletions
@@ -14,6 +14,7 @@
 from contextlib import asynccontextmanager, contextmanager

 import httpx
+import redis.exceptions as redis_exceptions
 import tenacity
 from aiokafka import AIOKafkaProducer
 from authlib.integrations.starlette_client import OAuth
@@ -22,7 +23,6 @@
 from kafka.errors import KafkaError, TopicAlreadyExistsError
 from redis.client import Redis
 from redis.cluster import RedisCluster
-from redis.exceptions import RedisClusterException
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
 from sqlalchemy.future.engine import Engine, create_engine
@@ -39,6 +39,7 @@
 from deepchecks_monitoring.utils import database
 from deepchecks_monitoring.utils.mixpanel import BaseEvent as BaseMixpanelEvent
 from deepchecks_monitoring.utils.mixpanel import MixpanelEventReporter
+from deepchecks_monitoring.utils.redis_util import create_settings_dict

 __all__ = ["ResourcesProvider"]

@@ -291,10 +292,15 @@ def get_kafka_admin(self) -> t.Generator[KafkaAdminClient, None, None]:
     def redis_client(self) -> t.Optional[Redis]:
         """Return redis client if redis defined, else None."""
         if self._redis_client is None and self.redis_settings.redis_uri:
+            settings = create_settings_dict(self.redis_settings)
             try:
-                self._redis_client = RedisCluster.from_url(self.redis_settings.redis_uri)
-            except RedisClusterException:
-                self._redis_client = Redis.from_url(self.redis_settings.redis_uri)
+                self._redis_client = RedisCluster.from_url(
+                    cluster_error_retry_attempts=self.redis_settings.cluster_error_retry_attempts,
+                    **settings
+                )
+            except redis_exceptions.RedisClusterException:
+                self._redis_client = Redis.from_url(**settings)
+
         return self._redis_client

     @property

backend/deepchecks_monitoring/schema_models/column_type.py

Lines changed: 4 additions & 1 deletion
@@ -62,6 +62,9 @@ def to_sqlalchemy_type(self):
         }
         return types_map[self]

+    def is_indexed(self):
+        return self in {ColumnType.NUMERIC, ColumnType.INTEGER, ColumnType.BIGINT, ColumnType.DATETIME}
+
     def to_json_schema_type(self, nullable=False, min_items: int = None, max_items: int = None):
         """Return the json type of the column type."""
         types_map = {
@@ -170,7 +173,7 @@ def column_types_to_table_columns(column_types: t.Dict[str, ColumnType], primary
         sa.Column(
             name,
             data_type.to_sqlalchemy_type(),
-            index=True,
+            index=data_type.is_indexed(),
             primary_key=(name == primary_key)
         )
         for name, data_type in column_types.items()
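
Note: with this change only numeric and datetime-like columns keep a per-column index; every other column type that previously got index=True is now created unindexed by column_types_to_table_columns. A small sketch of the expected behavior, assuming only the enum members visible in this hunk:

from deepchecks_monitoring.schema_models.column_type import ColumnType

# Members listed in is_indexed() stay indexed.
assert ColumnType.INTEGER.is_indexed()
assert ColumnType.DATETIME.is_indexed()

# Any member outside that set (e.g. text/categorical types not shown in this
# hunk) now yields index=False when the table columns are built.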
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+from redis.asyncio import Redis as AsyncRedis
+from redis.asyncio import RedisCluster as AsyncRedisCluster
+from redis.backoff import ExponentialBackoff
+from redis.exceptions import RedisClusterException
+from redis.retry import Retry
+
+from deepchecks_monitoring.config import RedisSettings
+
+
+def create_settings_dict(redis_settings: RedisSettings):
+    """Create redis settings param dict"""
+
+    return dict(
+        url=redis_settings.redis_uri,
+        decode_responses=redis_settings.decode_responses,
+        socket_connect_timeout=redis_settings.socket_connect_timeout,
+        socket_timeout=redis_settings.socket_timeout,
+        socket_keepalive=redis_settings.socket_keepalive,
+        retry=Retry(ExponentialBackoff(), redis_settings.retry_attempts),
+    )
+
+
+async def init_async_redis(redis_settings: RedisSettings):
+    """Initialize redis connection."""
+    settings = create_settings_dict(redis_settings)
+    try:
+        redis = AsyncRedisCluster.from_url(
+            cluster_error_retry_attempts=redis_settings.cluster_error_retry_attempts,
+            **settings
+        )
+        await redis.ping()
+        return redis
+    except RedisClusterException:
+        return AsyncRedis.from_url(**settings)
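
Note: this helper centralizes the fallback the workers previously inlined: try a cluster client with the configured timeout/retry settings, then drop to a single-node client when the target is not a cluster. A minimal usage sketch, mirroring the call sites in tasks_queuer.py and tasks_runner.py; the local URI and the keyword construction of RedisSettings are illustrative, not taken from the diff:

import asyncio

from deepchecks_monitoring.config import RedisSettings
from deepchecks_monitoring.utils.redis_util import init_async_redis


async def main():
    settings = RedisSettings(redis_uri="redis://localhost:6379/0")
    # Cluster client if the node answers as a cluster, plain async Redis otherwise.
    redis = await init_async_redis(settings)
    await redis.set("redis-settings-check", "ok")
    print(await redis.get("redis-settings-check"))  # "ok" (decode_responses=True)


asyncio.run(main())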

backend/dev_utils/run_task_directly.py

Lines changed: 3 additions & 12 deletions
@@ -12,29 +12,20 @@
 import asyncio

 import dotenv
-from redis.asyncio import Redis, RedisCluster
-from redis.exceptions import LockNotOwnedError, RedisClusterException
+from redis.exceptions import LockNotOwnedError
 from sqlalchemy import create_engine, select

 from deepchecks_monitoring.ee.bgtasks import ObjectStorageIngestor
 from deepchecks_monitoring.ee.resources import ResourcesProvider
 from deepchecks_monitoring.logic.keys import TASK_RUNNER_LOCK
 from deepchecks_monitoring.public_models import Task
+from deepchecks_monitoring.utils.redis_util import init_async_redis

 # Task class you want to run
 TASK_CLASS = ObjectStorageIngestor
 # The task name you want to run (need to be exists in DB, we take the last one ordered by id desc)
 BG_WORKER_TASK = 'object_storage_ingestion'

-async def init_async_redis(redis_uri):
-    """Initialize redis connection."""
-    try:
-        redis = RedisCluster.from_url(redis_uri)
-        await redis.ping()
-        return redis
-    except RedisClusterException:
-        return Redis.from_url(redis_uri)
-
 async def run_it():
     if path := dotenv.find_dotenv(usecwd=True):
         dotenv.load_dotenv(dotenv_path=path)
@@ -49,7 +40,7 @@ async def run_it():
     async with rp.create_async_database_session() as session:

         try:
-            async_redis = await init_async_redis(rp.redis_settings.redis_uri)
+            async_redis = await init_async_redis(rp.redis_settings)

             lock_name = TASK_RUNNER_LOCK.format(1)
             # By default, allow task 5 minutes before removes lock to allow another run. Inside the task itself we can
