|
2 | 2 | from math import ceil
|
3 | 3 | from typing import Tuple, Optional
|
4 | 4 |
|
5 |
| -import redis |
6 | 5 | from django.contrib import admin, messages
|
7 | 6 | from django.contrib.admin.views.decorators import staff_member_required
|
8 | 7 | from django.core.paginator import Paginator
|
|
14 | 13 | from django.views.decorators.cache import never_cache
|
15 | 14 | from redis.exceptions import ResponseError
|
16 | 15 |
|
17 |
| -from .queues import get_all_workers, get_connection, QueueNotFoundError |
| 16 | + |
| 17 | +from .queues import get_all_workers, get_connection, QueueNotFoundError, ConnectionErrors |
18 | 18 | from .queues import get_queue as get_queue_base
|
19 | 19 | from .rq_classes import JobExecution, DjangoWorker, DjangoQueue, InvalidJobOperation
|
20 | 20 | from .settings import SCHEDULER_CONFIG, logger
|
21 | 21 |
|
| 22 | +ResponseErrors = {ResponseError} |
| 23 | + |
| 24 | +try: |
| 25 | + from valkey import exceptions |
| 26 | + ResponseErrors.add(exceptions.ResponseError) |
| 27 | +except ImportError: |
| 28 | + pass |
| 29 | + |
22 | 30 |
|
23 | 31 | def get_queue(queue_name: str) -> DjangoQueue:
|
24 | 32 | try:
|
@@ -71,7 +79,7 @@ def get_statistics(run_maintenance_tasks=False):
|
71 | 79 | if run_maintenance_tasks:
|
72 | 80 | queue.clean_registries()
|
73 | 81 |
|
74 |
| - # Raw access to the first item from left of the redis list. |
| 82 | + # Raw access to the first item from left of the broker list. |
75 | 83 | # This might not be accurate since new job can be added from the left
|
76 | 84 | # with `at_front` parameters.
|
77 | 85 | # Ideally rq should supports Queue.oldest_job
|
@@ -102,7 +110,7 @@ def get_statistics(run_maintenance_tasks=False):
|
102 | 110 | canceled_jobs=len(queue.canceled_job_registry),
|
103 | 111 | )
|
104 | 112 | queues.append(queue_data)
|
105 |
| - except redis.ConnectionError as e: |
| 113 | + except ConnectionErrors as e: |
106 | 114 | logger.error(f"Could not connect for queue {queue_name}: {e}")
|
107 | 115 | continue
|
108 | 116 |
|
@@ -277,7 +285,7 @@ def clear_queue_registry(request: HttpRequest, queue_name: str, registry_name: s
|
277 | 285 | for job_id in job_ids:
|
278 | 286 | registry.remove(job_id, delete_job=True)
|
279 | 287 | messages.info(request, f"You have successfully cleared the {registry_name} jobs in queue {queue.name}")
|
280 |
| - except ResponseError as e: |
| 288 | + except ResponseErrors as e: |
281 | 289 | messages.error(
|
282 | 290 | request,
|
283 | 291 | f"error: {e}",
|
|
0 commit comments