Skip to content

Commit 0c91a14

Browse files
committed
refactor: separated queue views to several files
1 parent f1381b0 commit 0c91a14

File tree

12 files changed

+124
-212
lines changed

12 files changed

+124
-212
lines changed

scheduler/helpers/callback.py

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import importlib
21
import inspect
32
from typing import Union, Callable, Any, Optional
43

4+
from scheduler.helpers.utils import callable_func
55
from scheduler.timeouts import JobTimeoutException
66

77

@@ -19,7 +19,11 @@ def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[int]
1919
if not isinstance(func, str) and not inspect.isfunction(func) and not inspect.isbuiltin(func):
2020
raise CallbackSetupError(f"Callback `func` must be a string or function, received {func}")
2121
if isinstance(func, str):
22-
func = _import_attribute(func)
22+
try:
23+
func_str = func
24+
func = callable_func(func)
25+
except (TypeError, AttributeError, ModuleNotFoundError, ValueError) as e:
26+
raise CallbackSetupError(f"Callback `func` is not callable: {func_str}")
2327
self.func: Callable[..., Any] = func
2428

2529
@property
@@ -31,47 +35,3 @@ def __call__(self, *args, **kwargs):
3135

3236
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(self.timeout, JobTimeoutException):
3337
return self.func(*args, **kwargs)
34-
35-
36-
def _import_attribute(name: str) -> Callable[..., Any]:
37-
"""Returns an attribute from a dotted path name. Example: `path.to.func`.
38-
39-
When the attribute we look for is a staticmethod, module name in its dotted path is not the last-before-end word
40-
E.g.: package_a.package_b.module_a.ClassA.my_static_method
41-
Thus we remove the bits from the end of the name until we can import it
42-
43-
:param name: The name (reference) to the path.
44-
:raises ValueError: If no module is found or invalid attribute name.
45-
:returns: An attribute (normally a Callable)
46-
"""
47-
name_bits = name.split(".")
48-
module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]]
49-
module = None
50-
while len(module_name_bits) > 0:
51-
try:
52-
module_name = ".".join(module_name_bits)
53-
module = importlib.import_module(module_name)
54-
break
55-
except ImportError:
56-
attribute_bits.insert(0, module_name_bits.pop())
57-
58-
if module is None: # maybe it's a builtin
59-
try:
60-
return __builtins__[name]
61-
except KeyError:
62-
raise CallbackSetupError(f"Invalid attribute name: {name}")
63-
64-
attribute_name = ".".join(attribute_bits)
65-
if hasattr(module, attribute_name):
66-
return getattr(module, attribute_name)
67-
# staticmethods
68-
attribute_name = attribute_bits.pop()
69-
attribute_owner_name = ".".join(attribute_bits)
70-
try:
71-
attribute_owner = getattr(module, attribute_owner_name)
72-
except: # noqa
73-
raise CallbackSetupError(f"Invalid attribute name: {attribute_name}")
74-
75-
if not hasattr(attribute_owner, attribute_name):
76-
raise CallbackSetupError(f"Invalid attribute name: {name}")
77-
return getattr(attribute_owner, attribute_name)

scheduler/helpers/queues/queue_logic.py

Lines changed: 57 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import sys
33
import traceback
44
from datetime import datetime
5-
from functools import total_ordering
65
from typing import Dict, List, Optional, Tuple, Union, Self, Any
76

87
from redis import WatchError
@@ -56,7 +55,6 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
5655
_job_stack = []
5756

5857

