Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions libs/shared/shared/celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,16 @@ class BaseCeleryConfig:
},
}

# Get the upload queue for backward-compatible fallback
# This ensures upload_finisher continues to use the upload queue if no dedicated queue is configured
_upload_queue = get_config(
"setup",
"tasks",
TaskConfigGroup.upload.value,
"queue",
default=task_default_queue,
)

task_routes = {
sync_teams_task_name: {
"queue": get_config(
Expand Down Expand Up @@ -567,6 +577,18 @@ class BaseCeleryConfig:
default=task_default_queue,
)
},
# UploadFinisher gets its own queue to avoid being blocked by UploadProcessor tasks.
# Falls back to the upload queue if no dedicated queue is configured (backward-compatible).
upload_finisher_task_name: {
"queue": get_config(
"setup",
"tasks",
"upload_finisher",
"queue",
default=_upload_queue,
)
},
# All other upload tasks (Upload, UploadProcessor, PreProcessUpload, etc.)
f"app.tasks.{TaskConfigGroup.upload.value}.*": {
"queue": get_config(
"setup",
Expand Down
1 change: 1 addition & 0 deletions libs/shared/tests/unit/test_celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def test_celery_config():
"app.tasks.test_results.*",
"app.tasks.timeseries.*",
"app.tasks.upload.*",
"app.tasks.upload.UploadFinisher",
]
assert config.broker_transport_options == {"visibility_timeout": 900}
assert config.result_extended is True
Expand Down
41 changes: 40 additions & 1 deletion libs/shared/tests/unit/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import pytest

from shared.celery_config import timeseries_backfill_task_name, upload_task_name
from shared.celery_config import (
timeseries_backfill_task_name,
upload_finisher_task_name,
upload_task_name,
)
from shared.celery_router import route_tasks_based_on_user_plan
from shared.config import ConfigHelper
from shared.plan.constants import DEFAULT_FREE_PLAN, PlanName
Expand Down Expand Up @@ -106,3 +110,38 @@ def test_route_tasks_with_owner_config(self, mock_config):
assert route_tasks_based_on_user_plan(
upload_task_name, PlanName.ENTERPRISE_CLOUD_MONTHLY.value, 1
) == {"queue": "enterprise_uploads_super_special", "extra_config": {}}

@patch.dict(
"shared.celery_config.BaseCeleryConfig.task_routes",
{
upload_finisher_task_name: {"queue": "upload_finisher"},
"app.tasks.upload.*": {"queue": "uploads"},
},
)
@pytest.mark.django_db
def test_upload_finisher_has_separate_queue(self, mocker):
"""UploadFinisher should route to its own queue, separate from UploadProcessor."""
mock_all_plans_and_tiers()

# Non-enterprise: upload_finisher goes to upload_finisher queue
assert route_tasks_based_on_user_plan(
upload_finisher_task_name, DEFAULT_FREE_PLAN, None
) == {
"queue": "upload_finisher",
"extra_config": {},
}
# Non-enterprise: other upload tasks go to uploads queue
assert route_tasks_based_on_user_plan(
upload_task_name, DEFAULT_FREE_PLAN, None
) == {
"queue": "uploads",
"extra_config": {},
}
# Enterprise: upload_finisher goes to enterprise_upload_finisher queue
assert route_tasks_based_on_user_plan(
upload_finisher_task_name, PlanName.ENTERPRISE_CLOUD_MONTHLY.value, None
) == {"queue": "enterprise_upload_finisher", "extra_config": {}}
# Enterprise: other upload tasks go to enterprise_uploads queue
assert route_tasks_based_on_user_plan(
upload_task_name, PlanName.ENTERPRISE_CLOUD_MONTHLY.value, None
) == {"queue": "enterprise_uploads", "extra_config": {}}
Loading