Skip to content

Commit 118d764

Browse files
authored
Check queue size to launch the heavy processing worker (#4288)
1 parent e30ba83 commit 118d764

File tree

5 files changed

+47
-7
lines changed

5 files changed

+47
-7
lines changed

backend/pycon/celery.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ def setup_periodic_tasks(sender, **kwargs):
2323
)
2424
from schedule.tasks import process_schedule_items_videos_to_upload
2525
from files_upload.tasks import delete_unused_files
26-
from pycon.tasks import check_for_idle_heavy_processing_workers
26+
from pycon.tasks import (
27+
check_for_idle_heavy_processing_workers,
28+
check_pending_heavy_processing_work,
29+
)
2730

2831
add = sender.add_periodic_task
2932

@@ -47,3 +50,8 @@ def setup_periodic_tasks(sender, **kwargs):
4750
check_for_idle_heavy_processing_workers,
4851
name="Check for idle heavy processing workers",
4952
)
53+
add(
54+
timedelta(minutes=1),
55+
check_pending_heavy_processing_work,
56+
name="Check pending heavy processing work",
57+
)

backend/pycon/tasks.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import boto3
44
from django.conf import settings
55
from pycon.celery_utils import OnlyOneAtTimeTask
6-
6+
from redis import Redis
77
from collections import Counter
88
from pycon.celery import app
99
from django.core.cache import cache
@@ -153,3 +153,14 @@ def check_for_idle_heavy_processing_workers():
153153

154154
def build_idle_worker_cache_key(worker_name: str) -> str:
155155
return f"celery-workers-idle:{worker_name}"
156+
157+
158+
@app.task(base=OnlyOneAtTimeTask)
159+
def check_pending_heavy_processing_work():
160+
redis = Redis.from_url(settings.CELERY_BROKER_URL)
161+
tasks_in_queue = redis.llen("heavy_processing")
162+
163+
if tasks_in_queue == 0:
164+
return
165+
166+
launch_heavy_processing_worker.delay()

backend/pycon/tests/test_tasks.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
build_idle_worker_cache_key,
44
launch_heavy_processing_worker,
55
check_for_idle_heavy_processing_workers,
6+
check_pending_heavy_processing_work,
67
)
78

89

@@ -383,3 +384,23 @@ def test_dont_shutdown_heavy_processing_worker_if_last_check_is_not_5_mins_old(m
383384

384385
mock_cache.set.assert_not_called()
385386
mock_broadcast.assert_not_called()
387+
388+
389+
def test_check_pending_heavy_processing_work(mocker):
390+
mock_launch = mocker.patch("pycon.tasks.launch_heavy_processing_worker")
391+
redis_mock = mocker.patch("pycon.tasks.Redis", return_value=mocker.MagicMock())
392+
redis_mock.from_url.return_value.llen.return_value = 10
393+
394+
check_pending_heavy_processing_work()
395+
396+
mock_launch.delay.assert_called()
397+
398+
399+
def test_check_pending_heavy_processing_work_with_no_work(mocker):
400+
mock_launch = mocker.patch("pycon.tasks.launch_heavy_processing_worker")
401+
redis_mock = mocker.patch("pycon.tasks.Redis", return_value=mocker.MagicMock())
402+
redis_mock.from_url.return_value.llen.return_value = 0
403+
404+
check_pending_heavy_processing_work()
405+
406+
mock_launch.delay.assert_not_called()

backend/video_uploads/admin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from pycon.tasks import launch_heavy_processing_worker
21
from django.utils.safestring import mark_safe
32
from django.contrib import admin
43
from django.db import transaction
54

5+
from pycon.tasks import check_pending_heavy_processing_work
66
from video_uploads.tasks import process_wetransfer_to_s3_transfer_request
77
from video_uploads.models import WetransferToS3TransferRequest
88

@@ -16,7 +16,7 @@ def _on_commit():
1616
process_wetransfer_to_s3_transfer_request.apply_async(
1717
args=[request_obj.id], queue="heavy_processing"
1818
)
19-
launch_heavy_processing_worker.delay()
19+
check_pending_heavy_processing_work.delay()
2020

2121
transaction.on_commit(_on_commit)
2222

backend/video_uploads/tests/test_admin.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ def test_queue_wetransfer_to_s3_transfer_request(
1212
mock_process_wetransfer_to_s3_transfer_request = mocker.patch(
1313
"video_uploads.admin.process_wetransfer_to_s3_transfer_request"
1414
)
15-
mock_launch_heavy_processing_worker = mocker.patch(
16-
"video_uploads.admin.launch_heavy_processing_worker"
15+
mock_check_pending_heavy_processing_work = mocker.patch(
16+
"video_uploads.admin.check_pending_heavy_processing_work"
1717
)
1818

1919
request = WetransferToS3TransferRequestFactory()
@@ -29,7 +29,7 @@ def test_queue_wetransfer_to_s3_transfer_request(
2929
mock_process_wetransfer_to_s3_transfer_request.apply_async.assert_called_once_with(
3030
args=[request.id], queue="heavy_processing"
3131
)
32-
mock_launch_heavy_processing_worker.delay.assert_called_once()
32+
mock_check_pending_heavy_processing_work.delay.assert_called_once()
3333

3434

3535
def test_retry_transfer():

0 commit comments

Comments
 (0)