Skip to content

Commit 3b1483b

Browse files
committed
Queue names
1 parent 05e1d90 commit 3b1483b

File tree

4 files changed

+25
-15
lines changed

4 files changed

+25
-15
lines changed

django_dbq/management/commands/create_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,4 @@ def handle(self, *args, **options):
4444
kwargs['queue_name'] = queue_name
4545

4646
job = Job.objects.create(**kwargs)
47-
self.stdout.write('Created job: "%s", id=%s' % (job.name, job.pk))
47+
self.stdout.write('Created job: "%s", id=%s for queue "%s"' % (job.name, job.pk, queue_name if queue_name else 'default'))

django_dbq/management/commands/worker.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13-
def process_job():
14-
"""This function grabs the next available job, and runs its next task."""
13+
def process_job(queue_name):
14+
"""This function grabs the next available job for a given queue, and runs its next task."""
1515

1616
with transaction.atomic():
17-
job = Job.objects.get_ready_or_none()
17+
job = Job.objects.get_ready_or_none(queue_name)
1818
if not job:
1919
return
2020

21-
logger.info('Processing job: name="%s" id=%s state=%s next_task=%s', job.name, job.pk, job.state, job.next_task)
21+
logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task)
2222
job.state = Job.STATES.PROCESSING
2323
job.save()
2424

@@ -55,16 +55,26 @@ class Worker(WorkerProcessBase):
5555

5656
process_title = "jobworker"
5757

58+
def __init__(self, name):
59+
self.queue_name = name
60+
super(Worker, self).__init__()
61+
5862
def do_work(self):
5963
sleep(1)
60-
process_job()
64+
process_job(self.queue_name)
6165

6266

6367
class Command(NoArgsCommand):
6468

6569
help = "Run a queue worker process"
6670

6771
def handle_noargs(self, **options):
72+
self.handle('default')
73+
74+
def handle(self, *args, **options):
75+
if len(args) != 1:
76+
raise CommandError("Please supply a single queue job name")
77+
6878
logger.info("Starting job worker")
6979
worker = Worker()
7080
worker.run()

django_dbq/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212

1313
class JobManager(models.Manager):
1414

15-
def get_ready_or_none(self, max_retries=3):
15+
def get_ready_or_none(self, queue_name, max_retries=3):
1616
"""
17-
Get a job in state READY or NEW. Supports retrying in case of database deadlock
17+
Get a job in state READY or NEW for a given queue. Supports retrying in case of database deadlock
1818
1919
See https://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
2020
@@ -31,7 +31,7 @@ def get_ready_or_none(self, max_retries=3):
3131
retries_left = max_retries
3232
while True:
3333
try:
34-
return self.select_for_update().filter(state__in=(Job.STATES.READY, Job.STATES.NEW)).first()
34+
return self.select_for_update().filter(queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW)).first()
3535
except Exception as e:
3636
if retries_left == 0:
3737
raise

django_dbq/tests.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,25 +66,25 @@ def test_create_job(self):
6666
self.assertEqual(job.state, Job.STATES.NEW)
6767

6868
def test_get_next_ready_job(self):
69-
self.assertTrue(Job.objects.get_ready_or_none() is None)
69+
self.assertTrue(Job.objects.get_ready_or_none('default') is None)
7070

7171
Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now())
7272
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now())
7373
expected = Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now() - timedelta(minutes=1))
7474

75-
self.assertEqual(Job.objects.get_ready_or_none(), expected)
75+
self.assertEqual(Job.objects.get_ready_or_none('default'), expected)
7676

7777
def test_get_next_ready_job_created(self):
7878
"""
7979
Created jobs should be picked too
8080
"""
81-
self.assertTrue(Job.objects.get_ready_or_none() is None)
81+
self.assertTrue(Job.objects.get_ready_or_none('default') is None)
8282

8383
Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now())
8484
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now())
8585
expected = Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now() - timedelta(minutes=1))
8686

87-
self.assertEqual(Job.objects.get_ready_or_none(), expected)
87+
self.assertEqual(Job.objects.get_ready_or_none('default'), expected)
8888

8989

9090
@override_settings(JOBS={'testjob': {'tasks': ['a', 'b', 'c']}})
@@ -106,7 +106,7 @@ class ProcessJobTestCase(TestCase):
106106

107107
def test_process_job(self):
108108
job = Job.objects.create(name='testjob')
109-
process_job()
109+
process_job('default')
110110
job = Job.objects.get()
111111
self.assertEqual(job.state, Job.STATES.COMPLETE)
112112

@@ -133,7 +133,7 @@ class JobFailureHookTestCase(TestCase):
133133

134134
def test_failure_hook(self):
135135
job = Job.objects.create(name='testjob')
136-
process_job()
136+
process_job('default')
137137
job = Job.objects.get()
138138
self.assertEqual(job.state, Job.STATES.FAILED)
139139
self.assertEqual(job.workspace['output'], 'failure hook ran')

0 commit comments

Comments
 (0)