Skip to content

Commit e458668

Browse files
authored
♻️Storage Celery integration: diverse enhancements (#7353)
1 parent c1e8fea commit e458668

File tree

14 files changed

+77
-35
lines changed

14 files changed

+77
-35
lines changed

services/docker-compose.devel.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,15 @@ services:
189189
STORAGE_PROFILING : ${STORAGE_PROFILING}
190190
STORAGE_LOGLEVEL: DEBUG
191191

192+
sto-worker:
193+
volumes:
194+
- ./storage:/devel/services/storage
195+
- ../packages:/devel/packages
196+
environment:
197+
<<: *common-environment
198+
STORAGE_PROFILING : ${STORAGE_PROFILING}
199+
STORAGE_LOGLEVEL: DEBUG
200+
192201
agent:
193202
environment:
194203
<<: *common-environment

services/docker-compose.local.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,13 @@ services:
126126
- "8080"
127127
- "3003:3000"
128128

129+
sto-worker:
130+
environment:
131+
<<: *common_environment
132+
STORAGE_REMOTE_DEBUGGING_PORT : 3000
133+
ports:
134+
- "8080"
135+
- "3021:3000"
129136
webserver:
130137
environment: &webserver_environment_local
131138
<<: *common_environment

services/storage/docker/boot.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
4747

4848
exec sh -c "
4949
cd services/storage/src/simcore_service_storage && \
50-
python -m debugpy --listen 0.0.0.0:${STORAGE_REMOTE_DEBUGGING_PORT} -m uvicorn main:the_app \
50+
python -m debugpy --listen 0.0.0.0:${STORAGE_REMOTE_DEBUGGING_PORT} -m uvicorn main:app \
5151
--host 0.0.0.0 \
5252
--port ${STORAGE_PORT} \
5353
--reload \

services/storage/src/simcore_service_storage/_meta.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,17 @@
2828
f"v{__version__}"
2929
)
3030

31+
APP_WORKER_STARTED_BANNER_MSG = r"""
32+
33+
____ _ __ __ _
34+
/ ___|| |_ ___ _ __ __ _ __ _ ___ \ \ / /__ _ __| | _____ _ __
35+
\___ \| __/ _ \| '__/ _` |/ _` |/ _ \____\ \ /\ / / _ \| '__| |/ / _ \ '__|
36+
___) | || (_) | | | (_| | (_| | __/_____\ V V / (_) | | | < __/ |
37+
|____/ \__\___/|_| \__,_|\__, |\___| \_/\_/ \___/|_| |_|\_\___|_|
38+
|___/ {}
39+
40+
""".format(
41+
f"v{__version__}"
42+
)
43+
3144
APP_FINISHED_BANNER_MSG = info.get_finished_banner()

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
APP_FINISHED_BANNER_MSG,
2727
APP_NAME,
2828
APP_STARTED_BANNER_MSG,
29+
APP_WORKER_STARTED_BANNER_MSG,
2930
)
3031
from ..api.rest.routes import setup_rest_api_routes
3132
from ..api.rpc.routes import setup_rpc_api_routes
@@ -114,7 +115,10 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
114115
setup_prometheus_instrumentation(app)
115116

116117
async def _on_startup() -> None:
117-
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
118+
if settings.STORAGE_WORKER_MODE:
119+
print(APP_WORKER_STARTED_BANNER_MSG, flush=True) # noqa: T201
120+
else:
121+
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
118122

119123
async def _on_shutdown() -> None:
120124
print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201

services/storage/src/simcore_service_storage/main.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
import logging
44

55
from servicelib.logging_utils import config_all_loggers
6+
from simcore_service_storage.core.application import create_app
7+
from simcore_service_storage.core.settings import ApplicationSettings
68
from simcore_service_storage.modules.celery import setup_celery
79

8-
from .core.application import create_app
9-
from .core.settings import ApplicationSettings
10-
1110
_settings = ApplicationSettings.create_from_envs()
1211

1312
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
from asyncio import AbstractEventLoop
33

44
from fastapi import FastAPI
5-
from simcore_service_storage.modules.celery._common import create_app
6-
from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient
75

86
from ...core.settings import get_application_settings
7+
from ._common import create_app
8+
from .client import CeleryTaskQueueClient
99

1010
_logger = logging.getLogger(__name__)
1111

services/storage/src/simcore_service_storage/modules/celery/_common.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from collections.abc import Callable
2-
from functools import wraps
31
import logging
42
import traceback
3+
from collections.abc import Callable
4+
from functools import wraps
55
from typing import Any
66

