Skip to content

Commit ebbce68

Browse files
committed
wip
1 parent cb9b344 commit ebbce68

File tree

11 files changed

+43
-46
lines changed

11 files changed

+43
-46
lines changed

scheduler/helpers/callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[int]
3030
def name(self) -> str:
3131
return f"{self.func.__module__}.{self.func.__qualname__}"
3232

33-
def __call__(self, *args, **kwargs):
33+
def __call__(self, *args: Any, **kwargs: Any) -> Any:
3434
from scheduler.settings import SCHEDULER_CONFIG
3535

3636
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(self.timeout, JobTimeoutException):

scheduler/helpers/queues/getters.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
from typing import Set
22

33
from scheduler.redis_models.worker import WorkerModel
4-
from scheduler.settings import SCHEDULER_CONFIG, get_queue_names, get_queue_configuration, QueueConfiguration, logger
5-
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker
4+
from scheduler.settings import SCHEDULER_CONFIG, get_queue_names, get_queue_configuration, logger
5+
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker, ConnectionType, QueueConfiguration
66
from .queue_logic import Queue
77

8-
98
_BAD_QUEUE_CONFIGURATION = set()
109

1110

12-
def _get_connection(config: QueueConfiguration, use_strict_broker=False):
11+
def _get_connection(config: QueueConfiguration, use_strict_broker: bool = False) -> ConnectionType:
1312
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
1413
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
1514
import fakeredis
@@ -32,7 +31,7 @@ def _get_connection(config: QueueConfiguration, use_strict_broker=False):
3231
sentinel_kwargs = config.SENTINEL_KWARGS or {}
3332
SentinelClass = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)].sentinel_type
3433
sentinel = SentinelClass(config.SENTINELS, sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
35-
return sentinel.master_for(
34+
return sentinel.master_for( # type: ignore
3635
service_name=config.MASTER_NAME,
3736
redis_class=broker_cls,
3837
)
@@ -47,7 +46,7 @@ def _get_connection(config: QueueConfiguration, use_strict_broker=False):
4746
)
4847

4948

50-
def get_queue(name="default") -> Queue:
49+
def get_queue(name: str = "default") -> Queue:
5150
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
5251
queue_settings = get_queue_configuration(name)
5352
is_async = queue_settings.ASYNC

scheduler/helpers/queues/queue_logic.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
)
2020
from scheduler.redis_models import JobStatus, SchedulerLock, Result, ResultType, JobModel
2121
from scheduler.settings import logger, SCHEDULER_CONFIG
22-
from scheduler.types import ConnectionType, FunctionReferenceType, Self
22+
from scheduler.types import ConnectionType, FunctionReferenceType, Self, PipelineType
2323

2424

2525
class InvalidJobOperation(Exception):
@@ -49,17 +49,17 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
4949
coro_result = loop.run_until_complete(result)
5050
result = coro_result
5151
if job_model.success_callback:
52-
job_model.success_callback(job_model, connection, result) # type: ignore
52+
job_model.success_callback(job_model, connection, result)
5353
return result
5454
except:
5555
if job_model.failure_callback:
56-
job_model.failure_callback(job_model, connection, *sys.exc_info()) # type: ignore
56+
job_model.failure_callback(job_model, connection, *sys.exc_info())
5757
raise
5858
finally:
5959
assert job_model is _job_stack.pop()
6060

6161

62-
_job_stack = []
62+
_job_stack: List[JobModel] = []
6363

6464

6565
class Queue:
@@ -89,11 +89,11 @@ def __init__(self, connection: ConnectionType, name: str, is_async: bool = True)
8989
self.scheduled_job_registry = ScheduledJobRegistry(connection=self.connection, name=self.name)
9090
self.canceled_job_registry = CanceledJobRegistry(connection=self.connection, name=self.name)
9191

92-
def __len__(self):
92+
def __len__(self) -> int:
9393
return self.count
9494

9595
@property
96-
def scheduler_pid(self) -> int:
96+
def scheduler_pid(self) -> Optional[int]:
9797
lock = SchedulerLock(self.name)
9898
pid = lock.value(self.connection)
9999
return int(pid.decode()) if pid is not None else None
@@ -162,7 +162,7 @@ def count(self) -> int:
162162
def get_registry(self, name: str) -> JobNamesRegistry:
163163
name = name.lower()
164164
if name in Queue.REGISTRIES:
165-
return getattr(self, Queue.REGISTRIES[name])
165+
return getattr(self, Queue.REGISTRIES[name]) # type: ignore
166166
raise NoSuchRegistryError(f"Unknown registry name {name}")
167167

