Skip to content

Commit 3256a08

Browse files
committed
refactor: separated queue views to several files
1 parent 7f06a91 commit 3256a08

File tree

21 files changed

+385
-287
lines changed

21 files changed

+385
-287
lines changed

scheduler/decorators.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def delay(*args, **kwargs):
9090
on_failure=self.on_failure,
9191
on_success=self.on_success,
9292
on_stopped=self.on_stopped,
93+
when=None,
9394
)
9495

9596
JOB_METHODS_LIST.append(f"{f.__module__}.{f.__name__}")

scheduler/helpers/queues/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
"InvalidJobOperation",
44
"get_queue",
55
"get_all_workers",
6-
"get_queues",
76
"perform_job",
87
]
98

10-
from .getters import get_queue, get_all_workers, get_queues
9+
from .getters import get_queue, get_all_workers
1110
from .queue_logic import Queue, InvalidJobOperation, perform_job

scheduler/helpers/queues/getters.py

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from typing import List, Set
22

3-
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker
43
from scheduler.redis_models.worker import WorkerModel
54
from scheduler.settings import (
65
SCHEDULER_CONFIG,
@@ -9,12 +8,10 @@
98
QueueConfiguration,
109
logger,
1110
)
11+
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker
1212
from .queue_logic import Queue
1313

1414

15-
class QueueConnectionDiscrepancyError(Exception):
16-
pass
17-
1815

1916
_BAD_QUEUE_CONFIGURATION = set()
2017

@@ -82,19 +79,3 @@ def get_all_workers() -> Set[WorkerModel]:
8279
return workers_set
8380

8481

85-
def get_queues(*queue_names: str) -> List[Queue]:
86-
"""Return queue instances from specified queue names. All instances must use the same Broker configuration."""
87-
88-
queue_config = get_queue_configuration(queue_names[0])
89-
queues = [get_queue(queue_names[0])]
90-
# perform consistency checks while building return list
91-
for queue_name in queue_names[1:]:
92-
curr_queue_config = get_queue_configuration(queue_name)
93-
if not queue_config.same_connection_params(curr_queue_config):
94-
raise QueueConnectionDiscrepancyError(
95-
f'Queues must have the same broker connection. "{queue_name}" and "{queue_names[0]}" have different connection settings'
96-
)
97-
queue = get_queue(queue_name)
98-
queues.append(queue)
99-
100-
return queues