59-
@total_ordering
6058
class Queue:
6159
REGISTRIES = dict(
6260
finished="finished_job_registry",
@@ -146,17 +144,6 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
146144
def first_queued_job_name(self) -> Optional[str]:
147145
return self.queued_job_registry.get_first()
148146

149-
def empty(self):
150-
"""Removes all queued jobs from the queue."""
151-
queued_jobs_count = self.queued_job_registry.count(connection=self.connection)
152-
with self.connection.pipeline() as pipe:
153-
for offset in range(0, queued_jobs_count, 1000):
154-
job_names = self.queued_job_registry.all(offset, 1000)
155-
for job_name in job_names:
156-
self.queued_job_registry.delete(connection=pipe, job_name=job_name)
157-
JobModel.delete_many(job_names, connection=pipe)
158-
pipe.execute()
159-
160147
@property
161148
def count(self) -> int:
162149
"""Returns a count of all messages in the queue."""
@@ -186,24 +173,24 @@ def get_all_jobs(self) -> List[JobModel]:
186173
return JobModel.get_many(job_names, connection=self.connection)
187174

188175
def create_and_enqueue_job(
189-
self,
190-
func: FunctionReferenceType,
191-
args: Union[Tuple, List, None] = None,
192-
kwargs: Optional[Dict] = None,
193-
when: Optional[datetime] = None,
194-
timeout: Optional[int] = None,
195-
result_ttl: Optional[int] = None,
196-
job_info_ttl: Optional[int] = None,
197-
description: Optional[str] = None,
198-
name: Optional[str] = None,
199-
at_front: bool = False,
200-
meta: Optional[Dict] = None,
201-
on_success: Optional[Callback] = None,
202-
on_failure: Optional[Callback] = None,
203-
on_stopped: Optional[Callback] = None,
204-
task_type: Optional[str] = None,
205-
scheduled_task_id: Optional[int] = None,
206-
pipeline: Optional[ConnectionType] = None,
176+
self,
177+
func: FunctionReferenceType,
178+
args: Union[Tuple, List, None] = None,
179+
kwargs: Optional[Dict] = None,
180+
when: Optional[datetime] = None,
181+
timeout: Optional[int] = None,
182+
result_ttl: Optional[int] = None,
183+
job_info_ttl: Optional[int] = None,
184+
description: Optional[str] = None,
185+
name: Optional[str] = None,
186+
at_front: bool = False,
187+
meta: Optional[Dict] = None,
188+
on_success: Optional[Callback] = None,
189+
on_failure: Optional[Callback] = None,
190+
on_stopped: Optional[Callback] = None,
191+
task_type: Optional[str] = None,
192+
scheduled_task_id: Optional[int] = None,
193+
pipeline: Optional[ConnectionType] = None,
207194
) -> JobModel:
208195
"""Creates a job to represent the delayed function call and enqueues it.
209196
:param when: When to schedule the job (None to enqueue immediately)
@@ -309,43 +296,6 @@ def run_job(self, job: JobModel) -> JobModel:
309296
pipeline.execute()
310297
return job
311298

312-
def enqueue_job(
313-
self, job_model: JobModel, connection: Optional[ConnectionType] = None, at_front: bool = False
314-
) -> JobModel:
315-
"""Enqueues a job for delayed execution without checking dependencies.
316-
317-
If Queue is instantiated with is_async=False, job is executed immediately.
318-
:param job_model: The job redis model
319-
:param connection: The Redis Pipeline
320-
:param at_front: Whether to enqueue the job at the front
321-
322-
:returns: The enqueued JobModel
323-
"""
324-
325-
pipe = connection if connection is not None else self.connection.pipeline()
326-
327-
# Add Queue key set
328-
job_model.status = JobStatus.QUEUED
329-
job_model.enqueued_at = utcnow()
330-
job_model.save(connection=pipe)
331-
332-
if self._is_async:
333-
if at_front:
334-
score = current_timestamp()
335-
else:
336-
score = self.queued_job_registry.get_last_timestamp() or current_timestamp()
337-
self.scheduled_job_registry.delete(connection=pipe, job_name=job_model.name)
338-
self.queued_job_registry.add(connection=pipe, score=score, job_name=job_model.name)
339-
pipe.execute()
340-
logger.debug(f"Pushed job {job_model.name} into {self.name} queued-jobs registry")
341-
else: # sync mode
342-
pipe.execute()
343-
job_model = self.run_sync(job_model)
344-
job_model.expire(ttl=job_model.job_info_ttl, connection=pipe)
345-
pipe.execute()
346-
347-
return job_model
348-
349299
def run_sync(self, job: JobModel) -> JobModel:
350300
"""Run a job synchronously, meaning on the same process the method was called."""
351301
job.prepare_for_execution("sync", self.active_job_registry, self.connection)
@@ -361,10 +311,10 @@ def run_sync(self, job: JobModel) -> JobModel:
361311

362312
@classmethod
363313
def dequeue_any(
364-
cls,
365-
queues: List[Self],
366-
timeout: Optional[int],
367-
connection: Optional[ConnectionType] = None,
314+
cls,
315+
queues: List[Self],
316+
timeout: Optional[int],
317+
connection: Optional[ConnectionType] = None,
368318
) -> Tuple[Optional[JobModel], Optional[Self]]:
369319
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
370320
is important.
@@ -398,19 +348,6 @@ def dequeue_any(
398348
return job, queue
399349
return None, None
400350

401-
def __eq__(self, other: Self) -> bool:
402-
if not isinstance(other, Queue):
403-
raise TypeError("Cannot compare queues to other objects")
404-
return self.name == other.name
405-
406-
def __lt__(self, other: Self) -> bool:
407-
if not isinstance(other, Queue):
408-
raise TypeError("Cannot compare queues to other objects")
409-
return self.name < other.name
410-
411-
def __hash__(self) -> int:
412-
return hash(self.name)
413-
414351
def __repr__(self) -> str:
415352
return f"{self.__class__.__name__}({self.name!r})"
416353

@@ -479,17 +416,39 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
479416
except WatchError:
480417
pass
481418

482-
def requeue_jobs(self, *job_names: str, at_front: bool = False) -> int:
483-
jobs = JobModel.get_many(job_names, connection=self.connection)
484-
jobs_requeued = 0
485-
with self.connection.pipeline() as pipe:
486-
for job in jobs:
487-
if job is None:
488-
continue
489-
job.started_at = None
490-
job.ended_at = None
491-
job.save(connection=pipe)
492-
self.enqueue_job(job, connection=pipe, at_front=at_front)
493-
jobs_requeued += 1
419+
def enqueue_job(
420+
self, job_model: JobModel, connection: Optional[ConnectionType] = None, at_front: bool = False
421+
) -> JobModel:
422+
"""Enqueues a job for delayed execution without checking dependencies.
423+
424+
If Queue is instantiated with is_async=False, job is executed immediately.
425+
:param job_model: The job redis model
426+
:param connection: The Redis Pipeline
427+
:param at_front: Whether to enqueue the job at the front
428+
429+
:returns: The enqueued JobModel
430+
"""
431+
432+
pipe = connection if connection is not None else self.connection.pipeline()
433+
job_model.started_at = None
434+
job_model.ended_at = None
435+
job_model.status = JobStatus.QUEUED
436+
job_model.enqueued_at = utcnow()
437+
job_model.save(connection=pipe)
438+
439+
if self._is_async:
440+
if at_front:
441+
score = current_timestamp()
442+
else:
443+
score = self.queued_job_registry.get_last_timestamp() or current_timestamp()
444+
self.scheduled_job_registry.delete(connection=pipe, job_name=job_model.name)
445+
self.queued_job_registry.add(connection=pipe, score=score, job_name=job_model.name)
494446
pipe.execute()
495-
return jobs_requeued
447+
logger.debug(f"Pushed job {job_model.name} into {self.name} queued-jobs registry")
448+
else: # sync mode
449+
pipe.execute()
450+
job_model = self.run_sync(job_model)
451+
job_model.expire(ttl=job_model.job_info_ttl, connection=pipe)
452+
pipe.execute()
453+
454+
return job_model

scheduler/redis_models/registry/queue_registries.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ def compact(self) -> None:
2828
if not JobModel.exists(job_name, self.connection):
2929
self.delete(connection=self.connection, job_name=job_name)
3030

31+
def empty(self) -> None:
32+
queued_jobs_count = self.count(connection=self.connection)
33+
with self.connection.pipeline() as pipe:
34+
for offset in range(0, queued_jobs_count, 1000):
35+
job_names = self.all(offset, 1000)
36+
for job_name in job_names:
37+
self.delete(connection=pipe, job_name=job_name)
38+
JobModel.delete_many(job_names, connection=pipe)
39+
pipe.execute()
3140

3241
class FinishedJobRegistry(JobNamesRegistry):
3342
_element_key_template: ClassVar[str] = ":registry:{}:finished_jobs"

scheduler/tests/test_internals.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ def test_callback_bad_arguments(self):
2929
self.assertEqual(str(cm.exception), "Callback `timeout` must be a positive int, but received 1m")
3030
with self.assertRaises(CallbackSetupError) as cm:
3131
Callback("scheduler.tests.jobs.non_existing_method")
32-
self.assertEqual(str(cm.exception), "Invalid attribute name: non_existing_method")
32+
self.assertEqual(str(cm.exception), "Callback `func` is not callable: scheduler.tests.jobs.non_existing_method")
3333
with self.assertRaises(CallbackSetupError) as cm:
3434
Callback("scheduler.tests.non_existing_module.non_existing_method")
35-
self.assertEqual(str(cm.exception), "Invalid attribute name: non_existing_method")
35+
self.assertEqual(str(cm.exception), "Callback `func` is not callable: scheduler.tests.non_existing_module.non_existing_method")
3636
with self.assertRaises(CallbackSetupError) as cm:
3737
Callback("non_existing_method")
38-
self.assertEqual(str(cm.exception), "Invalid attribute name: non_existing_method")
38+
self.assertEqual(str(cm.exception), "Callback `func` is not callable: non_existing_method")
3939
with self.assertRaises(CallbackSetupError) as cm:
4040
Callback(1)
4141
self.assertEqual(str(cm.exception), "Callback `func` must be a string or function, received 1")

scheduler/tests/test_mgmt_commands/test_run_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class RunJobTest(TestCase):
1111
def test_run_job__should_schedule_job(self):
1212
queue = get_queue("default")
13-
queue.empty()
13+
queue.queued_job_registry.empty()
1414
func_name = f"{test_job.__module__}.{test_job.__name__}"
1515
# act
1616
call_command("run_job", func_name, queue="default")

scheduler/tests/test_views/test_job_detail_action.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,6 @@ def test_single_job_action_unknown_action(self):
2525
res = self.client.get(reverse("job_detail_action", args=[job.name, "unknown"]), follow=True)
2626
self.assertEqual(400, res.status_code)
2727

28-
def test_single_job_action_requeue_job(self):
29-
queue = get_queue("default")
30-
job = queue.create_and_enqueue_job(failing_job)
31-
worker = create_worker("default", burst=True)
32-
worker.work()
33-
job = JobModel.get(job.name, connection=queue.connection)
34-
self.assertTrue(job.is_failed)
35-
res = self.client.get(reverse("job_detail_action", args=[job.name, "requeue"]), follow=True)
36-
self.assertEqual(200, res.status_code)
37-
self.client.post(reverse("job_detail_action", args=[job.name, "requeue"]), {"requeue": "Requeue"}, follow=True)
38-
self.assertIn(job, JobModel.get_many(queue.queued_job_registry.all(), queue.connection))
39-
queue.delete_job(job.name)
40-
4128
def test_single_job_action_delete_job(self):
4229
queue = get_queue("default")
4330
job = queue.create_and_enqueue_job(test_job, job_info_ttl=0)
@@ -80,8 +67,6 @@ def test_single_job_action_enqueue_job(self):
8067
job = queue.create_and_enqueue_job(test_job)
8168
job_list.append(job)
8269

83-
# This job is deferred
84-
8570
self.assertEqual(job_list[-1].get_status(connection=queue.connection), JobStatus.QUEUED)
8671
self.assertIsNotNone(job_list[-1].enqueued_at)
8772

scheduler/tests/testtools.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@
1111
from scheduler import settings
1212
from scheduler.admin.task_admin import job_execution_of
1313
from scheduler.helpers.queues import get_queue
14-
from scheduler.settings import QueueNotFoundError
15-
from scheduler.worker import create_worker
16-
from scheduler.models import TaskKwarg
1714
from scheduler.models import Task, TaskType
15+
from scheduler.models import TaskKwarg
1816
from scheduler.redis_models import JobModel
1917
from scheduler.worker import Worker
18+
from scheduler.worker import create_worker
2019

2120
multiprocessing.set_start_method("fork")
2221

@@ -48,7 +47,7 @@ def sequence_gen():
4847

4948

5049
def task_factory(
51-
task_type: TaskType, callable_name: str = "scheduler.tests.jobs.test_job", instance_only=False, **kwargs
50+
task_type: TaskType, callable_name: str = "scheduler.tests.jobs.test_job", instance_only=False, **kwargs
5251
):
5352
values = dict(
5453
name="Scheduled Job %d" % next(seq),
@@ -135,8 +134,3 @@ def setUp(self) -> None:
135134

136135
def tearDown(self) -> None:
137136
super(SchedulerBaseCase, self).tearDown()
138-
try:
139-
queue = get_queue("default")
140-
queue.empty()
141-
except QueueNotFoundError:
142-
pass

0 commit comments

Comments
 (0)