Skip to content
Merged
Show file tree
Hide file tree
Changes from 139 commits
Commits
Show all changes
163 commits
Select commit Hold shift + click to select a range
119b9da
add distribute task queue
giancarloromeo Feb 12, 2025
235c8ed
add redis database
giancarloromeo Feb 12, 2025
f3827c7
add settings
giancarloromeo Feb 12, 2025
1a2cc8d
update reqs
giancarloromeo Feb 12, 2025
ce01b63
add celery task
giancarloromeo Feb 12, 2025
866ca9b
add celery task queue class
giancarloromeo Feb 13, 2025
9e90c73
rename
giancarloromeo Feb 13, 2025
b33cdae
make testable
giancarloromeo Feb 13, 2025
af41640
add storage worker
giancarloromeo Feb 13, 2025
6890444
continue working
giancarloromeo Feb 14, 2025
d6b85be
continue
giancarloromeo Feb 17, 2025
8a8cdd5
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Feb 17, 2025
00751db
use rabbit
giancarloromeo Feb 17, 2025
bd27fa5
continue
giancarloromeo Feb 17, 2025
2794e5b
continue
giancarloromeo Feb 17, 2025
f2e43b4
add unit tests
giancarloromeo Feb 18, 2025
dbf0e9c
continue
giancarloromeo Feb 18, 2025
2bc8ed4
base working tests
giancarloromeo Feb 19, 2025
6f84e1d
add progress
giancarloromeo Feb 20, 2025
b8a010c
continue fixing
giancarloromeo Feb 21, 2025
fa3e919
continue fixing
giancarloromeo Feb 21, 2025
6450c2e
fix docker
giancarloromeo Feb 21, 2025
bc2a773
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo Feb 21, 2025
4220788
working
giancarloromeo Feb 24, 2025
b40a24c
continue fix
giancarloromeo Feb 24, 2025
341f7fa
fix tests
giancarloromeo Feb 24, 2025
80753e7
update boot.sh
giancarloromeo Feb 24, 2025
578573c
fix files endpoint
giancarloromeo Feb 24, 2025
499d290
rename
giancarloromeo Feb 24, 2025
ccd17b5
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo Feb 24, 2025
b305a53
add abortable task
giancarloromeo Feb 24, 2025
a5e1128
typechecks
giancarloromeo Feb 24, 2025
920a912
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Feb 25, 2025
6ab33bd
add healthcheck
giancarloromeo Feb 25, 2025
5b86b64
register abortable tasks by default
giancarloromeo Feb 25, 2025
c27f509
continue
giancarloromeo Feb 25, 2025
e386ff9
rename
giancarloromeo Feb 25, 2025
2d4951a
add settings
giancarloromeo Feb 25, 2025
6d1e32b
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Feb 25, 2025
04ffc07
remove debug
giancarloromeo Feb 25, 2025
54b9a11
remuve unused pytest plugin
giancarloromeo Feb 25, 2025
1caeca5
fix healthcheck
giancarloromeo Feb 25, 2025
0d84c14
typecheck
giancarloromeo Feb 25, 2025
f297a3b
fix tests
giancarloromeo Feb 25, 2025
d760277
fix tests
giancarloromeo Feb 25, 2025
c768f0e
fix import
giancarloromeo Feb 25, 2025
9f52c9d
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Feb 25, 2025
8a81d22
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo Feb 25, 2025
3c8eb4c
add utils
giancarloromeo Feb 25, 2025
780a947
update utils
giancarloromeo Feb 25, 2025
830d562
fix tests
giancarloromeo Feb 25, 2025
b22abea
update example
giancarloromeo Feb 26, 2025
5072b3b
add async interface
giancarloromeo Feb 26, 2025
e09a66b
add tests
giancarloromeo Feb 26, 2025
e94f736
improve typehints
giancarloromeo Feb 26, 2025
079fd1b
improve typehint
giancarloromeo Feb 26, 2025
975b4d3
remove unused
giancarloromeo Feb 26, 2025
2efae6b
update
giancarloromeo Feb 26, 2025
754c065
fix util
giancarloromeo Feb 26, 2025
9c9adf0
refactor
giancarloromeo Feb 26, 2025
3810eda
update docker
giancarloromeo Feb 26, 2025
e7f3fa2
typecheck
giancarloromeo Feb 26, 2025
dcd9da3
fix typecheck
giancarloromeo Feb 26, 2025
abec223
change list to set
giancarloromeo Feb 27, 2025
0ce1aa4
rename
giancarloromeo Feb 27, 2025
651b327
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Feb 27, 2025
ce77046
add task context
giancarloromeo Feb 27, 2025
2f96e96
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo Feb 27, 2025
f28cbc9
add rabbit
giancarloromeo Feb 27, 2025
ad2c55c
add fixture
giancarloromeo Feb 27, 2025
3da4515
update interface
giancarloromeo Feb 27, 2025
87a457f
typecheck
giancarloromeo Feb 27, 2025
d0a97fb
adapt code
giancarloromeo Feb 27, 2025
6a0fc4a
rename
giancarloromeo Feb 27, 2025
5bbf86c
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo Mar 3, 2025
6f070ff
add stop
giancarloromeo Mar 3, 2025
5b44bb9
fix import
giancarloromeo Mar 3, 2025
8bb9055
move example
giancarloromeo Mar 3, 2025
ea709b3
change default
giancarloromeo Mar 3, 2025
f9ecc62
update description
giancarloromeo Mar 3, 2025
2b5f143
update prefix
giancarloromeo Mar 3, 2025
872baf7
add types
giancarloromeo Mar 3, 2025
320095d
refactor
giancarloromeo Mar 3, 2025
319570e
use suppress
giancarloromeo Mar 3, 2025
432f940
use longnames
giancarloromeo Mar 3, 2025
966baf6
change var names
giancarloromeo Mar 3, 2025
4a2cada
fix settings
giancarloromeo Mar 3, 2025
ce4c2fa
log_context
giancarloromeo Mar 3, 2025
7ce7d6f
remove example
giancarloromeo Mar 3, 2025
d1714fd
update docker compose
giancarloromeo Mar 3, 2025
67ec039
remove typehint
giancarloromeo Mar 3, 2025
3896794
add log
giancarloromeo Mar 3, 2025
4aeb1c9
add log
giancarloromeo Mar 3, 2025
558cd5d
remove comment
giancarloromeo Mar 3, 2025
4f3e722
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 3, 2025
1107435
add task params
giancarloromeo Mar 3, 2025
dd7f2fa
track started tasks
giancarloromeo Mar 3, 2025
4b4d5a2
add enums
giancarloromeo Mar 3, 2025
50550b4
add redis on startup
giancarloromeo Mar 3, 2025
aa08b2a
add enum
giancarloromeo Mar 3, 2025
363b15d
fix worker startup
giancarloromeo Mar 3, 2025
0fdd564
fix state
giancarloromeo Mar 3, 2025
46ac5df
continue
giancarloromeo Mar 4, 2025
5d7115b
add validation
giancarloromeo Mar 4, 2025
52d9219
remove started and stopped
giancarloromeo Mar 4, 2025
38b4be7
progress not nullable
giancarloromeo Mar 4, 2025
c3b2f4d
removed for now
giancarloromeo Mar 4, 2025
319f269
typecheck
giancarloromeo Mar 4, 2025
096c3b7
fix port
giancarloromeo Mar 4, 2025
02d401a
fix tests
giancarloromeo Mar 4, 2025
165b198
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 4, 2025
0b624ec
remove register
giancarloromeo Mar 4, 2025
e77d2dd
add concurrency
giancarloromeo Mar 4, 2025
26160a8
add fixture
giancarloromeo Mar 4, 2025
3bb2c6b
fix tests
giancarloromeo Mar 4, 2025
e6253da
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 4, 2025
964a22e
fix import
giancarloromeo Mar 4, 2025
994ee8f
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo Mar 4, 2025
c620103
progress
giancarloromeo Mar 5, 2025
95898e6
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo Mar 5, 2025
c75ccae
fix done property
giancarloromeo Mar 5, 2025
317ee66
update keys
giancarloromeo Mar 5, 2025
1eab85b
add retry
giancarloromeo Mar 5, 2025
99cb6b7
Merge remote-tracking branch 'upstream/master' into add-distributed-t…
giancarloromeo Mar 5, 2025
3a7f7e9
improve error handling
giancarloromeo Mar 6, 2025
8e5b1f9
add task
giancarloromeo Mar 6, 2025
c2be8f0
update
giancarloromeo Mar 6, 2025
87fb925
fix get
giancarloromeo Mar 6, 2025
9a96dd8
fix get_uuids
giancarloromeo Mar 6, 2025
8df0a5c
fix
giancarloromeo Mar 6, 2025
c56b8b7
fix import
giancarloromeo Mar 6, 2025
0ec94b6
update
giancarloromeo Mar 6, 2025
403aaa1
use taskstate
giancarloromeo Mar 6, 2025
65a07e0
remove unused
giancarloromeo Mar 6, 2025
327c8f1
type hinting
giancarloromeo Mar 7, 2025
d077472
remove taskresult
giancarloromeo Mar 7, 2025
59b4a08
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 7, 2025
6c3779f
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo Mar 7, 2025
66e51d9
add celery env
giancarloromeo Mar 7, 2025
3c86023
get hostname
giancarloromeo Mar 7, 2025
d8a848e
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 7, 2025
8ea33e6
fix imports
giancarloromeo Mar 10, 2025
c76cbd3
fix settings
giancarloromeo Mar 10, 2025
ed8f92c
fix
giancarloromeo Mar 10, 2025
7f09d30
add comment
giancarloromeo Mar 10, 2025
5b1f97a
fix docker compose
giancarloromeo Mar 10, 2025
9e6284b
use settings
giancarloromeo Mar 10, 2025
9d71230
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 10, 2025
99ba805
fix typecheck
giancarloromeo Mar 10, 2025
1630c8a
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 10, 2025
8f95fd2
fix envs
giancarloromeo Mar 10, 2025
37e8704
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo Mar 10, 2025
ec4bb00
fix mypy
giancarloromeo Mar 10, 2025
28a440f
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 10, 2025
76e7fc3
fix env
giancarloromeo Mar 10, 2025
3cf012e
revert mypy setting
giancarloromeo Mar 10, 2025
cf96a04
typecheck
giancarloromeo Mar 10, 2025
a61e5eb
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 10, 2025
6d5640e
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 10, 2025
3133758
exclude service
giancarloromeo Mar 11, 2025
5050df1
Merge branch 'add-distributed-task-queue' of github.com:giancarlorome…
giancarloromeo Mar 11, 2025
a71cc9a
Merge branch 'master' into add-distributed-task-queue
giancarloromeo Mar 11, 2025
f6232ff
exclude sto-worker
giancarloromeo Mar 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1},
CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'
CATALOG_TRACING=null

