Skip to content

Commit 1c50f23

Browse files
refactor: queue names
1 parent abbeaa4 commit 1c50f23

File tree

11 files changed

+63
-20
lines changed

11 files changed

+63
-20
lines changed

packages/celery-library/tests/unit/test_async_jobs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async def rpc_sync_job(
8282
task_manager: TaskManager, *, job_id_data: AsyncJobNameData, **kwargs: Any
8383
) -> AsyncJobGet:
8484
task_name = sync_job.__name__
85-
task_uuid = await task_manager.submit_task(
85+
task_uuid = await task_manager.send_task(
8686
TaskMetadata(name=task_name), task_context=job_id_data.model_dump(), **kwargs
8787
)
8888

@@ -94,7 +94,7 @@ async def rpc_async_job(
9494
task_manager: TaskManager, *, job_id_data: AsyncJobNameData, **kwargs: Any
9595
) -> AsyncJobGet:
9696
task_name = async_job.__name__
97-
task_uuid = await task_manager.submit_task(
97+
task_uuid = await task_manager.send_task(
9898
TaskMetadata(name=task_name), task_context=job_id_data.model_dump(), **kwargs
9999
)
100100

packages/service-library/src/servicelib/celery/models.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@
1111
TaskName: TypeAlias = Annotated[
1212
str, StringConstraints(strip_whitespace=True, min_length=1)
1313
]
14+
15+
TaskQueue: TypeAlias = Annotated[
16+
str,
17+
StringConstraints(
18+
strip_whitespace=True,
19+
min_length=1,
20+
max_length=64,
21+
),
22+
]
23+
TASK_QUEUE_DEFAULT: TaskQueue = "default"
24+
1425
TaskUUID: TypeAlias = UUID
1526

1627

@@ -23,15 +34,10 @@ class TaskState(StrEnum):
2334
ABORTED = "ABORTED"
2435

2536

26-
class TasksQueue(StrEnum):
27-
CPU_BOUND = "cpu_bound"
28-
DEFAULT = "default"
29-
30-
3137
class TaskMetadata(BaseModel):
3238
name: TaskName
3339
ephemeral: bool = True
34-
queue: TasksQueue = TasksQueue.DEFAULT
40+
queue: TaskQueue = TASK_QUEUE_DEFAULT
3541

3642

3743
class Task(BaseModel):

packages/service-library/src/servicelib/celery/task_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414

1515
class TaskManager(Protocol):
16-
async def submit_task(
17-
self, task_metadata: TaskMetadata, *, task_context: TaskContext, **task_param
16+
async def send_task(
17+
self, task_metadata: TaskMetadata, *, task_context: TaskContext, **task_params
1818
) -> TaskUUID: ...
1919

2020
async def cancel_task(

services/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,7 @@ services:
12991299
STORAGE_WORKER_NAME: "sto-worker-cpu-bound-{{.Node.Hostname}}-{{.Task.Slot}}-{{.Task.ID}}"
13001300
STORAGE_WORKER_MODE: "true"
13011301
CELERY_CONCURRENCY: 1
1302-
CELERY_QUEUES: "cpu_bound"
1302+
CELERY_QUEUES: "storage.cpu_bound"
13031303
networks: *storage_networks
13041304

13051305
rabbit:

services/notifications/src/simcore_service_notifications/modules/celery/tasks.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from enum import StrEnum
23

34
from celery import Celery # type: ignore[import-untyped]
45
from celery_library.task import register_task
@@ -11,6 +12,13 @@
1112
_logger = logging.getLogger(__name__)
1213

1314

15+
_TASK_QUEUE_PREFIX: str = "notifications"
16+
17+
18+
class TaskQueue(StrEnum):
19+
DEFAULT = f"{_TASK_QUEUE_PREFIX}.default"
20+
21+
1422
def setup_worker_tasks(app: Celery) -> None:
1523
register_celery_types()
1624
register_pydantic_types(NotificationMessage, EmailRecipient, SMSRecipient)
Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,30 @@
1+
from enum import StrEnum
2+
3+
from servicelib.celery.models import TaskContext, TaskMetadata
14
from servicelib.celery.task_manager import TaskManager
25

36
from ..models.schemas import NotificationMessage, Recipient
47

8+
CHANNELS = {
9+
"email": "notifications.email",
10+
}
11+
12+
13+
class TaskQueues(StrEnum):
14+
DEFAULT = "notifications.default"
15+
516

617
async def send_notification(
718
task_manager: TaskManager,
819
*,
920
message: NotificationMessage,
1021
recipients: list[Recipient],
11-
) -> None: ...
22+
) -> None:
23+
for recipient in recipients:
24+
await task_manager.send_task(
25+
TaskMetadata(
26+
name="notifications.send_email",
27+
queue=TaskQueues.DEFAULT,
28+
),
29+
task_context=TaskContext(),
30+
)

services/storage/src/simcore_service_storage/api/rest/_files.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ async def complete_upload_file(
288288
user_id=query_params.user_id,
289289
product_name=_UNDEFINED_PRODUCT_NAME_FOR_WORKER_TASKS, # NOTE: I would need to change the API here
290290
)
291-
task_uuid = await task_manager.submit_task(
291+
task_uuid = await task_manager.send_task(
292292
TaskMetadata(
293293
name=remote_complete_upload_file.__name__,
294294
),

services/storage/src/simcore_service_storage/api/rpc/_paths.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async def compute_path_size(
2525
path: Path,
2626
) -> AsyncJobGet:
2727
task_name = remote_compute_path_size.__name__
28-
task_uuid = await task_manager.submit_task(
28+
task_uuid = await task_manager.send_task(
2929
task_metadata=TaskMetadata(
3030
name=task_name,
3131
),
@@ -46,7 +46,7 @@ async def delete_paths(
4646
paths: set[Path],
4747
) -> AsyncJobGet:
4848
task_name = remote_delete_paths.__name__
49-
task_uuid = await task_manager.submit_task(
49+
task_uuid = await task_manager.send_task(
5050
task_metadata=TaskMetadata(
5151
name=task_name,
5252
),

services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
)
55
from models_library.api_schemas_storage.storage_schemas import FoldersBody
66
from models_library.api_schemas_webserver.storage import PathToExport
7-
from servicelib.celery.models import TaskMetadata, TasksQueue
7+
from servicelib.celery.models import TaskMetadata
88
from servicelib.celery.task_manager import TaskManager
99
from servicelib.rabbitmq import RPCRouter
1010

11+
from ...modules.celery.tasks import TaskQueue
1112
from .._worker_tasks._simcore_s3 import deep_copy_files_from_project, export_data
1213

1314
router = RPCRouter()
@@ -20,7 +21,7 @@ async def copy_folders_from_project(
2021
body: FoldersBody,
2122
) -> AsyncJobGet:
2223
task_name = deep_copy_files_from_project.__name__
23-
task_uuid = await task_manager.submit_task(
24+
task_uuid = await task_manager.send_task(
2425
task_metadata=TaskMetadata(
2526
name=task_name,
2627
),
@@ -39,11 +40,11 @@ async def start_export_data(
3940
paths_to_export: list[PathToExport],
4041
) -> AsyncJobGet:
4142
task_name = export_data.__name__
42-
task_uuid = await task_manager.submit_task(
43+
task_uuid = await task_manager.send_task(
4344
task_metadata=TaskMetadata(
4445
name=task_name,
4546
ephemeral=False,
46-
queue=TasksQueue.CPU_BOUND,
47+
queue=TaskQueue.CPU_BOUND,
4748
),
4849
task_context=job_id_data.model_dump(),
4950
user_id=job_id_data.user_id,
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from enum import StrEnum
2+
3+
_TASK_QUEUE_PREFIX: str = "storage."
4+
5+
6+
class TaskQueue(StrEnum):
7+
DEFAULT = f"{_TASK_QUEUE_PREFIX}.default"
8+
CPU_BOUND = f"{_TASK_QUEUE_PREFIX}.cpu_bound"

0 commit comments

Comments
 (0)