Skip to content

Commit 78ba4e0

Browse files
committed
made broker connection more abstract to support more servers easier
1 parent 4c32f1d commit 78ba4e0

File tree

1 file changed

+33
-16
lines changed

1 file changed

+33
-16
lines changed

scheduler/queues.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
from typing import List, Dict, Set
22

33
import redis
4-
import valkey
54

6-
from .connection_types import RedisSentinel, BrokerConnectionClass
5+
from .connection_types import RedisSentinel, ValkeySentinel, BrokerConnectionClass
76
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
87
from .settings import SCHEDULER_CONFIG
98
from .settings import logger, Broker
109

10+
ConnectionErrors = {redis.ConnectionError}
11+
12+
try:
13+
from valkey import exceptions
14+
ConnectionErrors.add(exceptions.ConnectionError)
15+
except ImportError:
16+
pass
17+
1118
_CONNECTION_PARAMS = {
1219
"URL",
1320
"DB",
@@ -27,32 +34,42 @@
2734
class QueueNotFoundError(Exception):
2835
pass
2936

37+
ssl_url_protocol = {
38+
"valkey": "valkeys",
39+
"redis": "rediss",
40+
"fakeredis": "rediss"
41+
}
42+
43+
sentinel_broker = {
44+
"valkey": ValkeySentinel,
45+
"redis": RedisSentinel,
46+
}
3047

31-
def _get_redis_connection(config, use_strict_redis=False):
48+
def _get_broker_connection(config, use_strict_broker=False):
3249
"""
3350
Returns a redis connection from a connection config
3451
"""
3552
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
3653
import fakeredis
3754

38-
redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
55+
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
3956
else:
40-
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
57+
broker_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_broker)]
4158
logger.debug(f"Getting connection for {config}")
4259
if "URL" in config:
43-
if config.get("SSL") or config.get("URL").startswith("rediss://"):
44-
return redis_cls.from_url(
60+
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol[SCHEDULER_CONFIG.BROKER.value]}://"):
61+
return broker_cls.from_url(
4562
config["URL"],
4663
db=config.get("DB"),
4764
ssl_cert_reqs=config.get("SSL_CERT_REQS", "required"),
4865
)
4966
else:
50-
return redis_cls.from_url(
67+
return broker_cls.from_url(
5168
config["URL"],
5269
db=config.get("DB"),
5370
)
5471
if "UNIX_SOCKET_PATH" in config:
55-
return redis_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
72+
return broker_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
5673

5774
if "SENTINELS" in config:
5875
connection_kwargs = {
@@ -63,13 +80,13 @@ def _get_redis_connection(config, use_strict_redis=False):
6380
}
6481
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
6582
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
66-
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
83+
sentinel = sentinel_broker[SCHEDULER_CONFIG.BROKER.value](config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
6784
return sentinel.master_for(
6885
service_name=config["MASTER_NAME"],
69-
redis_class=redis_cls,
86+
redis_class=broker_cls,
7087
)
7188

72-
return redis_cls(
89+
return broker_cls(
7390
host=config["HOST"],
7491
port=config["PORT"],
7592
db=config.get("DB", 0),
@@ -82,8 +99,8 @@ def _get_redis_connection(config, use_strict_redis=False):
8299

83100

84101
def get_connection(queue_settings, use_strict_redis=False):
85-
"""Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
86-
return _get_redis_connection(queue_settings, use_strict_redis)
102+
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
103+
return _get_broker_connection(queue_settings, use_strict_redis)
87104

88105

89106
def get_queue(
@@ -116,7 +133,7 @@ def get_all_workers() -> Set[DjangoWorker]:
116133
try:
117134
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
118135
workers_set.update(curr_workers)
119-
except (redis.ConnectionError, valkey.ConnectionError) as e:
136+
except ConnectionErrors as e:
120137
logger.error(f"Could not connect for queue {queue_name}: {e}")
121138
return workers_set
122139

@@ -142,7 +159,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
142159
for name in queue_names[1:]:
143160
if not _queues_share_connection_params(queue_params, QUEUES[name]):
144161
raise ValueError(
145-
f'Queues must have the same redis connection. "{name}" and'
162+
f'Queues must have the same broker connection. "{name}" and'
146163
f' "{queue_names[0]}" have different connections'
147164
)
148165
queue = get_queue(name, **kwargs)

0 commit comments

Comments
 (0)