Skip to content

Commit 0e1b230

Browse files
committed
fix:defaulting to JobExecution
1 parent 91c6646 commit 0e1b230

File tree

5 files changed

+42
-9
lines changed

5 files changed

+42
-9
lines changed

scheduler/management/commands/rqworker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import click
66
from django.core.management.base import BaseCommand
77
from django.db import connections
8+
from django.template.defaultfilters import default
89
from redis.exceptions import ConnectionError
910
from rq.logutils import setup_loghandlers
1011

scheduler/rq_classes.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,11 @@ def stop_execution(self, connection: Redis):
7171
class DjangoWorker(Worker):
7272
def __init__(self, *args, **kwargs):
7373
self.fork_job_execution = kwargs.pop("fork_job_execution", True)
74-
job_class = kwargs.get("job_class", JobExecution)
74+
job_class = kwargs.get("job_class") or JobExecution
7575
if not isinstance(job_class, type) or not issubclass(job_class, JobExecution):
76-
if isinstance(job_class, type):
77-
job_class = type("JobExecutionWrapper", (JobExecution, job_class), {})
78-
else:
79-
job_class = JobExecution
76+
raise ValueError("job_class must be a subclass of JobExecution")
8077

8178
# Update kwargs with the potentially modified job_class
82-
kwargs["job_class"] = job_class
83-
8479
kwargs["job_class"] = job_class
8580
kwargs["queue_class"] = DjangoQueue
8681
super(DjangoWorker, self).__init__(*args, **kwargs)

scheduler/tests/test_mgmt_cmds.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,39 @@ def test_rqworker__no_queues_params(self):
3636
for job in jobs:
3737
self.assertTrue(job.is_failed)
3838

39+
def test_rqworker__job_class_param__green(self):
40+
queue = get_queue('default')
41+
42+
# enqueue some jobs that will fail
43+
jobs = []
44+
job_ids = []
45+
for _ in range(0, 3):
46+
job = queue.enqueue(failing_job)
47+
jobs.append(job)
48+
job_ids.append(job.id)
49+
50+
# Create a worker to execute these jobs
51+
call_command('rqworker', '--job-class', 'scheduler.rq_classes.JobExecution', fork_job_execution=False, burst=True)
52+
53+
# check if all jobs are really failed
54+
for job in jobs:
55+
self.assertTrue(job.is_failed)
56+
57+
def test_rqworker__bad_job_class__fail(self):
58+
queue = get_queue('default')
59+
60+
# enqueue some jobs that will fail
61+
jobs = []
62+
job_ids = []
63+
for _ in range(0, 3):
64+
job = queue.enqueue(failing_job)
65+
jobs.append(job)
66+
job_ids.append(job.id)
67+
68+
# Create a worker to execute these jobs
69+
with self.assertRaises(ImportError):
70+
call_command('rqworker', '--job-class', 'rq.badclass', fork_job_execution=False, burst=True)
71+
3972
def test_rqworker__run_jobs(self):
4073
queue = get_queue('default')
4174

scheduler/tests/test_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def test_create_worker__scheduler_interval(self):
4343

4444
def test_get_worker_with_custom_job_class(self):
4545
# Test with string representation of job_class
46-
worker = create_worker('default', job_class='rq.job.Job')
46+
worker = create_worker('default', job_class='scheduler.rq_classes.JobExecution')
4747
self.assertTrue(issubclass(worker.job_class, Job))
4848
self.assertTrue(issubclass(worker.job_class, JobExecution))
4949

scheduler/tools.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,12 @@ def create_worker(*queue_names, **kwargs):
7474
kwargs['name'] = kwargs['name'].replace('/', '.')
7575

7676
# Handle job_class if provided
77-
if 'job_class' in kwargs and isinstance(kwargs['job_class'], str):
77+
if 'job_class' not in kwargs or kwargs["job_class"] is None:
78+
kwargs['job_class'] = 'scheduler.rq_classes.JobExecution'
79+
try:
7880
kwargs['job_class'] = import_string(kwargs['job_class'])
81+
except ImportError:
82+
raise ImportError(f"Could not import job class {kwargs['job_class']}")
7983

8084
worker = DjangoWorker(queues, connection=queues[0].connection, **kwargs)
8185
return worker

0 commit comments

Comments
 (0)