scheduler/helpers/queues/queue_logic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def create_and_enqueue_job(
190190
func: FunctionReferenceType,
191191
args: Union[Tuple, List, None] = None,
192192
kwargs: Optional[Dict] = None,
193+
when: Optional[datetime] = None,
193194
timeout: Optional[int] = None,
194195
result_ttl: Optional[int] = None,
195196
job_info_ttl: Optional[int] = None,
@@ -202,7 +203,6 @@ def create_and_enqueue_job(
202203
on_stopped: Optional[Callback] = None,
203204
task_type: Optional[str] = None,
204205
scheduled_task_id: Optional[int] = None,
205-
when: Optional[datetime] = None,
206206
pipeline: Optional[ConnectionType] = None,
207207
) -> JobModel:
208208
"""Creates a job to represent the delayed function call and enqueues it.

scheduler/management/commands/run_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ def handle(self, **options):
3232
queue = get_queue(options.get("queue"))
3333
func = options.get("callable")
3434
args = options.get("args")
35-
job = queue.create_and_enqueue_job(func, args=args, timeout=timeout, result_ttl=result_ttl)
35+
job = queue.create_and_enqueue_job(func, args=args, timeout=timeout, result_ttl=result_ttl, when=None)
3636
if verbosity:
3737
click.echo(f"Job {job.name} created")

scheduler/management/commands/scheduler_stats.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def add_arguments(self, parser):
5050
def _print_separator(self):
5151
click.echo("-" * self.table_width)
5252

53-
def _print_stats_dashboard(self, statistics, prev_stats=None):
53+
def _print_stats_dashboard(self, statistics, prev_stats=None, with_color:bool = True):
5454
if self.interval:
5555
click.clear()
5656
click.echo()
@@ -62,6 +62,8 @@ def _print_stats_dashboard(self, statistics, prev_stats=None):
6262
for ind, queue in enumerate(statistics["queues"]):
6363
vals = list((queue[k] for k in KEYS))
6464
# Deal with colors
65+
if not with_color:
66+
colors = ["" for _ in KEYS]
6567
if prev_stats and len(prev_stats["queues"]) > ind:
6668
prev = prev_stats["queues"][ind]
6769
prev_vals = tuple(prev[k] for k in KEYS)
@@ -71,7 +73,7 @@ def _print_stats_dashboard(self, statistics, prev_stats=None):
7173
else:
7274
colors = [ANSI_LIGHT_WHITE for _ in range(len(vals))]
7375
to_print = " | ".join([f"{colors[i]}{vals[i]:9}{ANSI_RESET}" for i in range(len(vals))])
74-
click.echo(f"| {queue['name']:<16} | {to_print} |", color=True)
76+
click.echo(f"| {queue['name']:<16} | {to_print} |", color=with_color)
7577

7678
self._print_separator()
7779

@@ -98,22 +100,20 @@ def handle(self, *args, **options):
98100
click.secho("Aborting. yaml not supported", err=True, fg="red")
99101
return
100102

101-
click.secho(
102-
yaml.dump(get_statistics(), default_flow_style=False),
103-
)
103+
click.secho(yaml.dump(get_statistics(), default_flow_style=False))
104104
return
105105

106106
self.interval = options.get("interval")
107107

108108
if not self.interval or self.interval < 0:
109-
self._print_stats_dashboard(get_statistics())
109+
self._print_stats_dashboard(get_statistics(), with_color=not options.get("no_color"))
110110
return
111111

112112
try:
113113
prev = None
114114
while True:
115115
statistics = get_statistics()
116-
self._print_stats_dashboard(statistics, prev)
116+
self._print_stats_dashboard(statistics, prev, with_color=not options.get("no_color"))
117117
prev = statistics
118118
time.sleep(self.interval)
119119
except KeyboardInterrupt:

scheduler/models/task.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
from django.utils.safestring import mark_safe
1616
from django.utils.translation import gettext_lazy as _
1717

18+
from scheduler import settings
1819
from scheduler.helpers.callback import Callback
1920
from scheduler.helpers.queues import Queue
2021
from scheduler.helpers.queues import get_queue
2122
from scheduler.redis_models import JobModel
2223
from scheduler.settings import logger, get_queue_names
23-
from scheduler import settings
2424
from scheduler.types import ConnectionType, TASK_TYPES
2525
from .args import TaskArg, TaskKwarg
2626
from ..helpers import utils
@@ -255,11 +255,7 @@ def rqueue(self) -> Queue:
255255
def enqueue_to_run(self) -> bool:
256256
"""Enqueue task to run now as a different instance from the scheduled task."""
257257
kwargs = self._enqueue_args()
258-
self.rqueue.create_and_enqueue_job(
259-
run_task,
260-
args=(self.task_type, self.id),
261-
**kwargs,
262-
)
258+
self.rqueue.create_and_enqueue_job(run_task, args=(self.task_type, self.id), when=None, **kwargs)
263259
return True
264260

265261
def unschedule(self) -> bool:

scheduler/redis_models/registry/queue_registries.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def cleanup(self, connection: ConnectionType, timestamp: Optional[float] = None)
5454
and `all()` methods implemented in JobIdsRegistry."""
5555
pass
5656

57-
def schedule(self, connection: ConnectionType, job: JobModel, scheduled_datetime):
57+
def schedule(self, connection: ConnectionType, job: JobModel, scheduled_datetime: datetime):
5858
"""
5959
Adds job to registry, scored by its execution time (in UTC).
6060
If datetime has no tzinfo, it will assume localtimezone.

scheduler/templates/admin/scheduler/jobs.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
<div id="content-main">
2626
<ul class="object-tools">
2727
{% if job_status == 'Failed' %}
28-
<li><a href="{% url 'queue_requeue_all' queue.name registry_name %}" class="requeuelink">Requeue All</a>
28+
<li><a href="{% url 'queue_registry_action' queue.name registry_name 'requeue' %}" class="requeuelink">Requeue All</a>
2929
</li>
3030
{% endif %}
31-
<li><a href="{% url 'queue_clear' queue.name registry_name %}" class="deletelink">Empty Queue</a></li>
31+
<li><a href="{% url 'queue_registry_action' queue.name registry_name 'empty' %}" class="deletelink">Empty Queue</a></li>
3232
</ul>
3333
<div class="module" id="changelist">
3434
<form id="changelist-form" action="{% url 'queue_confirm_action' queue.name %}" method="post">
Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,94 @@
1+
import json
2+
import sys
3+
from io import StringIO
4+
5+
import yaml
16
from django.core.management import call_command
27
from django.test import TestCase, override_settings
38

4-
from scheduler.tests import test_settings # noqa
5-
from scheduler.types import SchedulerConfiguration
9+
from scheduler import settings
10+
from scheduler.helpers.queues import get_queue
11+
from scheduler.worker.worker import get_queues
612

713

14+
@override_settings(SCHEDULER_QUEUES=dict(default={"HOST": "localhost", "PORT": 6379, "DB": 0}))
815
class SchedulerStatsTest(TestCase):
9-
@override_settings(SCHEDULER_CONFIG=SchedulerConfiguration(SCHEDULER_INTERVAL=1))
10-
def test_scheduler_stats__does_not_fail(self):
16+
EXPECTED_OUTPUT = {
17+
"queues": [
18+
{
19+
"canceled_jobs": 0,
20+
"failed_jobs": 0,
21+
"finished_jobs": 0,
22+
"name": "default",
23+
"oldest_job_timestamp": None,
24+
"queued_jobs": 0,
25+
"scheduled_jobs": 0,
26+
"scheduler_pid": None,
27+
"started_jobs": 0,
28+
"workers": 0,
29+
}
30+
]
31+
}
32+
OLD_QUEUES = None
33+
34+
def setUp(self):
35+
super(SchedulerStatsTest, self).setUp()
36+
SchedulerStatsTest.OLD_QUEUES = settings._QUEUES
37+
settings._QUEUES = dict()
38+
settings.conf_settings()
39+
get_queue("default").connection.flushall()
40+
41+
def tearDown(self):
42+
super(SchedulerStatsTest, self).tearDown()
43+
settings._QUEUES = SchedulerStatsTest.OLD_QUEUES
44+
45+
def test_scheduler_stats__json_output(self):
46+
test_stdout = StringIO()
47+
sys.stdout = test_stdout
48+
# act
1149
call_command("scheduler_stats", "-j")
50+
# assert
51+
res = test_stdout.getvalue()
52+
self.assertEqual(json.loads(res), SchedulerStatsTest.EXPECTED_OUTPUT)
53+
54+
def test_scheduler_stats__yaml_output(self):
55+
# arrange
56+
test_stdout = StringIO()
57+
sys.stdout = test_stdout
58+
# act
1259
call_command("scheduler_stats", "-y")
13-
call_command("scheduler_stats")
60+
# assert
61+
res = test_stdout.getvalue()
62+
self.assertEqual(yaml.load(res, yaml.SafeLoader), SchedulerStatsTest.EXPECTED_OUTPUT)
63+
64+
def test_scheduler_stats__plain_text_output(self):
65+
test_stdout = StringIO()
66+
sys.stdout = test_stdout
67+
# act
68+
call_command("scheduler_stats", "--no-color")
69+
# assert
70+
res = test_stdout.getvalue()
71+
self.assertEqual(
72+
res,
73+
"""
74+
Django-Scheduler CLI Dashboard
75+
76+
--------------------------------------------------------------------------------
77+
| Name | Queued | Active | Finished | Canceled | Workers |
78+
--------------------------------------------------------------------------------
79+
| default | 0 | 0 | 0 | 0 | 0 |
80+
--------------------------------------------------------------------------------
81+
""",
82+
)
83+
84+
def test_scheduler_stats__bad_args(self):
85+
# arrange
86+
sys.stderr = StringIO()
87+
sys.stdout = StringIO()
88+
# act
89+
call_command("scheduler_stats", "-y", "-j")
90+
# assert
91+
res = sys.stdout.getvalue()
92+
self.assertEqual(res, """""")
93+
err = sys.stderr.getvalue()
94+
self.assertEqual(err, """Aborting. Cannot output as both json and yaml\n""")

0 commit comments

Comments
 (0)