Skip to content

Commit 78b5c2a

Browse files
committed
Remove RQ dependency
1 parent 5f5f917 commit 78b5c2a

File tree

18 files changed

+103
-55
lines changed

18 files changed

+103
-55
lines changed

scheduler/helpers/callback.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,19 @@ def _import_attribute(name: str) -> Callable[..., Any]:
4747
name_bits = name.split(".")
4848
module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]]
4949
module = None
50-
while len(module_name_bits):
50+
while len(module_name_bits) > 0:
5151
try:
5252
module_name = ".".join(module_name_bits)
5353
module = importlib.import_module(module_name)
5454
break
5555
except ImportError:
5656
attribute_bits.insert(0, module_name_bits.pop())
5757

58-
if module is None:
59-
# maybe it's a builtin
58+
if module is None: # maybe it's a builtin
6059
try:
6160
return __builtins__[name]
6261
except KeyError:
63-
raise ValueError(f"Invalid attribute name: {name}")
62+
raise CallbackSetupError(f"Invalid attribute name: {name}")
6463

6564
attribute_name = ".".join(attribute_bits)
6665
if hasattr(module, attribute_name):

scheduler/helpers/queues/queue_logic.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import sys
33
import traceback
4-
from datetime import datetime, timedelta
4+
from datetime import datetime
55
from functools import total_ordering
66
from typing import Dict, List, Optional, Tuple, Union, Self, Any
77

