Skip to content

Commit 08ce0fe

Browse files
✨ Add Celery routing queues (#7471)
Co-authored-by: sanderegg <[email protected]>
1 parent 2aa5080 commit 08ce0fe

File tree

15 files changed

+185
-61
lines changed

15 files changed

+185
-61
lines changed

packages/pytest-simcore/src/pytest_simcore/simcore_services.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
import json
77
import logging
88
import warnings
9+
from collections.abc import Iterator
910
from dataclasses import dataclass
1011
from io import StringIO
11-
from typing import Iterator
1212

1313
import aiohttp
1414
import pytest
@@ -38,6 +38,7 @@
3838
"traefik",
3939
"whoami",
4040
"sto-worker",
41+
"sto-worker-cpu-bound",
4142
}
4243
# TODO: unify healthcheck policies; see https://github.com/ITISFoundation/osparc-simcore/pull/2281
4344
SERVICE_PUBLISHED_PORT = {}

packages/settings-library/src/settings_library/celery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ class CelerySettings(BaseCustomSettings):
2222
description="Time after which task results will be deleted (defaults to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)."
2323
),
2424
] = timedelta(days=7)
25+
CELERY_EPHEMERAL_RESULT_EXPIRES: Annotated[
26+
timedelta,
27+
Field(
28+
description="Time after which ephemeral task results will be deleted (defaults to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)."
29+
),
30+
] = timedelta(hours=1)
2531
CELERY_RESULT_PERSISTENT: Annotated[
2632
bool,
2733
Field(

services/docker-compose.devel.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,16 @@ services:
215215
STORAGE_PROFILING : ${STORAGE_PROFILING}
216216
STORAGE_LOGLEVEL: DEBUG
217217

218+
sto-worker-cpu-bound:
219+
volumes:
220+
- ./storage:/devel/services/storage
221+
- ../packages:/devel/packages
222+
- ${HOST_UV_CACHE_DIR}:/home/scu/.cache/uv
223+
environment:
224+
<<: *common-environment
225+
STORAGE_PROFILING : ${STORAGE_PROFILING}
226+
STORAGE_LOGLEVEL: DEBUG
227+
218228
agent:
219229
environment:
220230
<<: *common-environment

services/docker-compose.local.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ services:
133133
ports:
134134
- "8080"
135135
- "3021:3000"
136+
137+
sto-worker-cpu-bound:
138+
environment:
139+
<<: *common_environment
140+
STORAGE_REMOTE_DEBUGGING_PORT : 3000
141+
ports:
142+
- "8080"
143+
- "3022:3000"
136144
webserver:
137145
environment: &webserver_environment_local
138146
<<: *common_environment

services/docker-compose.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,10 +1193,21 @@ services:
11931193
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
11941194
init: true
11951195
hostname: "sto-worker-{{.Node.Hostname}}-{{.Task.Slot}}"
1196+
environment:
1197+
<<: *storage_environment
1198+
STORAGE_WORKER_MODE: "true"
1199+
CELERY_CONCURRENCY: 100
1200+
networks: *storage_networks
1201+
1202+
sto-worker-cpu-bound:
1203+
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
1204+
init: true
1205+
hostname: "sto-worker-cpu-bound-{{.Node.Hostname}}-{{.Task.Slot}}"
11961206
environment:
11971207
<<: *storage_environment
11981208
STORAGE_WORKER_MODE: "true"
11991209
CELERY_CONCURRENCY: 1
1210+
CELERY_QUEUES: "cpu-bound"
12001211
networks: *storage_networks
12011212

12021213
rabbit:

services/storage/docker/boot.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
5454
--app=simcore_service_storage.modules.celery.worker_main:app \
5555
worker --pool=threads \
5656
--loglevel="${SERVER_LOG_LEVEL}" \
57-
--concurrency="${CELERY_CONCURRENCY}"
57+
--concurrency="${CELERY_CONCURRENCY}" \
58+
--queues="${CELERY_QUEUES:-default}"
5859
else
5960
exec celery \
6061
--app=simcore_service_storage.modules.celery.worker_main:app \
6162
worker --pool=threads \
6263
--loglevel="${SERVER_LOG_LEVEL}" \
63-
--concurrency="${CELERY_CONCURRENCY}"
64+
--concurrency="${CELERY_CONCURRENCY}" \
65+
--queues="${CELERY_QUEUES:-default}"
6466
fi
6567
else
6668
if [ "${SC_BOOT_MODE}" = "debug" ]; then

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
from servicelib.rabbitmq import RPCRouter
2222

2323
from ...modules.celery import get_celery_client
24-
from ...modules.celery.errors import decode_celery_transferrable_error
24+
from ...modules.celery.errors import (
25+
TransferrableCeleryError,
26+
decode_celery_transferrable_error,
27+
)
2528
from ...modules.celery.models import TaskState
2629

2730
_logger = logging.getLogger(__name__)
@@ -102,6 +105,7 @@ async def result(
102105
# try to recover the original error
103106
exception = None
104107
with log_catch(_logger, reraise=False):
108+
assert isinstance(_result, TransferrableCeleryError) # nosec
105109
exception = decode_celery_transferrable_error(_result)
106110
exc_type = type(exception).__name__
107111
exc_msg = f"{exception}"

services/storage/src/simcore_service_storage/api/rpc/_simcore_s3.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from servicelib.rabbitmq import RPCRouter
99

1010
from ...modules.celery import get_celery_client
11+
from ...modules.celery.models import TaskMetadata, TasksQueue
1112
from .._worker_tasks._simcore_s3 import deep_copy_files_from_project, export_data
1213

1314
router = RPCRouter()
@@ -36,6 +37,10 @@ async def start_export_data(
3637
task_uuid = await get_celery_client(app).send_task(
3738
export_data.__name__,
3839
task_context=job_id_data.model_dump(),
40+
task_metadata=TaskMetadata(
41+
ephemeral=False,
42+
queue=TasksQueue.CPU_BOUND,
43+
),
3944
user_id=job_id_data.user_id,
4045
paths_to_export=paths_to_export,
4146
)

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from ...core.settings import get_application_settings
1010
from ._celery_types import register_celery_types
1111
from ._common import create_app
12-
from .backends._redis import RedisTaskStore
12+
from .backends._redis import RedisTaskMetadataStore
1313
from .client import CeleryTaskQueueClient
1414

1515
_logger = logging.getLogger(__name__)
@@ -29,7 +29,9 @@ async def on_startup() -> None:
2929
)
3030

3131
app.state.celery_client = CeleryTaskQueueClient(
32-
celery_app, RedisTaskStore(redis_client_sdk)
32+
celery_app,
33+
celery_settings,
34+
RedisTaskMetadataStore(redis_client_sdk),
3335
)
3436

3537
register_celery_types()

services/storage/src/simcore_service_storage/modules/celery/_common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
1515
"result_expires": celery_settings.CELERY_RESULT_EXPIRES,
1616
"result_extended": True,
1717
"result_serializer": "json",
18+
"task_default_queue": "default",
1819
"task_send_sent_event": True,
1920
"task_track_started": True,
2021
"worker_send_task_events": True,

0 commit comments

Comments
 (0)