Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ jobs:
run: |
python -m pip --quiet install poetry
echo "$HOME/.poetry/bin" >> $GITHUB_PATH
poetry install -E yaml
if [ ${{ matrix.broker == 'valkey' }} == true ]; then
additional_args="-E valkey"
fi
poetry install -E yaml $additional_args
poetry run pip install django==${{ matrix.django-version }}

- name: Get version
Expand Down
147 changes: 74 additions & 73 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ croniter = ">=2.0"
click = "^8.1"
rq = "^1.16"
pyyaml = { version = "^6.0", optional = true }
valkey = "6.0.1"
valkey = { version = "^6.0.2", optional = true}

[tool.poetry.dev-dependencies]
poetry = "^1.8.3"
Expand All @@ -60,6 +60,7 @@ freezegun = "^1.5"

[tool.poetry.extras]
yaml = ["pyyaml"]
valkey = ["valkey"]

[tool.flake8]
max-line-length = 120
Expand Down
5 changes: 2 additions & 3 deletions scheduler/admin/task_models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import redis
import valkey
from django.contrib import admin, messages
from django.contrib.contenttypes.admin import GenericStackedInline
from django.utils.translation import gettext_lazy as _

from scheduler import tools
from scheduler.broker_types import ConnectionErrorTypes
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
from scheduler.settings import SCHEDULER_CONFIG, logger
from scheduler.tools import get_job_executions_for_task
Expand Down Expand Up @@ -186,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
obj = self.get_object(request, object_id)
try:
execution_list = get_job_executions_for_task(obj.queue, obj)
except (redis.ConnectionError, valkey.ConnectionError) as e:
except ConnectionErrorTypes as e:
logger.warn(f"Could not get job executions: {e}")
execution_list = list()
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)
Expand Down
26 changes: 26 additions & 0 deletions scheduler/broker_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Union, Dict, Tuple, Type

import redis

try:
import valkey
except ImportError:
valkey = redis
valkey.Valkey = redis.Redis
valkey.StrictValkey = redis.StrictRedis

from scheduler.settings import Broker

ConnectionErrorTypes = (redis.ConnectionError, valkey.ConnectionError)
ResponseErrorTypes = (redis.ResponseError, valkey.ResponseError)
ConnectionType = Union[redis.Redis, valkey.Valkey]
PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
SentinelType = Union[redis.sentinel.Sentinel, valkey.sentinel.Sentinel]

BrokerMetaData: Dict[Tuple[Broker, bool], Tuple[Type[ConnectionType], Type[SentinelType], str]] = {
# Map of (Broker, Strict flag) => Connection Class, Sentinel Class, SSL Connection Prefix
(Broker.REDIS, False): (redis.Redis, redis.sentinel.Sentinel, "rediss"),
(Broker.VALKEY, False): (valkey.Valkey, valkey.sentinel.Sentinel, "valkeys"),
(Broker.REDIS, True): (redis.StrictRedis, redis.sentinel.Sentinel, "rediss"),
(Broker.VALKEY, True): (valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"),
}
19 changes: 0 additions & 19 deletions scheduler/connection_types.py

This file was deleted.

5 changes: 2 additions & 3 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import sys

import click
import redis
import valkey
from django.core.management.base import BaseCommand
from django.db import connections
from rq.logutils import setup_loghandlers

from scheduler.broker_types import ConnectionErrorTypes
from scheduler.rq_classes import register_sentry
from scheduler.tools import create_worker

Expand Down Expand Up @@ -136,6 +135,6 @@ def handle(self, **options):
logging_level=log_level,
max_jobs=options["max_jobs"],
)
except (redis.ConnectionError, valkey.ConnectionError) as e:
except ConnectionErrorTypes as e:
click.echo(str(e), err=True)
sys.exit(1)
35 changes: 17 additions & 18 deletions scheduler/queues.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import List, Dict, Set

import redis
import valkey

from .connection_types import RedisSentinel, BrokerConnectionClass
from .broker_types import ConnectionErrorTypes, BrokerMetaData
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
from .settings import SCHEDULER_CONFIG
from .settings import logger, Broker
Expand All @@ -28,31 +25,32 @@ class QueueNotFoundError(Exception):
pass


def _get_redis_connection(config, use_strict_redis=False):
def _get_broker_connection(config, use_strict_broker=False):
"""
Returns a redis connection from a connection config
"""
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
import fakeredis

redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
else:
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
broker_cls = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][0]
logger.debug(f"Getting connection for {config}")
if "URL" in config:
if config.get("SSL") or config.get("URL").startswith("rediss://"):
return redis_cls.from_url(
ssl_url_protocol = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][2]
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol}://"):
return broker_cls.from_url(
config["URL"],
db=config.get("DB"),
ssl_cert_reqs=config.get("SSL_CERT_REQS", "required"),
)
else:
return redis_cls.from_url(
return broker_cls.from_url(
config["URL"],
db=config.get("DB"),
)
if "UNIX_SOCKET_PATH" in config:
return redis_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
return broker_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])

if "SENTINELS" in config:
connection_kwargs = {
Expand All @@ -63,13 +61,14 @@ def _get_redis_connection(config, use_strict_redis=False):
}
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
SentinelClass = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][1]
sentinel = SentinelClass(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
return sentinel.master_for(
service_name=config["MASTER_NAME"],
redis_class=redis_cls,
redis_class=broker_cls,
)

return redis_cls(
return broker_cls(
host=config["HOST"],
port=config["PORT"],
db=config.get("DB", 0),
Expand All @@ -82,8 +81,8 @@ def _get_redis_connection(config, use_strict_redis=False):


def get_connection(queue_settings, use_strict_redis=False):
"""Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
return _get_redis_connection(queue_settings, use_strict_redis)
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
return _get_broker_connection(queue_settings, use_strict_redis)


def get_queue(
Expand Down Expand Up @@ -116,7 +115,7 @@ def get_all_workers() -> Set[DjangoWorker]:
try:
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
workers_set.update(curr_workers)
except (redis.ConnectionError, valkey.ConnectionError) as e:
except ConnectionErrorTypes as e:
logger.error(f"Could not connect for queue {queue_name}: {e}")
return workers_set

Expand All @@ -142,7 +141,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
for name in queue_names[1:]:
if not _queues_share_connection_params(queue_params, QUEUES[name]):
raise ValueError(
f'Queues must have the same redis connection. "{name}" and'
f'Queues must have the same broker connection. "{name}" and'
f' "{queue_names[0]}" have different connections'
)
queue = get_queue(name, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/rq_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from rq.worker import WorkerStatus

from scheduler import settings
from scheduler.connection_types import PipelineType, ConnectionType
from scheduler.broker_types import PipelineType, ConnectionType

MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"]

Expand Down
9 changes: 4 additions & 5 deletions scheduler/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from math import ceil
from typing import Tuple, Optional

import redis
from django.contrib import admin, messages
from django.contrib.admin.views.decorators import staff_member_required
from django.core.paginator import Paginator
Expand All @@ -12,8 +11,8 @@
from django.shortcuts import render
from django.urls import reverse, resolve
from django.views.decorators.cache import never_cache
from redis.exceptions import ResponseError

from .broker_types import ConnectionErrorTypes, ResponseErrorTypes
from .queues import get_all_workers, get_connection, QueueNotFoundError
from .queues import get_queue as get_queue_base
from .rq_classes import JobExecution, DjangoWorker, DjangoQueue, InvalidJobOperation
Expand Down Expand Up @@ -71,7 +70,7 @@ def get_statistics(run_maintenance_tasks=False):
if run_maintenance_tasks:
queue.clean_registries()

# Raw access to the first item from left of the redis list.
# Raw access to the first item from left of the broker list.
# This might not be accurate since new job can be added from the left
# with `at_front` parameters.
# Ideally rq should supports Queue.oldest_job
Expand Down Expand Up @@ -102,7 +101,7 @@ def get_statistics(run_maintenance_tasks=False):
canceled_jobs=len(queue.canceled_job_registry),
)
queues.append(queue_data)
except redis.ConnectionError as e:
except ConnectionErrorTypes as e:
logger.error(f"Could not connect for queue {queue_name}: {e}")
continue

Expand Down Expand Up @@ -277,7 +276,7 @@ def clear_queue_registry(request: HttpRequest, queue_name: str, registry_name: s
for job_id in job_ids:
registry.remove(job_id, delete_job=True)
messages.info(request, f"You have successfully cleared the {registry_name} jobs in queue {queue.name}")
except ResponseError as e:
except ResponseErrorTypes as e:
messages.error(
request,
f"error: {e}",
Expand Down
Loading