Skip to content

Commit af41640

Browse files
add storage worker
1 parent b33cdae commit af41640

File tree

7 files changed

+84
-24
lines changed

7 files changed

+84
-24
lines changed

services/docker-compose.yml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,6 +1143,45 @@ services:
11431143
S3_ENDPOINT: ${S3_ENDPOINT}
11441144
S3_REGION: ${S3_REGION}
11451145
S3_SECRET_KEY: ${S3_SECRET_KEY}
1146+
STORAGE_MODE: NORMAL
1147+
STORAGE_LOGLEVEL: ${STORAGE_LOGLEVEL}
1148+
STORAGE_MONITORING_ENABLED: 1
1149+
STORAGE_PROFILING: ${STORAGE_PROFILING}
1150+
STORAGE_PORT: ${STORAGE_PORT}
1151+
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
1152+
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
1153+
networks:
1154+
- default
1155+
- interactive_services_subnet
1156+
- storage_subnet
1157+
1158+
storage-worker:
1159+
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-latest}
1160+
init: true
1161+
hostname: "sto-{{.Node.Hostname}}-{{.Task.Slot}}"
1162+
environment:
1163+
BF_API_KEY: ${BF_API_KEY}
1164+
BF_API_SECRET: ${BF_API_SECRET}
1165+
DATCORE_ADAPTER_HOST: ${DATCORE_ADAPTER_HOST:-datcore-adapter}
1166+
LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED}
1167+
LOG_FILTER_MAPPING : ${LOG_FILTER_MAPPING}
1168+
POSTGRES_DB: ${POSTGRES_DB}
1169+
POSTGRES_ENDPOINT: ${POSTGRES_ENDPOINT}
1170+
POSTGRES_HOST: ${POSTGRES_HOST}
1171+
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
1172+
POSTGRES_PORT: ${POSTGRES_PORT}
1173+
POSTGRES_USER: ${POSTGRES_USER}
1174+
REDIS_HOST: ${REDIS_HOST}
1175+
REDIS_PORT: ${REDIS_PORT}
1176+
REDIS_SECURE: ${REDIS_SECURE}
1177+
REDIS_USER: ${REDIS_USER}
1178+
REDIS_PASSWORD: ${REDIS_PASSWORD}
1179+
S3_ACCESS_KEY: ${S3_ACCESS_KEY}
1180+
S3_BUCKET_NAME: ${S3_BUCKET_NAME}
1181+
S3_ENDPOINT: ${S3_ENDPOINT}
1182+
S3_REGION: ${S3_REGION}
1183+
S3_SECRET_KEY: ${S3_SECRET_KEY}
1184+
STORAGE_MODE: WORKER
11461185
STORAGE_LOGLEVEL: ${STORAGE_LOGLEVEL}
11471186
STORAGE_MONITORING_ENABLED: 1
11481187
STORAGE_PROFILING: ${STORAGE_PROFILING}

services/storage/docker/boot.sh

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,15 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
5656
--log-level \"${SERVER_LOG_LEVEL}\"
5757
"
5858
else
59-
exec uvicorn simcore_service_storage.main:the_app \
60-
--host 0.0.0.0 \
61-
--port ${STORAGE_PORT} \
62-
--log-level "${SERVER_LOG_LEVEL}"
59+
if [ "${STORAGE_MODE}" = "NORMAL" ]; then
60+
exec uvicorn simcore_service_storage.main:the_app \
61+
--host 0.0.0.0 \
62+
--port ${STORAGE_PORT} \
63+
--log-level "${SERVER_LOG_LEVEL}"
64+
else
65+
exec celery \
66+
-A simcore_service_storage.modules.celery.worker.main:app \
67+
worker
68+
--loglevel="${SERVER_LOG_LEVEL}"
69+
fi
6370
fi
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import logging
2+
3+
from celery import Celery
4+
from settings_library.redis import RedisDatabase
5+
6+
from ...core.settings import ApplicationSettings
7+
8+
_log = logging.getLogger(__name__)
9+
10+
11+
def create_celery_app(settings: ApplicationSettings) -> Celery:
12+
assert settings.STORAGE_REDIS
13+
app = Celery(
14+
broker=settings.STORAGE_REDIS.build_redis_dsn(RedisDatabase.CELERY_TASKS),
15+
backend=settings.STORAGE_REDIS.build_redis_dsn(RedisDatabase.CELERY_TASKS),
16+
)
17+
return app

services/storage/src/simcore_service_storage/modules/celery/core.py

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
import logging
2-
from multiprocessing import Process
3-
from typing import Callable, cast
2+
from typing import cast
43

54
from celery import Celery
6-
from celery.apps.worker import Worker
75
from celery.result import AsyncResult
86
from fastapi import FastAPI
97
from settings_library.redis import RedisDatabase
10-
from simcore_service_storage.modules.celery.tasks import archive
118

129
from ...core.settings import get_application_settings
1310

@@ -18,11 +15,8 @@ class CeleryTaskQueue:
1815
def __init__(self, celery_app: Celery):
1916
self._celery_app = celery_app
2017

21-
def create_task(self, task_fn: Callable):
22-
self._celery_app.task()(task_fn)
23-
24-
def send_task(self, name: str, **kwargs) -> AsyncResult:
25-
return self._celery_app.send_task(name, **kwargs)
18+
def send_task(self, name: str, *args, **kwargs) -> AsyncResult:
19+
return self._celery_app.send_task(name, args=args, kwargs=kwargs)
2620

2721
def cancel_task(self, task_id: str):
2822
self._celery_app.control.revoke(task_id)
@@ -34,7 +28,6 @@ async def on_startup() -> None:
3428
settings = get_application_settings(app)
3529
assert settings.STORAGE_REDIS
3630

37-
assert settings.STORAGE_REDIS
3831
redis_dsn = settings.STORAGE_REDIS.build_redis_dsn(
3932
RedisDatabase.CELERY_TASKS,
4033
)
@@ -45,18 +38,9 @@ async def on_startup() -> None:
4538
)
4639

4740
task_queue = CeleryTaskQueue(celery_app)
48-
task_queue.create_task(archive)
4941

5042
app.state.task_queue = task_queue
5143

52-
# FIXME: Experiment: to start worker in a separate process
53-
def worker_process():
54-
worker = Worker(app=app.state.celery_app)
55-
worker.start()
56-
57-
worker_proc = Process(target=worker_process)
58-
worker_proc.start()
59-
6044
async def on_shutdown() -> None:
6145
_log.warning("Implementing shutdown of celery app")
6246

services/storage/src/simcore_service_storage/modules/celery/tasks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
_log = logging.getLogger(__name__)
44

55

6-
def archive(files: list[str]) -> None:
6+
def archive(files: list[str]) -> str:
77
_log.info("Archiving: %s", ", ".join(files))
8+
return "".join(files)

services/storage/src/simcore_service_storage/modules/celery/worker/__init__.py

Whitespace-only changes.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from ....core.settings import ApplicationSettings
2+
from ...celery.tasks import archive
3+
from ..configurator import create_celery_app
4+
5+
settings = ApplicationSettings.create_from_envs()
6+
7+
app = create_celery_app(settings)
8+
9+
app.task(name="archive")(archive)
10+
11+
12+
__all__ = ["app"]

0 commit comments

Comments
 (0)