@@ -317,7 +317,6 @@ def enqueue_job(
317317
job_model.status = JobStatus.QUEUED
318318
job_model.enqueued_at = utcnow()
319319
job_model.save(connection=pipe)
320-
job_model.expire(ttl=job_model.job_info_ttl, connection=pipe)
321320

322321
if self._is_async:
323322
if at_front:
@@ -326,10 +325,13 @@ def enqueue_job(
326325
score = self.queued_job_registry.get_last_timestamp() or current_timestamp()
327326
self.scheduled_job_registry.delete(connection=pipe, job_name=job_model.name)
328327
self.queued_job_registry.add(connection=pipe, score=score, job_name=job_model.name)
329-
result = pipe.execute()
330-
logger.debug(f"Pushed job {job_model.name} into {self.name}, {result[3]} job(s) are in queue.")
328+
pipe.execute()
329+
logger.debug(f"Pushed job {job_model.name} into {self.name} queued-jobs registry")
331330
else: # sync mode
331+
pipe.execute()
332332
job_model = self.run_sync(job_model)
333+
job_model.expire(ttl=job_model.job_info_ttl, connection=pipe)
334+
pipe.execute()
333335

334336
return job_model
335337

@@ -446,17 +448,18 @@ def cancel_job(self, job_name: str) -> None:
446448
# caller to handle it
447449
raise
448450

449-
def delete_job(self, job_name: str):
451+
def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
450452
"""Deletes the given job from the queue and all its registries"""
451-
452453
pipe = self.connection.pipeline()
453454

454455
while True:
455456
try:
456457
self._remove_from_registries(job_name, connection=pipe)
457458
self.failed_job_registry.delete(connection=pipe, job_name=job_name)
458-
if JobModel.exists(job_name, connection=self.connection):
459-
JobModel.delete_many([job_name], connection=pipe)
459+
if expire_job_model:
460+
job_model = JobModel.get(job_name, connection=self.connection)
461+
if job_model is not None:
462+
job_model.expire(ttl=job_model.job_info_ttl, connection=pipe)
460463
pipe.execute()
461464
break
462465
except WatchError:

scheduler/models/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,9 +351,9 @@ def _schedule(self) -> bool:
351351
if not self.enabled:
352352
logger.debug(f"Task {str(self)} disabled, enable task before scheduling")
353353
return False
354-
if self.task_type in {TaskType.REPEATABLE, TaskType.ONCE} and self._schedule_time() < timezone.now():
355-
return False
356354
schedule_time = self._schedule_time()
355+
if self.task_type in {TaskType.REPEATABLE, TaskType.ONCE} and schedule_time < timezone.now():
356+
return False
357357
kwargs = self._enqueue_args()
358358
job = self.rqueue.create_and_enqueue_job(
359359
tools.run_task,

scheduler/redis_models/base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def deserialize(cls, data: Dict[str, Any]) -> Self:
102102
types = {f.name: f.type for f in dataclasses.fields(cls)}
103103
for k in data:
104104
if k not in types:
105-
logger.warning(f"Unknown field {k} in WorkerModel")
105+
logger.warning(f"Unknown field {k} in {cls.__name__}")
106106
continue
107107
data[k] = _deserialize(data[k], types[k])
108108
return cls(**data)
@@ -184,6 +184,7 @@ def save(self, connection: ConnectionType) -> None:
184184
connection.hdel(self._key, *none_values)
185185
mapping = {k: v for k, v in mapping.items() if v is not None}
186186
if mapping:
187+
logger.debug(f"Saving {self._key}")
187188
connection.hset(self._key, mapping=mapping)
188189
self._dirty_fields = set()
189190
self._save_all = False

scheduler/redis_models/job.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,7 @@ def create(
196196
timeout = _parse_timeout(timeout) or SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT
197197
if timeout == 0:
198198
raise ValueError("0 timeout is not allowed. Use -1 for infinite timeout")
199-
job_info_ttl = _parse_timeout(job_info_ttl or SCHEDULER_CONFIG.DEFAULT_JOB_TTL)
200-
if job_info_ttl is not None and job_info_ttl <= 0:
201-
raise ValueError("Job info ttl must be greater than 0")
199+
job_info_ttl = _parse_timeout(job_info_ttl if job_info_ttl is not None else SCHEDULER_CONFIG.DEFAULT_JOB_TTL)
202200
result_ttl = _parse_timeout(result_ttl)
203201
if not isinstance(args, (tuple, list)):
204202
raise TypeError(f"{args!r} is not a valid args list")
@@ -228,7 +226,7 @@ def create(
228226
else:
229227
raise TypeError(f"Expected a callable or a string, but got: {func}")
230228
description = description or _get_call_string(func, args or [], kwargs or {}, max_length=75)
231-
229+
job_info_ttl = job_info_ttl if job_info_ttl is not None else SCHEDULER_CONFIG.DEFAULT_JOB_TTL
232230
model = JobModel(
233231
created_at=utils.utcnow(),
234232
name=name,
@@ -247,7 +245,7 @@ def create(
247245
stopped_callback_name=on_stopped.name if on_stopped else None,
248246
stopped_callback_timeout=on_stopped.timeout if on_stopped else None,
249247
success_ttl=result_ttl,
250-
job_info_ttl=job_info_ttl or SCHEDULER_CONFIG.DEFAULT_JOB_TTL,
248+
job_info_ttl=job_info_ttl,
251249
timeout=timeout,
252250
status=status,
253251
last_heartbeat=None,

scheduler/redis_models/registry/queue_registries.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ def get_jobs_to_schedule(self, timestamp: int, chunk_size: int = 1000) -> List[s
8181
:param chunk_size: Max results to return.
8282
:returns: A list of job names
8383
"""
84-
score = timestamp
85-
jobs_to_schedule = self.connection.zrangebyscore(self._key, 0, score, start=0, num=chunk_size)
84+
jobs_to_schedule = self.connection.zrangebyscore(self._key, 0, max=timestamp, start=0, num=chunk_size)
8685
return [as_str(job_name) for job_name in jobs_to_schedule]
8786

8887
def get_scheduled_time(self, job_name: str) -> datetime:

scheduler/settings_types.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ class QueueConfiguration:
6666
MASTER_NAME: Optional[str] = None
6767
CONNECTION_KWARGS: Optional[Dict[str, Any]] = None
6868

69+
def __post_init__(self):
70+
if not any((self.URL, self.UNIX_SOCKET_PATH, self.HOST, self.SENTINELS)):
71+
raise ValueError(f"At least one of URL, UNIX_SOCKET_PATH, HOST must be provided: {self}")
72+
if sum((self.URL is not None, self.UNIX_SOCKET_PATH is not None, self.HOST is not None)) > 1:
73+
raise ValueError(f"Only one of URL, UNIX_SOCKET_PATH, HOST should be provided: {self}")
74+
if self.HOST is not None and (self.PORT is None or self.DB is None):
75+
raise ValueError(f"HOST requires PORT and DB: {self}")
76+
6977
def same_connection_params(self, other: Self) -> bool:
7078
for field in self.__CONNECTION_FIELDS__:
7179
if getattr(self, field) != getattr(other, field):

scheduler/tests/jobs.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ def failing_job():
3535

3636

3737
def test_job():
38-
# sleep(3)
3938
return 1 + 1
4039

4140

scheduler/tests/test_internals.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ def test_callback_bad_arguments(self):
2424
with self.assertRaises(CallbackSetupError) as cm:
2525
Callback("scheduler.tests.jobs.non_existing_method")
2626
self.assertEqual(str(cm.exception), "Invalid attribute name: non_existing_method")
27+
with self.assertRaises(CallbackSetupError) as cm:
28+
Callback("scheduler.tests.non_existing_module.non_existing_method")
29+
self.assertEqual(str(cm.exception), "Invalid attribute name: non_existing_method")
30+
with self.assertRaises(CallbackSetupError) as cm:
31+
Callback("non_existing_method")
32+
self.assertEqual(str(cm.exception), "Invalid attribute name: non_existing_method")
2733
with self.assertRaises(CallbackSetupError) as cm:
2834
Callback(1)
2935
self.assertEqual(str(cm.exception), "Callback `func` must be a string or function, received 1")

scheduler/tests/test_views/test_queue_actions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def test_job_list_action_delete_jobs__with_bad_next_url(self):
1515
# enqueue some jobs
1616
job_names = []
1717
for _ in range(0, 3):
18-
job = queue.create_and_enqueue_job(test_job)
18+
job = queue.create_and_enqueue_job(test_job, job_info_ttl=0)
1919
job_names.append(job.name)
2020

2121
# remove those jobs using view
@@ -46,7 +46,7 @@ def test_job_list_action_delete_jobs(self):
4646
# enqueue some jobs
4747
job_names = []
4848
for _ in range(0, 3):
49-
job = queue.create_and_enqueue_job(test_job)
49+
job = queue.create_and_enqueue_job(test_job, job_info_ttl=0)
5050
job_names.append(job.name)
5151

5252
# remove those jobs using view

0 commit comments

Comments
 (0)