Skip to content

Commit e85ccfd

Browse files
add celery routing queues
1 parent bcc60b5 commit e85ccfd

File tree

5 files changed

+21
-3
lines changed

5 files changed

+21
-3
lines changed

services/docker-compose.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,10 +1193,21 @@ services:
11931193
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
11941194
init: true
11951195
hostname: "sto-worker-{{.Node.Hostname}}-{{.Task.Slot}}"
1196+
environment:
1197+
<<: *storage_environment
1198+
STORAGE_WORKER_MODE: "true"
1199+
CELERY_CONCURRENCY: 100
1200+
networks: *storage_networks
1201+
1202+
sto-worker-low-prio:
1203+
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
1204+
init: true
1205+
hostname: "sto-worker-low-prio-{{.Node.Hostname}}-{{.Task.Slot}}"
11961206
environment:
11971207
<<: *storage_environment
11981208
STORAGE_WORKER_MODE: "true"
11991209
CELERY_CONCURRENCY: 1
1210+
CELERY_QUEUES: "low-prio"
12001211
networks: *storage_networks
12011212

12021213
rabbit:

services/storage/docker/boot.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
5454
--app=simcore_service_storage.modules.celery.worker_main:app \
5555
worker --pool=threads \
5656
--loglevel="${SERVER_LOG_LEVEL}" \
57-
--concurrency="${CELERY_CONCURRENCY}"
57+
--concurrency="${CELERY_CONCURRENCY}" \
58+
--queues="${CELERY_QUEUES:-default}"
5859
else
5960
exec celery \
6061
--app=simcore_service_storage.modules.celery.worker_main:app \
6162
worker --pool=threads \
6263
--loglevel="${SERVER_LOG_LEVEL}" \
63-
--concurrency="${CELERY_CONCURRENCY}"
64+
--concurrency="${CELERY_CONCURRENCY}" \
65+
--queues="${CELERY_QUEUES:-default}"
6466
fi
6567
else
6668
if [ "${SC_BOOT_MODE}" = "debug" ]; then

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def setup_worker_tasks(app: Celery) -> None:
2020
logging.INFO,
2121
msg="Storage setup Worker Tasks",
2222
):
23-
define_task(app, export_data)
23+
define_task(app, export_data, task_queue="low_prio")
2424
define_task(app, compute_path_size)
2525
define_task(app, delete_paths)
2626
define_task(app, complete_upload_file)

services/storage/src/simcore_service_storage/modules/celery/_common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
1515
"result_expires": celery_settings.CELERY_RESULT_EXPIRES,
1616
"result_extended": True,
1717
"result_serializer": "json",
18+
"task_default_queue": "default",
1819
"task_send_sent_event": True,
1920
"task_track_started": True,
2021
"worker_send_task_events": True,

services/storage/src/simcore_service_storage/modules/celery/_task.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def define_task(
8282
app: Celery,
8383
fn: Callable[Concatenate[AbortableTask, TaskId, P], Coroutine[Any, Any, R]],
8484
task_name: str | None = None,
85+
task_queue: str | None = None,
8586
) -> None: ...
8687

8788

@@ -90,6 +91,7 @@ def define_task(
9091
app: Celery,
9192
fn: Callable[Concatenate[AbortableTask, P], R],
9293
task_name: str | None = None,
94+
task_queue: str | None = None,
9395
) -> None: ...
9496

9597

@@ -100,6 +102,7 @@ def define_task( # type: ignore[misc]
100102
| Callable[Concatenate[AbortableTask, P], R]
101103
),
102104
task_name: str | None = None,
105+
task_queue: str | None = None,
103106
) -> None:
104107
"""Decorator to define a celery task with error handling and abortable support"""
105108
wrapped_fn: Callable[Concatenate[AbortableTask, P], R]
@@ -114,4 +117,5 @@ def define_task( # type: ignore[misc]
114117
name=task_name or fn.__name__,
115118
bind=True,
116119
base=AbortableTask,
120+
queue=task_queue,
117121
)(wrapped_fn)

0 commit comments

Comments
 (0)