Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def add_arguments(self, parser):
help="Fork job execution to another process",
)
parser.add_argument("--job-class", action="store", dest="job_class", help="Jobs class to use")
parser.add_argument('--worker-class', action='store', dest='worker_class', help='RQ Worker class to use')
parser.add_argument(
"queues",
nargs="*",
Expand Down Expand Up @@ -102,6 +103,7 @@ def handle(self, **options):
*queues,
name=options["name"],
job_class=options.get("job_class"),
worker_class=options.get("worker_class"),
default_worker_ttl=options["worker_ttl"],
fork_job_execution=options["fork_job_execution"],
)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
"""
from .settings import QUEUES

kwargs["job_class"] = JobExecution
kwargs["job_class"] = kwargs.get("job_class") or JobExecution
queue_params = QUEUES[queue_names[0]]
queues = [get_queue(queue_names[0], **kwargs)]
# perform consistency checks while building return list
Expand Down
14 changes: 8 additions & 6 deletions scheduler/rq_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(self, *args, **kwargs):
# Update kwargs with the potentially modified job_class
kwargs["job_class"] = job_class
kwargs["queue_class"] = DjangoQueue
kwargs.pop("worker_class", None)
super(DjangoWorker, self).__init__(*args, **kwargs)

def __eq__(self, other):
Expand Down Expand Up @@ -184,7 +185,8 @@ class DjangoQueue(Queue):
"""

def __init__(self, *args, **kwargs):
kwargs["job_class"] = JobExecution
kwargs["job_class"] = kwargs.get("job_class") or JobExecution
self.job_class = kwargs["job_class"]
super(DjangoQueue, self).__init__(*args, **kwargs)

def get_registry(self, name: str) -> Union[None, BaseRegistry, "DjangoQueue"]:
Expand All @@ -204,39 +206,39 @@ def started_job_registry(self):
return StartedJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
job_class=self.job_class,
)

@property
def deferred_job_registry(self):
return DeferredJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
job_class=self.job_class,
)

@property
def failed_job_registry(self):
return FailedJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
job_class=self.job_class,
)

@property
def scheduled_job_registry(self):
return ScheduledJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
job_class=self.job_class,
)

@property
def canceled_job_registry(self):
return CanceledJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
job_class=self.job_class,
)

def get_all_job_ids(self) -> List[str]:
Expand Down
68 changes: 64 additions & 4 deletions scheduler/tests/test_mgmt_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@


class RqworkerTestCase(TestCase):

def test_rqworker__no_queues_params(self):
queue = get_queue("default")

Expand Down Expand Up @@ -49,7 +48,11 @@ def test_rqworker__job_class_param__green(self):

# Create a worker to execute these jobs
call_command(
"rqworker", "--job-class", "scheduler.rq_classes.JobExecution", fork_job_execution=False, burst=True
"rqworker",
"--job-class",
"scheduler.rq_classes.JobExecution",
fork_job_execution=False,
burst=True,
)

# check if all jobs are really failed
Expand All @@ -69,7 +72,58 @@ def test_rqworker__bad_job_class__fail(self):

# Create a worker to execute these jobs
with self.assertRaises(ImportError):
call_command("rqworker", "--job-class", "rq.badclass", fork_job_execution=False, burst=True)
call_command(
"rqworker",
"--job-class",
"rq.badclass",
fork_job_execution=False,
burst=True,
)

def test_rqworker__worker_class_param__fail(self):
queue = get_queue("default")

# enqueue some jobs that will fail
jobs = []
job_ids = []
for _ in range(0, 3):
job = queue.enqueue(failing_job)
jobs.append(job)
job_ids.append(job.id)

# Create a worker to execute these jobs with a bad worker class
with self.assertRaises(ImportError):
call_command(
"rqworker",
"--worker-class",
"scheduler.bad_worker_class",
fork_job_execution=False,
burst=True,
)

def test_rqworker__worker_class_param__green(self):
queue = get_queue("default")

# enqueue some jobs that will fail
jobs = []
job_ids = []
for _ in range(0, 3):
job = queue.enqueue(failing_job)
jobs.append(job)
job_ids.append(job.id)

# Create a worker to execute these jobs with a good worker class
call_command(
"rqworker",
"--worker-class",
"scheduler.rq_classes.DjangoWorker",
fork_job_execution=False,
burst=True,
)

# check if all jobs are really failed
for job in jobs:
self.assertTrue(job.is_failed)

def test_rqworker__run_jobs(self):
queue = get_queue("default")
Expand Down Expand Up @@ -105,7 +159,13 @@ def test_rqworker__worker_with_two_queues(self):
job_ids.append(job.id)

# Create a worker to execute these jobs
call_command("rqworker", "default", "django_tasks_scheduler_test", fork_job_execution=False, burst=True)
call_command(
"rqworker",
"default",
"django_tasks_scheduler_test",
fork_job_execution=False,
burst=True,
)

# check if all jobs are really failed
for job in jobs:
Expand Down
20 changes: 19 additions & 1 deletion scheduler/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid

from rq.job import Job
from scheduler.rq_classes import JobExecution
from scheduler.rq_classes import JobExecution, DjangoWorker
from scheduler.tests.testtools import SchedulerBaseCase
from scheduler.tools import create_worker
from . import test_settings # noqa
Expand Down Expand Up @@ -50,3 +50,21 @@ def test_get_worker_with_custom_job_class(self):
def test_get_worker_without_custom_job_class(self):
worker = create_worker("default")
self.assertTrue(issubclass(worker.job_class, JobExecution))

def test_get_worker_with_custom_worker_class(self):
worker = create_worker(
"default", worker_class="scheduler.rq_classes.DjangoWorker"
)
self.assertIsInstance(worker, DjangoWorker)

def test_get_worker_with_bad_custom_worker_class(self):
with self.assertRaises(ImportError):
create_worker("default", worker_class="scheduler.non_existent_class")

def test_create_worker_with_rq_worker_class(self):
with self.assertRaises(ValueError):
create_worker("default", worker_class="rq.Worker")

def test_get_worker_without_custom_worker_class(self):
worker = create_worker("default")
self.assertIsInstance(worker, DjangoWorker)
14 changes: 13 additions & 1 deletion scheduler/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,24 @@ def create_worker(*queue_names, **kwargs):
# Handle job_class if provided
if "job_class" not in kwargs or kwargs["job_class"] is None:
kwargs["job_class"] = "scheduler.rq_classes.JobExecution"

try:
kwargs["job_class"] = import_string(kwargs["job_class"])
except ImportError:
raise ImportError(f"Could not import job class {kwargs['job_class']}")

worker = DjangoWorker(queues, connection=queues[0].connection, **kwargs)
# Handle worker_class if provided
if "worker_class" in kwargs and kwargs["worker_class"]:
try:
worker_class = import_string(kwargs["worker_class"])
if not issubclass(worker_class, DjangoWorker):
raise ValueError("worker_class must be a subclass of DjangoWorker")
except ImportError:
raise ImportError(f"Could not import worker class {kwargs['worker_class']}")
else:
worker_class = DjangoWorker

worker = worker_class(queues, connection=queues[0].connection, **kwargs)
return worker


Expand Down
Loading