CELERY_RESULT_EXPIRES=P7D

CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH='{"type":"tls","tls_ca_file":"/home/scu/.dask/dask-crt.pem","tls_client_cert":"/home/scu/.dask/dask-crt.pem","tls_client_key":"/home/scu/.dask/dask-key.pem"}'
CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest
CLUSTERS_KEEPER_DASK_NTHREADS=0
Expand Down
6 changes: 3 additions & 3 deletions api/specs/web-server/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ async def export_data(data_export: DataExportPost, location_id: LocationID):
response_model=Envelope[StorageAsyncJobStatus],
name="get_async_job_status",
)
async def get_async_job_status(storage_async_job_get: StorageAsyncJobGet, job_id: UUID):
async def get_async_job_status(job_id: UUID):
"""Get async job status"""


@router.post(
"/storage/async-jobs/{job_id}:abort",
name="abort_async_job",
)
async def abort_async_job(storage_async_job_get: StorageAsyncJobGet, job_id: UUID):
async def abort_async_job(job_id: UUID):
"""aborts execution of an async job"""


Expand All @@ -227,7 +227,7 @@ async def abort_async_job(storage_async_job_get: StorageAsyncJobGet, job_id: UUI
response_model=Envelope[StorageAsyncJobResult],
name="get_async_job_result",
)
async def get_async_job_result(storage_async_job_get: StorageAsyncJobGet, job_id: UUID):
async def get_async_job_result(job_id: UUID):
"""Get the result of the async job"""


