Skip to content

Commit e170e3c

Browse files
committed
BrokerMetaData
1 parent bbe6266 commit e170e3c

File tree

7 files changed

+41
-73
lines changed

7 files changed

+41
-73
lines changed

scheduler/admin/task_models.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import redis
2-
import valkey
31
from django.contrib import admin, messages
42
from django.contrib.contenttypes.admin import GenericStackedInline
53
from django.utils.translation import gettext_lazy as _
64

75
from scheduler import tools
6+
from scheduler.broker_types import ConnectionErrorTypes
87
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
98
from scheduler.settings import SCHEDULER_CONFIG, logger
109
from scheduler.tools import get_job_executions_for_task
@@ -186,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
186185
obj = self.get_object(request, object_id)
187186
try:
188187
execution_list = get_job_executions_for_task(obj.queue, obj)
189-
except (redis.ConnectionError, valkey.ConnectionError) as e:
188+
except ConnectionErrorTypes as e:
190189
logger.warn(f"Could not get job executions: {e}")
191190
execution_list = list()
192191
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)

scheduler/broker_types.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import Union, Dict, Tuple, Type
2+
3+
import redis
4+
5+
try:
6+
import valkey
7+
except ImportError:
8+
valkey = redis
9+
valkey.Valkey = redis.Redis
10+
valkey.StrictValkey = redis.StrictRedis
11+
12+
from scheduler.settings import Broker
13+
14+
ConnectionErrorTypes = (redis.ConnectionError, valkey.ConnectionError)
15+
ResponseErrorTypes = (redis.ResponseError, valkey.ResponseError)
16+
ConnectionType = Union[redis.Redis, valkey.Valkey]
17+
PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
18+
SentinelType = Union[redis.sentinel.Sentinel, valkey.sentinel.Sentinel]
19+
20+
BrokerMetaData: Dict[Tuple[Broker, bool], Tuple[Type[ConnectionType], Type[SentinelType], str]] = {
21+
# Map of (Broker, Strict flag) => Connection Class, Sentinel Class, SSL Connection Prefix
22+
(Broker.REDIS, False): (redis.Redis, redis.sentinel.Sentinel, "rediss"),
23+
(Broker.VALKEY, False): (valkey.Valkey, valkey.sentinel.Sentinel, "valkeys"),
24+
(Broker.REDIS, True): (redis.StrictRedis, redis.sentinel.Sentinel, "rediss"),
25+
(Broker.VALKEY, True): (valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"),
26+
}

scheduler/connection_types.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

scheduler/management/commands/rqworker.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
import sys
44

55
import click
6-
import redis
7-
import valkey
86
from django.core.management.base import BaseCommand
97
from django.db import connections
108
from rq.logutils import setup_loghandlers
119

10+
from scheduler.broker_types import ConnectionErrorTypes
1211
from scheduler.rq_classes import register_sentry
1312
from scheduler.tools import create_worker
1413

@@ -136,6 +135,6 @@ def handle(self, **options):
136135
logging_level=log_level,
137136
max_jobs=options["max_jobs"],
138137
)
139-
except (redis.ConnectionError, valkey.ConnectionError) as e:
138+
except ConnectionErrorTypes as e:
140139
click.echo(str(e), err=True)
141140
sys.exit(1)

scheduler/queues.py

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,10 @@
11
from typing import List, Dict, Set
22

3-
import redis
4-
5-
from .connection_types import RedisSentinel, ValkeySentinel, BrokerConnectionClass
3+
from .broker_types import ConnectionErrorTypes, BrokerMetaData
64
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
75
from .settings import SCHEDULER_CONFIG
86
from .settings import logger, Broker
97

10-
11-
try:
12-
from valkey import exceptions
13-
except ImportError:
14-
exceptions = ""
15-
exceptions.ConnectionError = redis.ConnectionError
16-
17-
ConnectionErrors = (redis.ConnectionError, exceptions.ConnectionError)
18-
198
_CONNECTION_PARAMS = {
209
"URL",
2110
"DB",
@@ -36,18 +25,6 @@ class QueueNotFoundError(Exception):
3625
pass
3726

3827

39-
ssl_url_protocol = {
40-
"valkey": "valkeys",
41-
"redis": "rediss",
42-
"fakeredis": "rediss"
43-
}
44-
45-
sentinel_broker = {
46-
"valkey": ValkeySentinel,
47-
"redis": RedisSentinel,
48-
}
49-
50-
5128
def _get_broker_connection(config, use_strict_broker=False):
5229
"""
5330
Returns a redis connection from a connection config
@@ -57,10 +34,11 @@ def _get_broker_connection(config, use_strict_broker=False):
5734

5835
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
5936
else:
60-
broker_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_broker)]
37+
broker_cls = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][0]
6138
logger.debug(f"Getting connection for {config}")
6239
if "URL" in config:
63-
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol[SCHEDULER_CONFIG.BROKER.value]}://"):
40+
ssl_url_protocol = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][2]
41+
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol}://"):
6442
return broker_cls.from_url(
6543
config["URL"],
6644
db=config.get("DB"),
@@ -83,9 +61,8 @@ def _get_broker_connection(config, use_strict_broker=False):
8361
}
8462
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
8563
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
86-
sentinel = sentinel_broker[SCHEDULER_CONFIG.BROKER.value](
87-
config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs
88-
)
64+
SentinelClass = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][1]
65+
sentinel = SentinelClass(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
8966
return sentinel.master_for(
9067
service_name=config["MASTER_NAME"],
9168
redis_class=broker_cls,
@@ -138,7 +115,7 @@ def get_all_workers() -> Set[DjangoWorker]:
138115
try:
139116
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
140117
workers_set.update(curr_workers)
141-
except ConnectionErrors as e:
118+
except ConnectionErrorTypes as e:
142119
logger.error(f"Could not connect for queue {queue_name}: {e}")
143120
return workers_set
144121

scheduler/rq_classes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from rq.worker import WorkerStatus
2323

2424
from scheduler import settings
25-
from scheduler.connection_types import PipelineType, ConnectionType
25+
from scheduler.broker_types import PipelineType, ConnectionType
2626

2727
MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"]
2828

scheduler/views.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,13 @@
1313
from django.views.decorators.cache import never_cache
1414
from redis.exceptions import ResponseError
1515

16-
17-
from .queues import get_all_workers, get_connection, QueueNotFoundError, ConnectionErrors
16+
from .broker_types import ConnectionErrorTypes
17+
from .queues import get_all_workers, get_connection, QueueNotFoundError
1818
from .queues import get_queue as get_queue_base
1919
from .rq_classes import JobExecution, DjangoWorker, DjangoQueue, InvalidJobOperation
2020
from .settings import SCHEDULER_CONFIG, logger
2121

2222

23-
try:
24-
from valkey import exceptions
25-
except ImportError:
26-
exceptions = ""
27-
exceptions.ResponseError = ResponseError
28-
29-
ResponseErrors = (ResponseError, exceptions.ResponseError)
30-
31-
3223
def get_queue(queue_name: str) -> DjangoQueue:
3324
try:
3425
return get_queue_base(queue_name)
@@ -111,7 +102,7 @@ def get_statistics(run_maintenance_tasks=False):
111102
canceled_jobs=len(queue.canceled_job_registry),
112103
)
113104
queues.append(queue_data)
114-
except ConnectionErrors as e:
105+
except ConnectionErrorTypes as e:
115106
logger.error(f"Could not connect for queue {queue_name}: {e}")
116107
continue
117108

0 commit comments

Comments
 (0)