Skip to content

Commit aeddf9d

Browse files
committed
Merge branch 'master' into unify-models
# Conflicts: # docs/changelog.md # pyproject.toml
2 parents 54bdf64 + 957d4e1 commit aeddf9d

File tree

12 files changed

+305
-293
lines changed

12 files changed

+305
-293
lines changed

.github/workflows/test.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ jobs:
8282
run: |
8383
python -m pip --quiet install poetry
8484
echo "$HOME/.poetry/bin" >> $GITHUB_PATH
85-
poetry install -E yaml
85+
if [ ${{ matrix.broker == 'valkey' }} == true ]; then
86+
additional_args="-E valkey"
87+
fi
88+
poetry install -E yaml $additional_args
8689
poetry run pip install django==${{ matrix.django-version }}
8790
8891
- name: Get version

docs/changelog.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# Changelog
22

3-
43
## v2.2.0 🌈
54

65
### 🚀 Features
@@ -10,6 +9,11 @@
109
- `Task` model has a `task_type` field to differentiate between the types of tasks.
1110
- Old tasks in the database will be migrated to the new `Task` model automatically.
1211

12+
## v2.1.1 🌈
13+
14+
### 🐛 Bug Fixes
15+
16+
- Support for valkey sentinel configuration @amirreza8002 (#191)
1317

1418
## v2.1.0 🌈
1519

docs/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
mkdocs==1.6.1
2-
mkdocs-material==9.5.40
2+
mkdocs-material==9.5.44

poetry.lock

Lines changed: 239 additions & 238 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ packages = [
88
{ include = "scheduler" },
99
]
1010
version = "2.2.0"
11-
description = "An async job scheduler for django using redis"
11+
description = "An async job scheduler for django using redis/valkey brokers"
1212
readme = "README.md"
13-
keywords = ["redis", "django", "background-jobs", "job-queue", "task-queue", "redis-queue", "scheduled-jobs"]
13+
keywords = ["redis", "valkey", "django", "background-jobs", "job-queue", "task-queue", "redis-queue", "scheduled-jobs"]
1414
authors = [
1515
"Daniel Moran <[email protected]>",
1616
]
@@ -34,7 +34,7 @@ classifiers = [
3434
'Framework :: Django :: 5.1',
3535
]
3636
homepage = "https://github.com/django-commons/django-tasks-scheduler"
37-
documentation = "https://django-tasks-scheduler.readthedocs.io/"
37+
documentation = "https://django-tasks-scheduler.readthedocs.io/en/latest/"
3838

3939
[tool.poetry.urls]
4040
"Bug Tracker" = "https://github.com/django-commons/django-tasks-scheduler/issues"

scheduler/admin/task_models.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import redis
2-
import valkey
31
from django.contrib import admin, messages
42
from django.contrib.contenttypes.admin import GenericStackedInline
53
from django.utils.translation import gettext_lazy as _
64

75
from scheduler import tools
6+
from scheduler.broker_types import ConnectionErrorTypes
87
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
98
from scheduler.settings import SCHEDULER_CONFIG, logger
109
from scheduler.tools import get_job_executions_for_task
@@ -186,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
186185
obj = self.get_object(request, object_id)
187186
try:
188187
execution_list = get_job_executions_for_task(obj.queue, obj)
189-
except (redis.ConnectionError, valkey.ConnectionError) as e:
188+
except ConnectionErrorTypes as e:
190189
logger.warn(f"Could not get job executions: {e}")
191190
execution_list = list()
192191
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)