Expand Down
24 changes: 24 additions & 0 deletions packages/common-library/src/common_library/async_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio
import functools
from collections.abc import Awaitable, Callable
from concurrent.futures import Executor
from typing import ParamSpec, TypeVar

R = TypeVar("R")
P = ParamSpec("P")


def make_async(
executor: Executor | None = None,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
executor, functools.partial(func, *args, **kwargs)
)

return wrapper

return decorator
45 changes: 45 additions & 0 deletions packages/common-library/tests/test_async_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor

import pytest
from common_library.async_tools import make_async


@make_async()
def sync_function(x: int, y: int) -> int:
return x + y


@make_async()
def sync_function_with_exception() -> None:
raise ValueError("This is an error!")


@pytest.mark.asyncio
async def test_make_async_returns_coroutine():
result = sync_function(2, 3)
assert asyncio.iscoroutine(result), "Function should return a coroutine"


@pytest.mark.asyncio
async def test_make_async_execution():
result = await sync_function(2, 3)
assert result == 5, "Function should return 5"


@pytest.mark.asyncio
async def test_make_async_exception():
with pytest.raises(ValueError, match="This is an error!"):
await sync_function_with_exception()


@pytest.mark.asyncio
async def test_make_async_with_executor():
executor = ThreadPoolExecutor()