7-
from celery import Celery, Task # type: ignore[import-untyped]
8-
from celery.exceptions import Ignore # type: ignore[import-untyped]
7+
from celery import Celery, Task # type: ignore[import-untyped]
98
from celery.contrib.abortable import AbortableTask # type: ignore[import-untyped]
9+
from celery.exceptions import Ignore # type: ignore[import-untyped]
1010
from settings_library.celery import CelerySettings
1111
from settings_library.redis import RedisDatabase
1212

@@ -39,7 +39,7 @@ def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any:
3939
except Exception as exc:
4040
exc_type = type(exc).__name__
4141
exc_message = f"{exc}"
42-
exc_traceback = traceback.format_exc().split('\n')
42+
exc_traceback = traceback.format_exc().split("\n")
4343

4444
_logger.exception(
4545
"Task %s failed with exception: %s",
@@ -53,9 +53,10 @@ def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any:
5353
exc_type=exc_type,
5454
exc_msg=exc_message,
5555
).model_dump(mode="json"),
56-
traceback=exc_traceback
56+
traceback=exc_traceback,
5757
)
58-
raise Ignore from exc # ignore doing state updates
58+
raise Ignore from exc # ignore doing state updates
59+
5960
return wrapper
6061

6162

services/storage/src/simcore_service_storage/modules/celery/client.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from typing import Any, Final
44
from uuid import uuid4
55

6-
from celery import Celery # type: ignore[import-untyped]
7-
from celery.contrib.abortable import AbortableAsyncResult # type: ignore[import-untyped]
6+
from celery import Celery # type: ignore[import-untyped]
7+
from celery.contrib.abortable import ( # type: ignore[import-untyped]
8+
AbortableAsyncResult,
9+
)
810
from common_library.async_tools import make_async
911
from models_library.progress_bar import ProgressReport
1012
from pydantic import ValidationError
@@ -47,7 +49,9 @@ def _build_task_id_prefix(task_context: TaskContext) -> str:
4749

4850

4951
def _build_task_id(task_context: TaskContext, task_uuid: TaskUUID) -> TaskID:
50-
return _CELERY_TASK_ID_KEY_SEPARATOR.join([_build_task_id_prefix(task_context), f"{task_uuid}"])
52+
return _CELERY_TASK_ID_KEY_SEPARATOR.join(
53+
[_build_task_id_prefix(task_context), f"{task_uuid}"]
54+
)
5155

5256

5357
class CeleryTaskQueueClient:
@@ -114,13 +118,13 @@ def get_task_status(
114118
)
115119

116120
def _get_completed_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
117-
search_key = (
118-
_CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context)
119-
)
121+
search_key = _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context)
120122
redis = self._celery_app.backend.client
121123
if hasattr(redis, "keys") and (keys := redis.keys(search_key + "*")):
122124
return {
123-
TaskUUID(f"{key.decode(_CELERY_TASK_ID_KEY_ENCODING).removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR)}")
125+
TaskUUID(
126+
f"{key.decode(_CELERY_TASK_ID_KEY_ENCODING).removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR)}"
127+
)
124128
for key in keys
125129
}
126130
return set()
@@ -129,15 +133,19 @@ def _get_completed_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
129133
def get_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
130134
all_task_ids = self._get_completed_task_uuids(task_context)
131135

132-
search_key = (
133-
_CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context)
134-
)
136+
search_key = _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context)
135137
for task_inspect_status in _CELERY_INSPECT_TASK_STATUSES:
136138
if task_ids := getattr(
137139
self._celery_app.control.inspect(), task_inspect_status
138140
)():
139141
for values in task_ids.values():
140142
for value in values:
141-
all_task_ids.add(TaskUUID(value.removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR)))
143+
all_task_ids.add(
144+
TaskUUID(
145+
value.removeprefix(
146+
search_key + _CELERY_TASK_ID_KEY_SEPARATOR
147+
)
148+
)
149+
)
142150

143151
return all_task_ids

services/storage/src/simcore_service_storage/modules/celery/signals.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
from typing import Final
55

66
from asgi_lifespan import LifespanManager
7-
from celery import Celery # type: ignore[import-untyped]
7+
from celery import Celery # type: ignore[import-untyped]
88
from fastapi import FastAPI
99
from servicelib.async_utils import cancel_wait_task
10+
1011
from ...core.application import create_app
1112
from ...core.settings import ApplicationSettings
1213
from ...modules.celery import get_event_loop, set_event_loop

0 commit comments

Comments
 (0)