scheduler/broker_types.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# This is a helper module to obfuscate types used by different broker implementations.
2+
from typing import Union, Dict, Tuple, Type
3+
4+
import redis
5+
6+
try:
7+
import valkey
8+
except ImportError:
9+
valkey = redis
10+
valkey.Valkey = redis.Redis
11+
valkey.StrictValkey = redis.StrictRedis
12+
13+
from scheduler.settings import Broker
14+
15+
ConnectionErrorTypes = (redis.ConnectionError, valkey.ConnectionError)
16+
ResponseErrorTypes = (redis.ResponseError, valkey.ResponseError)
17+
ConnectionType = Union[redis.Redis, valkey.Valkey]
18+
PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
19+
SentinelType = Union[redis.sentinel.Sentinel, valkey.sentinel.Sentinel]
20+
21+
BrokerMetaData: Dict[Tuple[Broker, bool], Tuple[Type[ConnectionType], Type[SentinelType], str]] = {
22+
# Map of (Broker, Strict flag) => Connection Class, Sentinel Class, SSL Connection Prefix
23+
(Broker.REDIS, False): (redis.Redis, redis.sentinel.Sentinel, "rediss"),
24+
(Broker.VALKEY, False): (valkey.Valkey, valkey.sentinel.Sentinel, "valkeys"),
25+
(Broker.REDIS, True): (redis.StrictRedis, redis.sentinel.Sentinel, "rediss"),
26+
(Broker.VALKEY, True): (valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"),
27+
}

scheduler/connection_types.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

scheduler/management/commands/rqworker.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
import sys
44

55
import click
6-
import redis
7-
import valkey
86
from django.core.management.base import BaseCommand
97
from django.db import connections
108
from rq.logutils import setup_loghandlers
119

10+
from scheduler.broker_types import ConnectionErrorTypes
1211
from scheduler.rq_classes import register_sentry
1312
from scheduler.tools import create_worker
1413

@@ -133,6 +132,6 @@ def handle(self, **options):
133132
logging_level=log_level,
134133
max_jobs=options["max_jobs"],
135134
)
136-
except (redis.ConnectionError, valkey.ConnectionError) as e:
135+
except ConnectionErrorTypes as e:
137136
click.echo(str(e), err=True)
138137
sys.exit(1)

scheduler/queues.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
from typing import List, Dict, Set
22

3-
import redis
4-
import valkey
5-
6-
from .connection_types import RedisSentinel, BrokerConnectionClass
3+
from .broker_types import ConnectionErrorTypes, BrokerMetaData
74
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
85
from .settings import SCHEDULER_CONFIG
96
from .settings import logger, Broker
@@ -28,31 +25,32 @@ class QueueNotFoundError(Exception):
2825
pass
2926

3027

31-
def _get_redis_connection(config, use_strict_redis=False):
28+
def _get_broker_connection(config, use_strict_broker=False):
3229
"""
3330
Returns a redis connection from a connection config
3431
"""
3532
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
3633
import fakeredis
3734

38-
redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
35+
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
3936
else:
40-
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
37+
broker_cls = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][0]
4138
logger.debug(f"Getting connection for {config}")
4239
if "URL" in config:
43-
if config.get("SSL") or config.get("URL").startswith("rediss://"):
44-
return redis_cls.from_url(
40+
ssl_url_protocol = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][2]
41+
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol}://"):
42+
return broker_cls.from_url(
4543
config["URL"],
4644
db=config.get("DB"),
4745
ssl_cert_reqs=config.get("SSL_CERT_REQS", "required"),
4846
)
4947
else:
50-
return redis_cls.from_url(
48+
return broker_cls.from_url(
5149
config["URL"],
5250
db=config.get("DB"),
5351
)
5452
if "UNIX_SOCKET_PATH" in config:
55-
return redis_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
53+
return broker_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
5654

5755
if "SENTINELS" in config:
5856
connection_kwargs = {
@@ -63,13 +61,14 @@ def _get_redis_connection(config, use_strict_redis=False):
6361
}
6462
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
6563
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
66-
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
64+
SentinelClass = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][1]
65+
sentinel = SentinelClass(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
6766
return sentinel.master_for(
6867
service_name=config["MASTER_NAME"],
69-
redis_class=redis_cls,
68+
redis_class=broker_cls,
7069
)
7170

72-
return redis_cls(
71+
return broker_cls(
7372
host=config["HOST"],
7473
port=config["PORT"],
7574
db=config.get("DB", 0),
@@ -82,8 +81,8 @@ def _get_redis_connection(config, use_strict_redis=False):
8281

8382

8483
def get_connection(queue_settings, use_strict_redis=False):
85-
"""Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
86-
return _get_redis_connection(queue_settings, use_strict_redis)
84+
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
85+
return _get_broker_connection(queue_settings, use_strict_redis)
8786

8887

8988
def get_queue(
@@ -116,7 +115,7 @@ def get_all_workers() -> Set[DjangoWorker]:
116115
try:
117116
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
118117
workers_set.update(curr_workers)
119-
except (redis.ConnectionError, valkey.ConnectionError) as e:
118+
except ConnectionErrorTypes as e:
120119
logger.error(f"Could not connect for queue {queue_name}: {e}")
121120
return workers_set
122121

@@ -142,7 +141,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
142141
for name in queue_names[1:]:
143142
if not _queues_share_connection_params(queue_params, QUEUES[name]):
144143
raise ValueError(
145-
f'Queues must have the same redis connection. "{name}" and'
144+
f'Queues must have the same broker connection. "{name}" and'
146145
f' "{queue_names[0]}" have different connections'
147146
)
148147
queue = get_queue(name, **kwargs)

0 commit comments

Comments
 (0)