Skip to content

Commit 6890444

Browse files
continue working
1 parent af41640 commit 6890444

File tree

12 files changed

+72
-71
lines changed

12 files changed

+72
-71
lines changed

services/storage/docker/boot.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
5757
"
5858
else
5959
if [ "${STORAGE_MODE}" = "NORMAL" ]; then
60-
exec uvicorn simcore_service_storage.main:the_app \
60+
exec uvicorn simcore_service_storage.main:app \
6161
--host 0.0.0.0 \
6262
--port ${STORAGE_PORT} \
6363
--log-level "${SERVER_LOG_LEVEL}"
6464
else
6565
exec celery \
66-
-A simcore_service_storage.modules.celery.worker.main:app \
66+
-A simcore_service_storage.main:app \
6767
worker
6868
--loglevel="${SERVER_LOG_LEVEL}"
6969
fi

services/storage/requirements/_base.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
aioboto3 # s3 storage
1818
aiofiles # i/o
19+
asgi_lifespan
1920
asyncpg # database
2021
celery[redis]
2122
httpx

services/storage/requirements/_base.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ arrow==1.3.0
8787
# -r requirements/../../../packages/models-library/requirements/_base.in
8888
# -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
8989
# -r requirements/../../../packages/service-library/requirements/_base.in
90+
asgi-lifespan==2.1.0
91+
# via -r requirements/_base.in
9092
asgiref==3.8.1
9193
# via opentelemetry-instrumentation-asgi
9294
asyncpg==0.30.0
@@ -756,7 +758,9 @@ shellingham==1.5.4
756758
six==1.17.0
757759
# via python-dateutil
758760
sniffio==1.3.1
759-
# via anyio
761+
# via
762+
# anyio
763+
# asgi-lifespan
760764
sqlalchemy==1.4.54
761765
# via
762766
# -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt

services/storage/requirements/_test.in

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
--constraint _base.txt
77

88

9-
asgi_lifespan
109
asyncpg-stubs
1110
coverage
1211
docker

services/storage/requirements/_test.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ anyio==4.8.0
2121
# via
2222
# -c requirements/_base.txt
2323
# httpx
24-
asgi-lifespan==2.1.0
25-
# via -r requirements/_test.in
2624
asyncpg==0.30.0
2725
# via
2826
# -c requirements/_base.txt
@@ -337,7 +335,6 @@ sniffio==1.3.1
337335
# via
338336
# -c requirements/_base.txt
339337
# anyio
340-
# asgi-lifespan
341338
sortedcontainers==2.4.0
342339
# via fakeredis
343340
sqlalchemy==1.4.54

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from ..dsm import setup_dsm
3030
from ..dsm_cleaner import setup_dsm_cleaner
3131
from ..exceptions.handlers import set_exception_handlers
32-
from ..modules.celery.core import setup_celery
3332
from ..modules.db import setup_db
3433
from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads
3534
from ..modules.redis import setup as setup_redis
@@ -88,8 +87,6 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
8887
if settings.STORAGE_CLEANER_INTERVAL_S:
8988
setup_dsm_cleaner(app)
9089

91-
setup_celery(app)
92-
9390
if settings.STORAGE_PROFILING:
9491
app.add_middleware(ProfilerMiddleware)
9592

services/storage/src/simcore_service_storage/core/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
101101
description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of _logger message patterns that should be filtered out.",
102102
)
103103

104+
STORAGE_MODE: str
105+
104106
@field_validator("LOG_LEVEL", mode="before")
105107
@classmethod
106108
def _validate_loglevel(cls, value: str) -> str:
Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,59 @@
11
"""Main application to be deployed in for example uvicorn."""
22

3+
import asyncio
34
import logging
45

5-
from fastapi import FastAPI
6+
from asgi_lifespan import LifespanManager
7+
from celery.signals import worker_init, worker_shutdown
68
from servicelib.logging_utils import config_all_loggers
79
from simcore_service_storage.core.application import create_app
810
from simcore_service_storage.core.settings import ApplicationSettings
11+
from simcore_service_storage.modules.celery.application import create_celery_app
12+
from simcore_service_storage.modules.celery.tasks import archive
913

10-
_the_settings = ApplicationSettings.create_from_envs()
14+
_settings = ApplicationSettings.create_from_envs()
1115

