Skip to content

Commit f0fdedd

Browse files
committed
feat:add job-class kwarg to rqworker
1 parent 369de22 commit f0fdedd

File tree

4 files changed

+30
-3
lines changed

4 files changed

+30
-3
lines changed

scheduler/management/commands/rqworker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def add_arguments(self, parser):
4747
help='Maximum number of jobs to execute before terminating worker')
4848
parser.add_argument('--fork-job-execution', action='store', default=True, dest='fork_job_execution', type=bool,
4949
help='Fork job execution to another process')
50+
parser.add_argument('--job-class', action='store', dest='job_class',
51+
help='Jobs class to use')
5052
parser.add_argument(
5153
'queues', nargs='*', type=str,
5254
help='The queues to work on, separated by space, all queues should be using the same redis')
@@ -71,6 +73,7 @@ def handle(self, **options):
7173
w = create_worker(
7274
*queues,
7375
name=options['name'],
76+
job_class=options.get('job_class'),
7477
default_worker_ttl=options['worker_ttl'],
7578
fork_job_execution=options['fork_job_execution'], )
7679

scheduler/rq_classes.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,19 @@ def stop_execution(self, connection: Redis):
7070

7171
class DjangoWorker(Worker):
7272
def __init__(self, *args, **kwargs):
73-
self.fork_job_execution = kwargs.pop('fork_job_execution', True)
74-
kwargs['job_class'] = JobExecution
75-
kwargs['queue_class'] = DjangoQueue
73+
self.fork_job_execution = kwargs.pop("fork_job_execution", True)
74+
job_class = kwargs.get("job_class", JobExecution)
75+
if not isinstance(job_class, type) or not issubclass(job_class, JobExecution):
76+
if isinstance(job_class, type):
77+
job_class = type("JobExecutionWrapper", (JobExecution, job_class), {})
78+
else:
79+
job_class = JobExecution
80+
81+
# Update kwargs with the potentially modified job_class
82+
kwargs["job_class"] = job_class
83+
84+
kwargs["job_class"] = job_class
85+
kwargs["queue_class"] = DjangoQueue
7686
super(DjangoWorker, self).__init__(*args, **kwargs)
7787

7888
def __eq__(self, other):

scheduler/tests/test_worker.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import os
22
import uuid
33

4+
from rq.job import Job
5+
from scheduler.rq_classes import JobExecution
46
from scheduler.tests.testtools import SchedulerBaseCase
57
from scheduler.tools import create_worker
68
from . import test_settings # noqa
@@ -38,3 +40,9 @@ def test_create_worker__scheduler_interval(self):
3840
worker.work(burst=True)
3941
self.assertEqual(worker.scheduler.interval, 1)
4042
settings.SCHEDULER_CONFIG['SCHEDULER_INTERVAL'] = prev
43+
44+
def test_get_worker_custom_classes(self):
45+
# Test with string representation of job_class
46+
worker = create_worker('default', job_class='rq.job.Job')
47+
self.assertTrue(issubclass(worker.job_class, Job))
48+
self.assertTrue(issubclass(worker.job_class, JobExecution))

scheduler/tools.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import croniter
55
from django.apps import apps
66
from django.utils import timezone
7+
from django.utils.module_loading import import_string
78

89
from scheduler.queues import get_queues, logger, get_queue
910
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES
@@ -71,6 +72,11 @@ def create_worker(*queue_names, **kwargs):
7172
kwargs['name'] = _calc_worker_name(existing_worker_names)
7273

7374
kwargs['name'] = kwargs['name'].replace('/', '.')
75+
76+
# Handle job_class if provided
77+
if 'job_class' in kwargs and isinstance(kwargs['job_class'], str):
78+
kwargs['job_class'] = import_string(kwargs['job_class'])
79+
7480
worker = DjangoWorker(queues, connection=queues[0].connection, **kwargs)
7581
return worker
7682

0 commit comments

Comments
 (0)