Skip to content

Commit d4a7d50

Browse files
author
Pete Wildsmith
committed
Merge pull request #8 from dabapps/jobs-in-child-processes
Use multiprocessing to run tasks in child processes
2 parents 8533f0e + 5dfa656 commit d4a7d50

File tree

7 files changed

+52
-23
lines changed

7 files changed

+52
-23
lines changed

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
language: python
2+
sudo: false
23
python:
34
- '2.7'
45
- '3.4'
56
install:
6-
- pip install -r requirements.txt
7+
- pip install -r test-requirements.txt
78
script: python manage.py test
89
deploy:
910
provider: pypi

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,7 @@ To start a worker:
127127
manage.py worker [queue_name]
128128

129129
`queue_name` is optional, and will default to `default`
130+
131+
## Testing
132+
133+
It may be necessary to supply a DATABASE_PORT environment variable.

django_dbq/management/commands/worker.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from simplesignals.process import WorkerProcessBase
77
from time import sleep
88
import logging
9+
import multiprocessing
910

1011

1112
logger = logging.getLogger(__name__)
@@ -14,18 +15,8 @@
1415
DEFAULT_QUEUE_NAME = 'default'
1516

1617

17-
def process_job(queue_name):
18-
"""This function grabs the next available job for a given queue, and runs its next task."""
19-
20-
with transaction.atomic():
21-
job = Job.objects.get_ready_or_none(queue_name)
22-
if not job:
23-
return
24-
25-
logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task)
26-
job.state = Job.STATES.PROCESSING
27-
job.save()
28-
18+
def run_next_task(job):
19+
"""Updates a job by running its next task"""
2920
try:
3021
task_function = import_by_path(job.next_task)
3122
task_function(job)
@@ -55,6 +46,23 @@ def process_job(queue_name):
5546
raise
5647

5748

49+
def process_job(queue_name):
50+
"""This function grabs the next available job for a given queue, and runs its next task."""
51+
52+
with transaction.atomic():
53+
job = Job.objects.get_ready_or_none(queue_name)
54+
if not job:
55+
return
56+
57+
logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task)
58+
job.state = Job.STATES.PROCESSING
59+
job.save()
60+
61+
child = multiprocessing.Process(target=run_next_task, args=(job,))
62+
child.start()
63+
child.join()
64+
65+
5866
class Worker(WorkerProcessBase):
5967

6068
process_title = "jobworker"

django_dbq/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Job(models.Model):
6666
queue_name = models.CharField(max_length=20, default='default', db_index=True)
6767

6868
class Meta:
69-
ordering = ['-created']
69+
ordering = ['created']
7070

7171
objects = JobManager()
7272

@@ -100,3 +100,4 @@ def run_creation_hook(self):
100100
logger.info("Running creation hook %s for new job", creation_hook_name)
101101
creation_hook_function = import_by_path(creation_hook_name)
102102
creation_hook_function(self)
103+

django_dbq/tests.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,29 @@ def test_create_job_with_queue(self):
9393
def test_get_next_ready_job(self):
9494
self.assertTrue(Job.objects.get_ready_or_none('default') is None)
9595

96-
Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now())
97-
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now())
98-
expected = Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now() - timedelta(minutes=1))
96+
Job.objects.create(name='testjob', state=Job.STATES.READY)
97+
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING)
98+
expected = Job.objects.create(name='testjob', state=Job.STATES.READY)
99+
expected.created = datetime.now() - timedelta(minutes=1)
100+
expected.save()
99101

100102
self.assertEqual(Job.objects.get_ready_or_none('default'), expected)
101103

102104
def test_get_next_ready_job_created(self):
103105
"""
104-
Created jobs should be picked too
106+
Created jobs should be picked too.
107+
108+
We create three jobs, and expect the oldest in NEW or READY to be
109+
selected by get_ready_or_none (the model is ordered by 'created' and the
110+
query picks the .first())
105111
"""
106112
self.assertTrue(Job.objects.get_ready_or_none('default') is None)
107113

108-
Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now())
109-
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now())
110-
expected = Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now() - timedelta(minutes=1))
114+
Job.objects.create(name='testjob', state=Job.STATES.NEW)
115+
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING)
116+
expected = Job.objects.create(name='testjob', state=Job.STATES.NEW)
117+
expected.created = datetime.now() - timedelta(minutes=1)
118+
expected.save()
111119

112120
self.assertEqual(Job.objects.get_ready_or_none('default'), expected)
113121

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-r requirements.txt
2+
pymysql==0.6.7

testsettings.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import os
2+
import pymysql
3+
pymysql.install_as_MySQLdb()
4+
15
DATABASES = {
26
'default': {
3-
'ENGINE': 'django.db.backends.sqlite3',
4-
'NAME': ':memory:',
7+
'ENGINE': 'django.db.backends.mysql',
8+
'NAME': 'django_db_queue',
9+
'PORT': os.getenv('DATABASE_PORT', 3306),
510
},
611
}
712

0 commit comments

Comments
 (0)