Skip to content

Commit 9aa33ee

Browse files
giancarloromeosanderegg
authored andcommitted
fix queue
1 parent 0a42019 commit 9aa33ee

File tree

5 files changed

+8
-10
lines changed

5 files changed

+8
-10
lines changed

services/docker-compose.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,15 +1199,15 @@ services:
11991199
CELERY_CONCURRENCY: 100
12001200
networks: *storage_networks
12011201

1202-
sto-worker-low-prio:
1202+
sto-worker-cpu-bound:
12031203
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
12041204
init: true
1205-
hostname: "sto-worker-low-prio-{{.Node.Hostname}}-{{.Task.Slot}}"
1205+
hostname: "sto-worker-cpu-bound-{{.Node.Hostname}}-{{.Task.Slot}}"
12061206
environment:
12071207
<<: *storage_environment
12081208
STORAGE_WORKER_MODE: "true"
12091209
CELERY_CONCURRENCY: 1
1210-
CELERY_QUEUES: "low-prio"
1210+
CELERY_QUEUES: "cpu-bound"
12111211
networks: *storage_networks
12121212

12131213
rabbit:

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def setup_worker_tasks(app: Celery) -> None:
2020
logging.INFO,
2121
msg="Storage setup Worker Tasks",
2222
):
23-
define_task(app, export_data, task_queue="low-prio")
23+
define_task(app, export_data)
2424
define_task(app, compute_path_size)
2525
define_task(app, delete_paths)
2626
define_task(app, complete_upload_file)

services/storage/src/simcore_service_storage/api/rpc/_data_export.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ async def start_data_export(
6060
task_uuid = await get_celery_client(app).send_task(
6161
"export_data",
6262
task_context=job_id_data.model_dump(),
63+
task_queue="cpu-bound",
6364
files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature
6465
)
6566
except CeleryError as exc:

services/storage/src/simcore_service_storage/modules/celery/_task.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ def define_task(
8282
app: Celery,
8383
fn: Callable[Concatenate[AbortableTask, TaskId, P], Coroutine[Any, Any, R]],
8484
task_name: str | None = None,
85-
task_queue: str | None = None,
8685
) -> None: ...
8786

8887

@@ -91,7 +90,6 @@ def define_task(
9190
app: Celery,
9291
fn: Callable[Concatenate[AbortableTask, P], R],
9392
task_name: str | None = None,
94-
task_queue: str | None = None,
9593
) -> None: ...
9694

9795

@@ -102,7 +100,6 @@ def define_task( # type: ignore[misc]
102100
| Callable[Concatenate[AbortableTask, P], R]
103101
),
104102
task_name: str | None = None,
105-
task_queue: str | None = None,
106103
) -> None:
107104
"""Decorator to define a celery task with error handling and abortable support"""
108105
wrapped_fn: Callable[Concatenate[AbortableTask, P], R]
@@ -117,5 +114,4 @@ def define_task( # type: ignore[misc]
117114
name=task_name or fn.__name__,
118115
bind=True,
119116
base=AbortableTask,
120-
queue=task_queue,
121117
)(wrapped_fn)

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import contextlib
22
import logging
33
from dataclasses import dataclass
4+
import queue
45
from typing import Any, Final
56
from uuid import uuid4
67

@@ -46,7 +47,7 @@ class CeleryTaskQueueClient:
4647
_task_store: TaskStore
4748

4849
async def send_task(
49-
self, task_name: str, *, task_context: TaskContext, **task_params
50+
self, task_name: str, *, task_context: TaskContext, task_queue: str = "default", **task_params
5051
) -> TaskUUID:
5152
with log_context(
5253
_logger,
@@ -55,7 +56,7 @@ async def send_task(
5556
):
5657
task_uuid = uuid4()
5758
task_id = build_task_id(task_context, task_uuid)
58-
self._celery_app.send_task(task_name, task_id=task_id, kwargs=task_params)
59+
self._celery_app.send_task(task_name, task_id=task_id, kwargs=task_params, queue=task_queue)
5960
await self._task_store.set_task(
6061
task_id, TaskData(status=TaskState.PENDING.name)
6162
)

0 commit comments

Comments
 (0)