1216
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148
13-
logging.basicConfig(level=_the_settings.log_level) # NOSONAR
14-
logging.root.setLevel(_the_settings.log_level)
17+
logging.basicConfig(level=_settings.log_level) # NOSONAR
18+
logging.root.setLevel(_settings.log_level)
1519
config_all_loggers(
16-
log_format_local_dev_enabled=_the_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED,
17-
logger_filter_mapping=_the_settings.STORAGE_LOG_FILTER_MAPPING,
18-
tracing_settings=_the_settings.STORAGE_TRACING,
20+
log_format_local_dev_enabled=_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED,
21+
logger_filter_mapping=_settings.STORAGE_LOG_FILTER_MAPPING,
22+
tracing_settings=_settings.STORAGE_TRACING,
1923
)
2024

25+
_logger = logging.getLogger(__name__)
2126

22-
# SINGLETON FastAPI app
23-
the_app: FastAPI = create_app(_the_settings)
27+
fastapi_app = create_app(_settings)
28+
29+
celery_app = create_celery_app(_settings)
30+
celery_app.task(name="archive")(archive)
31+
32+
33+
@worker_init.connect
34+
def on_worker_init(**_kwargs):
35+
loop = asyncio.new_event_loop()
36+
# asyncio.set_event_loop(loop)
37+
38+
async def lifespan():
39+
async with LifespanManager(fastapi_app):
40+
_logger.error("FastAPI lifespan started")
41+
await asyncio.Event().wait()
42+
43+
fastapi_app.state.lifespan_task = loop.create_task(lifespan())
44+
_logger.error("Worker init: FastAPI lifespan task started")
45+
46+
47+
@worker_shutdown.connect
48+
def on_worker_shutdown(**_kwargs):
49+
if fastapi_app.state.lifespan_task:
50+
fastapi_app.state.lifespan_task.cancel()
51+
_logger.info("FastAPI lifespan stopped.")
52+
53+
54+
if _settings.STORAGE_MODE == "WORKER":
55+
celery_app.conf.fastapi_app = fastapi_app
56+
app = celery_app
57+
else:
58+
fastapi_app.state.celery = celery_app
59+
app = fastapi_app

services/storage/src/simcore_service_storage/modules/celery/core.py renamed to services/storage/src/simcore_service_storage/modules/celery/application.py

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from fastapi import FastAPI
77
from settings_library.redis import RedisDatabase
88

9-
from ...core.settings import get_application_settings
9+
from ...core.settings import ApplicationSettings
1010

1111
_log = logging.getLogger(__name__)
1212

@@ -23,29 +23,19 @@ def cancel_task(self, task_id: str):
2323

2424

2525
# TODO: move and use new FastAPI lifespan
26-
def setup_celery(app: FastAPI) -> None:
27-
async def on_startup() -> None:
28-
settings = get_application_settings(app)
29-
assert settings.STORAGE_REDIS
26+
def create_celery_app(settings: ApplicationSettings) -> Celery:
27+
assert settings.STORAGE_REDIS
3028

31-
redis_dsn = settings.STORAGE_REDIS.build_redis_dsn(
32-
RedisDatabase.CELERY_TASKS,
33-
)
29+
redis_dsn = settings.STORAGE_REDIS.build_redis_dsn(
30+
RedisDatabase.CELERY_TASKS,
31+
)
3432

35-
celery_app = Celery(
36-
broker=redis_dsn,
37-
backend=redis_dsn,
38-
)
33+
celery_app = Celery(
34+
broker=redis_dsn,
35+
backend=redis_dsn,
36+
)
3937

40-
task_queue = CeleryTaskQueue(celery_app)
41-
42-
app.state.task_queue = task_queue
43-
44-
async def on_shutdown() -> None:
45-
_log.warning("Implementing shutdown of celery app")
46-
47-
app.add_event_handler("startup", on_startup)
48-
app.add_event_handler("shutdown", on_shutdown)
38+
return celery_app
4939

5040

5141
def get_celery_task_queue(app: FastAPI) -> CeleryTaskQueue:

services/storage/src/simcore_service_storage/modules/celery/configurator.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

0 commit comments

Comments
 (0)