Skip to content

Commit 3bc1094

Browse files
committed
made broker connection more abstract to support more servers easier
1 parent 75e4828 commit 3bc1094

File tree

1 file changed

+37
-16
lines changed

1 file changed

+37
-16
lines changed

scheduler/queues.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
from typing import List, Dict, Set
22

33
import redis
4-
import valkey
54

6-
from .connection_types import RedisSentinel, BrokerConnectionClass
5+
from .connection_types import RedisSentinel, ValkeySentinel, BrokerConnectionClass
76
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
87
from .settings import SCHEDULER_CONFIG
98
from .settings import logger, Broker
109

10+
ConnectionErrors = {redis.ConnectionError}
11+
12+
try:
13+
from valkey import exceptions
14+
ConnectionErrors.add(exceptions.ConnectionError)
15+
except ImportError:
16+
pass
17+
1118
_CONNECTION_PARAMS = {
1219
"URL",
1320
"DB",
@@ -28,31 +35,43 @@ class QueueNotFoundError(Exception):
2835
pass
2936

3037

31-
def _get_redis_connection(config, use_strict_redis=False):
38+
ssl_url_protocol = {
39+
"valkey": "valkeys",
40+
"redis": "rediss",
41+
"fakeredis": "rediss"
42+
}
43+
44+
sentinel_broker = {
45+
"valkey": ValkeySentinel,
46+
"redis": RedisSentinel,
47+
}
48+
49+
50+
def _get_broker_connection(config, use_strict_broker=False):
3251
"""
3352
Returns a redis connection from a connection config
3453
"""
3554
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
3655
import fakeredis
3756

38-
redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
57+
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
3958
else:
40-
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
59+
broker_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_broker)]
4160
logger.debug(f"Getting connection for {config}")
4261
if "URL" in config:
43-
if config.get("SSL") or config.get("URL").startswith("rediss://"):
44-
return redis_cls.from_url(
62+
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol[SCHEDULER_CONFIG.BROKER.value]}://"):
63+
return broker_cls.from_url(
4564
config["URL"],
4665
db=config.get("DB"),
4766
ssl_cert_reqs=config.get("SSL_CERT_REQS", "required"),
4867
)
4968
else:
50-
return redis_cls.from_url(
69+
return broker_cls.from_url(
5170
config["URL"],
5271
db=config.get("DB"),
5372
)
5473
if "UNIX_SOCKET_PATH" in config:
55-
return redis_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
74+
return broker_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
5675

5776
if "SENTINELS" in config:
5877
connection_kwargs = {
@@ -63,13 +82,15 @@ def _get_redis_connection(config, use_strict_redis=False):
6382
}
6483
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
6584
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
66-
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
85+
sentinel = sentinel_broker[SCHEDULER_CONFIG.BROKER.value](
86+
config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs
87+
)
6788
return sentinel.master_for(
6889
service_name=config["MASTER_NAME"],
69-
redis_class=redis_cls,
90+
redis_class=broker_cls,
7091
)
7192

72-
return redis_cls(
93+
return broker_cls(
7394
host=config["HOST"],
7495
port=config["PORT"],
7596
db=config.get("DB", 0),
@@ -82,8 +103,8 @@ def _get_redis_connection(config, use_strict_redis=False):
82103

83104

84105
def get_connection(queue_settings, use_strict_redis=False):
85-
"""Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
86-
return _get_redis_connection(queue_settings, use_strict_redis)
106+
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
107+
return _get_broker_connection(queue_settings, use_strict_redis)
87108

88109

89110
def get_queue(
@@ -116,7 +137,7 @@ def get_all_workers() -> Set[DjangoWorker]:
116137
try:
117138
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
118139
workers_set.update(curr_workers)
119-
except (redis.ConnectionError, valkey.ConnectionError) as e:
140+
except ConnectionErrors as e:
120141
logger.error(f"Could not connect for queue {queue_name}: {e}")
121142
return workers_set
122143

@@ -142,7 +163,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
142163
for name in queue_names[1:]:
143164
if not _queues_share_connection_params(queue_params, QUEUES[name]):
144165
raise ValueError(
145-
f'Queues must have the same redis connection. "{name}" and'
166+
f'Queues must have the same broker connection. "{name}" and'
146167
f' "{queue_names[0]}" have different connections'
147168
)
148169
queue = get_queue(name, **kwargs)

0 commit comments

Comments
 (0)