Skip to content

Commit 8d731c0

Browse files
committed
Check that arguments are compatible with a task before scheduling
Tries to verify that the arguments passed while scheduling a job are compatible with the task function to prevent a user from scheduling a job that won't be able to be processed by workers. Fixes #4
1 parent 75bdd34 commit 8d731c0

File tree

4 files changed

+55
-3
lines changed

4 files changed

+55
-3
lines changed

spinach/engine.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
8585
task = self._tasks.get(task)
8686
job = Job(task.name, task.queue, at, task.max_retries, task_args=args,
8787
task_kwargs=kwargs)
88+
job.task_func = task.func
89+
job.check_signature()
8890
return self._broker.enqueue_jobs([job])
8991

9092
def schedule_batch(self, batch: Batch):
@@ -98,10 +100,14 @@ def schedule_batch(self, batch: Batch):
98100
jobs = list()
99101
for task, at, args, kwargs in batch.jobs_to_create:
100102
task = self._tasks.get(task)
101-
jobs.append(
102-
Job(task.name, task.queue, at, task.max_retries,
103-
task_args=args, task_kwargs=kwargs)
103+
job = Job(
104+
task.name, task.queue, at, task.max_retries,
105+
task_args=args, task_kwargs=kwargs
104106
)
107+
job.task_func = task.func
108+
job.check_signature()
109+
jobs.append(job)
110+
105111
return self._broker.enqueue_jobs(jobs)
106112

107113
def _arbiter_func(self, stop_when_queue_empty=False):

spinach/exc.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@ class SpinachError(Exception):
44

55
class UnknownTask(SpinachError):
66
"""Task name is not registered with the Engine."""
7+
8+
9+
class InvalidJobSignatureError(SpinachError):
10+
"""Job does not have proper arguments to execute the task function."""

spinach/job.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from datetime import datetime, timezone
22
import enum
3+
import inspect
34
import json
45
from logging import getLogger
56
import math
67
from typing import Optional
78
import uuid
89

910
from . import signals
11+
from .exc import InvalidJobSignatureError
1012
from .task import RetryException, AbortException
1113
from .utils import human_duration, exponential_backoff
1214

@@ -132,6 +134,30 @@ def deserialize(cls, job_json_string: str):
132134
job.retries = job_dict['retries']
133135
return job
134136

137+
def check_signature(self):
138+
"""Check if a job has the correct params to be executed.
139+
140+
This can be used to prevent the scheduling of a job that will fail
141+
during execution because its arguments do not match the task function.
142+
143+
:raises InvalidJobSignatureError: Job arguments do not match the task
144+
"""
145+
if self.task_func is None:
146+
raise ValueError(
147+
'Cannot verify signature until a task function is assigned'
148+
)
149+
150+
try:
151+
sig = inspect.signature(self.task_func)
152+
sig.bind(*self.task_args, **self.task_kwargs)
153+
except TypeError as e:
154+
msg = 'Arguments of job not compatible with task {}: {}'.format(
155+
self.task_name, e
156+
)
157+
raise InvalidJobSignatureError(msg)
158+
except ValueError:
159+
logger.info('Cannot verify job signature, assuming it is correct')
160+
135161
def __repr__(self):
136162
return 'Job <{} {} {}>'.format(
137163
self.task_name, self.status.name, self.id

tests/test_job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from spinach import RetryException, AbortException
77
from spinach.job import Job, JobStatus, advance_job_status
8+
from spinach.exc import InvalidJobSignatureError
89

910
from .conftest import get_now, set_now
1011

@@ -122,3 +123,18 @@ def test_advance_job_status(job):
122123
advance_job_status('namespace', job, 1.0, AbortException('kaboom'))
123124
assert job.max_retries == 0
124125
assert job.status is JobStatus.FAILED
126+
127+
128+
def test_check_signature(job):
129+
def compatible_func(a, b, foo=None):
130+
pass
131+
132+
def incompatible_func(a, bar=None):
133+
pass
134+
135+
job.task_func = compatible_func
136+
assert job.check_signature() is None
137+
138+
job.task_func = incompatible_func
139+
with pytest.raises(InvalidJobSignatureError):
140+
job.check_signature()

0 commit comments

Comments
 (0)