@make_async(executor)
def heavy_computation(x: int) -> int:
return x * x

result = await heavy_computation(4)
assert result == 16, "Function should return 16"
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from datetime import datetime
from typing import Any, TypeAlias
from uuid import UUID

from models_library.users import UserID
from pydantic import BaseModel, model_validator
from typing_extensions import Self
from pydantic import BaseModel

from ..progress_bar import ProgressReport

Expand All @@ -15,18 +13,6 @@ class AsyncJobStatus(BaseModel):
job_id: AsyncJobId
progress: ProgressReport
done: bool
started: datetime
stopped: datetime | None

@model_validator(mode="after")
def _check_consistency(self) -> Self:
is_done = self.done
is_stopped = self.stopped is not None

if is_done != is_stopped:
msg = f"Inconsistent data: {self.done=}, {self.stopped=}"
raise ValueError(msg)
return self


class AsyncJobResult(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
from pathlib import Path
from typing import Annotated, Any

Expand Down Expand Up @@ -62,8 +61,6 @@ class StorageAsyncJobStatus(OutputSchema):
job_id: AsyncJobId
progress: ProgressReport
done: bool
started: datetime
stopped: datetime | None

@classmethod
def from_rpc_schema(
Expand All @@ -73,8 +70,6 @@ def from_rpc_schema(
job_id=async_job_rpc_status.job_id,
progress=async_job_rpc_status.progress,
done=async_job_rpc_status.done,
started=async_job_rpc_status.started,
stopped=async_job_rpc_status.stopped,
)


Expand Down
52 changes: 52 additions & 0 deletions packages/settings-library/src/settings_library/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import timedelta
from typing import Annotated

from pydantic import Field
from pydantic_settings import SettingsConfigDict
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings

from .base import BaseCustomSettings


class CelerySettings(BaseCustomSettings):
CELERY_RABBIT_BROKER: Annotated[
RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True})
]
CELERY_REDIS_RESULT_BACKEND: Annotated[
RedisSettings, Field(json_schema_extra={"auto_default_from_env": True})
]
CELERY_RESULT_EXPIRES: Annotated[
timedelta,
Field(
description="Time after which task results will be deleted (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)."
),
] = timedelta(days=7)
CELERY_RESULT_PERSISTENT: Annotated[
bool,
Field(
description="If set to True, result messages will be persistent (after a broker restart)."
),
] = True

model_config = SettingsConfigDict(
json_schema_extra={
"examples": [
{
"CELERY_RABBIT_BROKER": {
"RABBIT_USER": "guest",
"RABBIT_SECURE": False,
"RABBIT_PASSWORD": "guest",
"RABBIT_HOST": "localhost",
"RABBIT_PORT": 5672,
},
"CELERY_REDIS_RESULT_BACKEND": {
"REDIS_HOST": "localhost",
"REDIS_PORT": 6379,
},
"CELERY_RESULT_EXPIRES": timedelta(days=1), # type: ignore[dict-item]
"CELERY_RESULT_PERSISTENT": True,
}
],
}
)
4 changes: 2 additions & 2 deletions packages/settings-library/src/settings_library/redis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from enum import IntEnum

from pydantic import TypeAdapter
from pydantic.networks import RedisDsn
from pydantic.types import SecretStr

Expand All @@ -18,13 +17,14 @@ class RedisDatabase(IntEnum):
DISTRIBUTED_IDENTIFIERS = 6
DEFERRED_TASKS = 7
DYNAMIC_SERVICES = 8
CELERY_TASKS = 9


class RedisSettings(BaseCustomSettings):
# host
REDIS_SECURE: bool = False
REDIS_HOST: str = "redis"
REDIS_PORT: PortInt = TypeAdapter(PortInt).validate_python(6789)
REDIS_PORT: PortInt = 6789

