Skip to content

Commit d1cbf4c

Browse files
feat: route UploadFinisher to dedicated queue
This separates UploadFinisher tasks from UploadProcessor tasks to prevent lightweight finishing operations from being blocked by heavy processing work. Key changes: - Add explicit route for upload_finisher_task_name before the upload.* wildcard - Default to the upload queue for backward compatibility (no k8s config = no change) - When upload_finisher queue is configured, tasks route to upload_finisher (or enterprise_upload_finisher for enterprise customers) This change is safe to deploy before or after the k8s-v2 config changes.
1 parent 367d3f1 commit d1cbf4c

File tree

3 files changed

+63
-1
lines changed

3 files changed

+63
-1
lines changed

libs/shared/shared/celery_config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,16 @@ class BaseCeleryConfig:
476476
},
477477
}
478478

479+
# Get the upload queue for backward-compatible fallback
480+
# This ensures upload_finisher continues to use the upload queue if no dedicated queue is configured
481+
_upload_queue = get_config(
482+
"setup",
483+
"tasks",
484+
TaskConfigGroup.upload.value,
485+
"queue",
486+
default=task_default_queue,
487+
)
488+
479489
task_routes = {
480490
sync_teams_task_name: {
481491
"queue": get_config(
@@ -567,6 +577,18 @@ class BaseCeleryConfig:
567577
default=task_default_queue,
568578
)
569579
},
580+
# UploadFinisher gets its own queue to avoid being blocked by UploadProcessor tasks.
581+
# Falls back to the upload queue if no dedicated queue is configured (backward-compatible).
582+
upload_finisher_task_name: {
583+
"queue": get_config(
584+
"setup",
585+
"tasks",
586+
"upload_finisher",
587+
"queue",
588+
default=_upload_queue,
589+
)
590+
},
591+
# All other upload tasks (Upload, UploadProcessor, PreProcessUpload, etc.)
570592
f"app.tasks.{TaskConfigGroup.upload.value}.*": {
571593
"queue": get_config(
572594
"setup",

libs/shared/tests/unit/test_celery_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def test_celery_config():
5151
"app.tasks.test_results.*",
5252
"app.tasks.timeseries.*",
5353
"app.tasks.upload.*",
54+
"app.tasks.upload.UploadFinisher",
5455
]
5556
assert config.broker_transport_options == {"visibility_timeout": 900}
5657
assert config.result_extended is True

libs/shared/tests/unit/test_router.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
import pytest
44

5-
from shared.celery_config import timeseries_backfill_task_name, upload_task_name
5+
from shared.celery_config import (
6+
timeseries_backfill_task_name,
7+
upload_finisher_task_name,
8+
upload_task_name,
9+
)
610
from shared.celery_router import route_tasks_based_on_user_plan
711
from shared.config import ConfigHelper
812
from shared.plan.constants import DEFAULT_FREE_PLAN, PlanName
@@ -106,3 +110,38 @@ def test_route_tasks_with_owner_config(self, mock_config):
106110
assert route_tasks_based_on_user_plan(
107111
upload_task_name, PlanName.ENTERPRISE_CLOUD_MONTHLY.value, 1
108112
) == {"queue": "enterprise_uploads_super_special", "extra_config": {}}
113+
114+
@patch.dict(
115+
"shared.celery_config.BaseCeleryConfig.task_routes",
116+
{
117+
upload_finisher_task_name: {"queue": "upload_finisher"},
118+
"app.tasks.upload.*": {"queue": "uploads"},
119+
},
120+
)
121+
@pytest.mark.django_db
122+
def test_upload_finisher_has_separate_queue(self, mocker):
123+
"""UploadFinisher should route to its own queue, separate from UploadProcessor."""
124+
mock_all_plans_and_tiers()
125+
126+
# Non-enterprise: upload_finisher goes to upload_finisher queue
127+
assert route_tasks_based_on_user_plan(
128+
upload_finisher_task_name, DEFAULT_FREE_PLAN, None
129+
) == {
130+
"queue": "upload_finisher",
131+
"extra_config": {},
132+
}
133+
# Non-enterprise: other upload tasks go to uploads queue
134+
assert route_tasks_based_on_user_plan(
135+
upload_task_name, DEFAULT_FREE_PLAN, None
136+
) == {
137+
"queue": "uploads",
138+
"extra_config": {},
139+
}
140+
# Enterprise: upload_finisher goes to enterprise_upload_finisher queue
141+
assert route_tasks_based_on_user_plan(
142+
upload_finisher_task_name, PlanName.ENTERPRISE_CLOUD_MONTHLY.value, None
143+
) == {"queue": "enterprise_upload_finisher", "extra_config": {}}
144+
# Enterprise: other upload tasks go to enterprise_uploads queue
145+
assert route_tasks_based_on_user_plan(
146+
upload_task_name, PlanName.ENTERPRISE_CLOUD_MONTHLY.value, None
147+
) == {"queue": "enterprise_uploads", "extra_config": {}}

0 commit comments

Comments
 (0)