Skip to content

Commit e60f40f

Browse files
committed
made broker connection more abstract to support more servers easier
1 parent 11ee71e commit e60f40f

File tree

1 file changed

+38
-16
lines changed

1 file changed

+38
-16
lines changed

scheduler/queues.py

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
from typing import List, Dict, Set
22

33
import redis
4-
import valkey
54

6-
from .connection_types import RedisSentinel, BrokerConnectionClass
5+
from .connection_types import RedisSentinel, ValkeySentinel, BrokerConnectionClass
76
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
87
from .settings import SCHEDULER_CONFIG
98
from .settings import logger, Broker
109

10+
11+
try:
12+
from valkey import exceptions
13+
except ImportError:
14+
exceptions = ""
15+
exceptions.ConnectionError = redis.ConnectionError
16+
17+
ConnectionErrors = (redis.ConnectionError, exceptions.ConnectionError)
18+
1119
_CONNECTION_PARAMS = {
1220
"URL",
1321
"DB",
@@ -28,31 +36,43 @@ class QueueNotFoundError(Exception):
2836
pass
2937

3038

31-
def _get_redis_connection(config, use_strict_redis=False):
39+
ssl_url_protocol = {
40+
"valkey": "valkeys",
41+
"redis": "rediss",
42+
"fakeredis": "rediss"
43+
}
44+
45+
sentinel_broker = {
46+
"valkey": ValkeySentinel,
47+
"redis": RedisSentinel,
48+
}
49+
50+
51+
def _get_broker_connection(config, use_strict_broker=False):
3252
"""
3353
Returns a redis connection from a connection config
3454
"""
3555
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
3656
import fakeredis
3757

38-
redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
58+
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
3959
else:
40-
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
60+
broker_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_broker)]
4161
logger.debug(f"Getting connection for {config}")
4262
if "URL" in config:
43-
if config.get("SSL") or config.get("URL").startswith("rediss://"):
44-
return redis_cls.from_url(
63+
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol[SCHEDULER_CONFIG.BROKER.value]}://"):
64+
return broker_cls.from_url(
4565
config["URL"],
4666
db=config.get("DB"),
4767
ssl_cert_reqs=config.get("SSL_CERT_REQS", "required"),
4868
)
4969
else:
50-
return redis_cls.from_url(
70+
return broker_cls.from_url(
5171
config["URL"],
5272
db=config.get("DB"),
5373
)
5474
if "UNIX_SOCKET_PATH" in config:
55-
return redis_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
75+
return broker_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
5676

5777
if "SENTINELS" in config:
5878
connection_kwargs = {
@@ -63,13 +83,15 @@ def _get_redis_connection(config, use_strict_redis=False):
6383
}
6484
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
6585
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
66-
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
86+
sentinel = sentinel_broker[SCHEDULER_CONFIG.BROKER.value](
87+
config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs
88+
)
6789
return sentinel.master_for(
6890
service_name=config["MASTER_NAME"],
69-
redis_class=redis_cls,
91+
redis_class=broker_cls,
7092
)
7193

72-
return redis_cls(
94+
return broker_cls(
7395
host=config["HOST"],
7496
port=config["PORT"],
7597
db=config.get("DB", 0),
@@ -82,8 +104,8 @@ def _get_redis_connection(config, use_strict_redis=False):
82104

83105

84106
def get_connection(queue_settings, use_strict_redis=False):
85-
"""Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
86-
return _get_redis_connection(queue_settings, use_strict_redis)
107+
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
108+
return _get_broker_connection(queue_settings, use_strict_redis)
87109

88110

89111
def get_queue(
@@ -116,7 +138,7 @@ def get_all_workers() -> Set[DjangoWorker]:
116138
try:
117139
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
118140
workers_set.update(curr_workers)
119-
except (redis.ConnectionError, valkey.ConnectionError) as e:
141+
except ConnectionErrors as e:
120142
logger.error(f"Could not connect for queue {queue_name}: {e}")
121143
return workers_set
122144

@@ -142,7 +164,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
142164
for name in queue_names[1:]:
143165
if not _queues_share_connection_params(queue_params, QUEUES[name]):
144166
raise ValueError(
145-
f'Queues must have the same redis connection. "{name}" and'
167+
f'Queues must have the same broker connection. "{name}" and'
146168
f' "{queue_names[0]}" have different connections'
147169
)
148170
queue = get_queue(name, **kwargs)

0 commit comments

Comments
 (0)