168168
def get_all_job_names(self) -> List[str]:
@@ -182,22 +182,21 @@ def get_all_jobs(self) -> List[JobModel]:
182182
def create_and_enqueue_job(
183183
self,
184184
func: FunctionReferenceType,
185-
args: Union[Tuple, List, None] = None,
186-
kwargs: Optional[Dict] = None,
185+
args: Union[Tuple[Any, ...], List[Any], None] = None,
186+
kwargs: Optional[Dict[str, Any]] = None,
187187
when: Optional[datetime] = None,
188188
timeout: Optional[int] = None,
189189
result_ttl: Optional[int] = None,
190190
job_info_ttl: Optional[int] = None,
191191
description: Optional[str] = None,
192192
name: Optional[str] = None,
193193
at_front: bool = False,
194-
meta: Optional[Dict] = None,
194+
meta: Optional[Dict[str, Any]] = None,
195195
on_success: Optional[Callback] = None,
196196
on_failure: Optional[Callback] = None,
197197
on_stopped: Optional[Callback] = None,
198198
task_type: Optional[str] = None,
199199
scheduled_task_id: Optional[int] = None,
200-
pipeline: Optional[ConnectionType] = None,
201200
) -> JobModel:
202201
"""Creates a job to represent the delayed function call and enqueues it.
203202
:param when: When to schedule the job (None to enqueue immediately)
@@ -216,7 +215,6 @@ def create_and_enqueue_job(
216215
:param on_stopped: Callback for on stopped
217216
:param task_type: The task type
218217
:param scheduled_task_id: The scheduled task id
219-
:param pipeline: The Broker Pipeline
220218
:returns: The enqueued Job
221219
"""
222220
status = JobStatus.QUEUED if when is None else JobStatus.SCHEDULED
@@ -240,7 +238,7 @@ def create_and_enqueue_job(
240238
scheduled_task_id=scheduled_task_id,
241239
)
242240
if when is None:
243-
job_model = self.enqueue_job(job_model, connection=pipeline, at_front=at_front)
241+
job_model = self.enqueue_job(job_model, at_front=at_front)
244242
elif isinstance(when, datetime):
245243
job_model.save(connection=self.connection)
246244
self.scheduled_job_registry.schedule(self.connection, job_model.name, when)
@@ -250,7 +248,7 @@ def create_and_enqueue_job(
250248

251249
def job_handle_success(
252250
self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int, connection: ConnectionType
253-
):
251+
) -> None:
254252
"""Saves and cleanup job after successful execution"""
255253
job.after_execution(
256254
job_info_ttl,
@@ -268,7 +266,7 @@ def job_handle_success(
268266
ttl=result_ttl,
269267
)
270268

271-
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str, connection: ConnectionType):
269+
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str, connection: ConnectionType) -> None:
272270
# Does not set job status since the job might be stopped
273271
job.after_execution(
274272
SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL,
@@ -308,10 +306,7 @@ def run_sync(self, job: JobModel) -> JobModel:
308306

309307
@classmethod
310308
def dequeue_any(
311-
cls,
312-
queues: List[Self],
313-
timeout: Optional[int],
314-
connection: Optional[ConnectionType] = None,
309+
cls, queues: List[Self], timeout: Optional[int], connection: ConnectionType
315310
) -> Tuple[Optional[JobModel], Optional[Self]]:
316311
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
317312
is important.
@@ -414,19 +409,19 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
414409
pass
415410

416411
def enqueue_job(
417-
self, job_model: JobModel, connection: Optional[ConnectionType] = None, at_front: bool = False
412+
self, job_model: JobModel, pipeline: Optional[PipelineType] = None, at_front: bool = False
418413
) -> JobModel:
419414
"""Enqueues a job for delayed execution without checking dependencies.
420415
421416
If Queue is instantiated with is_async=False, job is executed immediately.
422417
:param job_model: The job redis model
423-
:param connection: The Redis Pipeline
418+
:param pipeline: The Broker Pipeline
424419
:param at_front: Whether to enqueue the job at the front
425420
426421
:returns: The enqueued JobModel
427422
"""
428423

429-
pipe = connection if connection is not None else self.connection.pipeline()
424+
pipe: PipelineType = pipeline if pipeline is not None else self.connection.pipeline()
430425
job_model.started_at = None
431426
job_model.ended_at = None
432427
job_model.status = JobStatus.QUEUED

scheduler/helpers/utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
import importlib
33
import time
4-
from typing import Callable
4+
from typing import Callable, Any
55

66

77
def current_timestamp() -> int:
@@ -14,10 +14,10 @@ def utcnow() -> datetime.datetime:
1414
return datetime.datetime.now(datetime.timezone.utc)
1515

1616

17-
def callable_func(callable_str: str) -> Callable:
17+
def callable_func(callable_str: str) -> Callable[[Any], Any]:
1818
path = callable_str.split(".")
1919
module = importlib.import_module(".".join(path[:-1]))
20-
func = getattr(module, path[-1])
21-
if callable(func) is False:
20+
func: Callable[[Any], Any] = getattr(module, path[-1])
21+
if not callable(func):
2222
raise TypeError(f"'{callable_str}' is not callable")
2323
return func

scheduler/models/args.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from scheduler.helpers import utils
1111

12-
ARG_TYPE_TYPES_DICT:Dict[str,Type] = {
12+
ARG_TYPE_TYPES_DICT: Dict[str, Type] = {
1313
"str": str,
1414
"int": int,
1515
"bool": bool,

scheduler/models/task.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def _get_task_for_job(job: JobModel) -> Optional["Task"]:
3333
return task
3434

3535

36-
def failure_callback(job: JobModel, connection:ConnectionType, result: Any, *args: Any, **kwargs: Any) -> None:
36+
def failure_callback(job: JobModel, connection: ConnectionType, result: Any, *args: Any, **kwargs: Any) -> None:
3737
task = _get_task_for_job(job)
3838
if task is None:
3939
logger.warn(f"Could not find task for job {job.name}")
@@ -176,7 +176,7 @@ class TimeUnits(models.TextChoices):
176176
),
177177
)
178178

179-
def callable_func(self)->Callable:
179+
def callable_func(self) -> Callable:
180180
"""Translate callable string to callable"""
181181
return utils.callable_func(self.callable)
182182

@@ -187,9 +187,9 @@ def is_scheduled(self) -> bool:
187187
return False
188188
# check whether job_id is in scheduled/queued/active jobs
189189
res = (
190-
(self.job_name in self.rqueue.scheduled_job_registry.all())
191-
or (self.job_name in self.rqueue.queued_job_registry.all())
192-
or (self.job_name in self.rqueue.active_job_registry.all())
190+
(self.job_name in self.rqueue.scheduled_job_registry.all())
191+
or (self.job_name in self.rqueue.queued_job_registry.all())
192+
or (self.job_name in self.rqueue.active_job_registry.all())
193193
)
194194
# If the job_id is not scheduled/queued/started,
195195
# update the job_id to None. (The job_id belongs to a previous run which is completed)

scheduler/redis_models/registry/base_registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ def get_first(self) -> Optional[str]:
7474
first_job = self.connection.zrange(self._key, 0, 0)
7575
return first_job[0].decode() if first_job else None
7676

77-
def get_last_timestamp(self) -> Optional[float]:
77+
def get_last_timestamp(self) -> Optional[int]:
7878
"""Returns the last timestamp in the registry."""
7979
self.cleanup(self.connection)
8080
last_timestamp = self.connection.zrange(self._key, -1, -1, withscores=True)
81-
return last_timestamp[0][1] if last_timestamp else None
81+
return int(last_timestamp[0][1]) if last_timestamp else None
8282

8383
@property
8484
def key(self) -> str:

scheduler/redis_models/result.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from scheduler.helpers.utils import utcnow
77
from scheduler.redis_models.base import StreamModel, decode_dict
8+
from scheduler.settings import logger
89
from scheduler.types import ConnectionType, Self
910

1011

@@ -34,19 +35,21 @@ def create(
3435
cls,
3536
connection: ConnectionType,
3637
job_name: str,
37-
worker_name: str,
38+
worker_name: Optional[str],
3839
_type: ResultType,
3940
ttl: int,
4041
return_value: Any = None,
4142
exc_string: Optional[str] = None,
4243
) -> Self:
44+
if worker_name is None:
45+
logger.warning(f"Job {job_name} has no worker name, will save result with 'unknown_worker'")
4346
result = cls(
4447
parent=job_name,
4548
ttl=ttl,
4649
type=_type,
4750
return_value=return_value,
4851
exc_string=exc_string,
49-
worker_name=worker_name,
52+
worker_name=worker_name or "unknown_worker",
5053
)
5154
result.save(connection)
5255
return result

scheduler/types/settings_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class QueueConfiguration:
6565
USERNAME: Optional[str] = None
6666
PASSWORD: Optional[str] = None
6767

68-
ASYNC: Optional[bool] = True
68+
ASYNC: bool = True
6969

7070
SENTINELS: Optional[List[Tuple[str, int]]] = None
7171
SENTINEL_KWARGS: Optional[Dict[str, str]] = None

scheduler/views/helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def _enqueue_multiple_jobs(queue: Queue, job_names: List[str], at_front: bool =
6868
if job is None:
6969
continue
7070
job.save(connection=pipe)
71-
queue.enqueue_job(job, connection=pipe, at_front=at_front)
71+
queue.enqueue_job(job, pipeline=pipe, at_front=at_front)
7272
jobs_requeued += 1
7373
pipe.execute()
7474
return jobs_requeued

0 commit comments

Comments
 (0)