# auth
REDIS_USER: str | None = None
Expand Down
1 change: 1 addition & 0 deletions services/docker-compose-ops.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ services:
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD},
deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD},
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD}
celery_tasks:${REDIS_HOST}:${REDIS_PORT}:9:${REDIS_PASSWORD}
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml
ports:
- "18081:8081"
Expand Down
17 changes: 14 additions & 3 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ services:
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-latest}
init: true
hostname: "sto-{{.Node.Hostname}}-{{.Task.Slot}}"
environment:
environment: &storage_environment
BF_API_KEY: ${BF_API_KEY}
BF_API_SECRET: ${BF_API_SECRET}
DATCORE_ADAPTER_HOST: ${DATCORE_ADAPTER_HOST:-datcore-adapter}
Expand All @@ -1179,17 +1179,28 @@ services:
S3_ENDPOINT: ${S3_ENDPOINT}
S3_REGION: ${S3_REGION}
S3_SECRET_KEY: ${S3_SECRET_KEY}
STORAGE_WORKER_MODE: "false"
STORAGE_LOGLEVEL: ${STORAGE_LOGLEVEL}
STORAGE_MONITORING_ENABLED: 1
STORAGE_PROFILING: ${STORAGE_PROFILING}
STORAGE_PORT: ${STORAGE_PORT}
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
networks:
networks: &storage_networks
- default
- interactive_services_subnet
- storage_subnet

sto-worker:
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
init: true
hostname: "sto-worker-{{.Node.Hostname}}-{{.Task.Slot}}"
environment:
<<: *storage_environment
STORAGE_WORKER_MODE: "true"
CELERY_CONCURRENCY: 1
networks: *storage_networks

rabbit:
image: itisfoundation/rabbitmq:3.11.2-management
init: true
Expand Down Expand Up @@ -1274,7 +1285,7 @@ services:
"--loglevel",
"verbose",
"--databases",
"9",
"10",
"--appendonly",
"yes",
"--requirepass",
Expand Down
2 changes: 1 addition & 1 deletion services/storage/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ openapi.json: .env-ignore
@set -o allexport; \
source $<; \
set +o allexport; \
python3 -c "import json; from $(APP_PACKAGE_NAME).main import *; print( json.dumps(the_app.openapi(), indent=2) )" > $@
python3 -c "import json; from $(APP_PACKAGE_NAME).main import *; print( json.dumps(app.openapi(), indent=2) )" > $@

# validates OAS file: $@
$(call validate_openapi_specs,$@)
17 changes: 13 additions & 4 deletions services/storage/docker/boot.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,17 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then
--log-level \"${SERVER_LOG_LEVEL}\"
"
else
exec uvicorn simcore_service_storage.main:the_app \
--host 0.0.0.0 \
--port ${STORAGE_PORT} \
--log-level "${SERVER_LOG_LEVEL}"
if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
exec celery \
--app=simcore_service_storage.modules.celery.worker_main:app \
worker --pool=threads \
--loglevel="${SERVER_LOG_LEVEL}" \
--hostname="${HOSTNAME}" \
--concurrency="${CELERY_CONCURRENCY}"
else
exec uvicorn simcore_service_storage.main:app \
--host 0.0.0.0 \
--port ${STORAGE_PORT} \
--log-level "${SERVER_LOG_LEVEL}"
fi
fi
34 changes: 33 additions & 1 deletion services/storage/docker/healthcheck.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,50 @@


import os
import subprocess
import sys
from urllib.request import urlopen

from simcore_service_storage.main import ApplicationSettings

SUCCESS, UNHEALTHY = 0, 1

# Disabled if boots with debugger
ok = os.environ.get("SC_BOOT_MODE", "").lower() == "debug"
ok = os.getenv("SC_BOOT_MODE", "").lower() == "debug"

# Queries host
# pylint: disable=consider-using-with


def _is_celery_worker_healthy():
app_settings = ApplicationSettings.create_from_envs()

assert app_settings.STORAGE_CELERY
broker_url = app_settings.STORAGE_CELERY.CELERY_RABBIT_BROKER.dsn

try:
result = subprocess.run(
[
"celery",
"--broker",
broker_url,
"inspect",
"ping",
"--destination",
"celery@" + os.getenv("HOSTNAME", ""),
],
capture_output=True,
text=True,
check=True,
)
return "pong" in result.stdout
except subprocess.CalledProcessError:
return False


ok = (
ok
or (bool(os.getenv("STORAGE_WORKER_MODE", "") and _is_celery_worker_healthy()))
or urlopen(
"{host}{baseurl}".format(
host=sys.argv[1], baseurl=os.environ.get("SIMCORE_NODE_BASEPATH", "")
Expand Down
2 changes: 2 additions & 0 deletions services/storage/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

aioboto3 # s3 storage
aiofiles # i/o
asgi_lifespan
asyncpg # database
celery[redis]
httpx
opentelemetry-instrumentation-botocore
packaging
Expand Down
Loading
Loading