Merged
Changes from 13 commits (78 commits total)
fefdae2
prefork experiments
giancarloromeo Oct 10, 2025
3ebb954
fix shutdown
giancarloromeo Oct 10, 2025
9fc1e3d
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo Oct 13, 2025
97e50d4
remove logs
giancarloromeo Oct 13, 2025
29083b9
fix typecheck
giancarloromeo Oct 13, 2025
b3636fb
typecheck
giancarloromeo Oct 13, 2025
27f4925
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 13, 2025
76e886f
add settings
giancarloromeo Oct 13, 2025
fe05e66
unify worker setup
giancarloromeo Oct 13, 2025
e9371cb
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo Oct 13, 2025
33d23bc
remove log
giancarloromeo Oct 13, 2025
4ab42d8
remove unused
giancarloromeo Oct 13, 2025
c8e957b
fix api-server
giancarloromeo Oct 13, 2025
1a8d135
update docker-compose
giancarloromeo Oct 13, 2025
959fa79
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 13, 2025
d624966
fix test
giancarloromeo Oct 13, 2025
c97c75e
fix test
giancarloromeo Oct 13, 2025
d660041
fix test
giancarloromeo Oct 13, 2025
c4d33c4
fix test
giancarloromeo Oct 13, 2025
012ea99
fix test
giancarloromeo Oct 13, 2025
fb1ac05
fix
giancarloromeo Oct 13, 2025
e6d01f3
fix tests
giancarloromeo Oct 13, 2025
08e6c1a
fix tests
giancarloromeo Oct 13, 2025
4f839b8
fix tests
giancarloromeo Oct 13, 2025
1a86d71
typecheck
giancarloromeo Oct 13, 2025
ec1c828
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 13, 2025
fb4a847
fix tests
giancarloromeo Oct 13, 2025
54ee4ca
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo Oct 13, 2025
d8c148b
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 14, 2025
da302cb
update description
giancarloromeo Oct 14, 2025
d1e342e
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 14, 2025
d5aa656
execute only once init in prefork
giancarloromeo Oct 14, 2025
88ad319
refactoring
giancarloromeo Oct 14, 2025
ca12f4b
fix import
giancarloromeo Oct 14, 2025
377d334
typecheck
giancarloromeo Oct 14, 2025
3e218cd
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 14, 2025
d766256
use new signal handlers
giancarloromeo Oct 15, 2025
ade0d43
update name
giancarloromeo Oct 15, 2025
41ad946
fix signals
giancarloromeo Oct 15, 2025
1d8bb47
fix signals
giancarloromeo Oct 15, 2025
aaf0f12
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 15, 2025
db5d2d8
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 15, 2025
ce67003
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo Oct 15, 2025
f20badd
add rabbit
giancarloromeo Oct 15, 2025
056954c
fix service selection
giancarloromeo Oct 15, 2025
c95d706
fix test
giancarloromeo Oct 15, 2025
4ea087e
set worker mode
giancarloromeo Oct 15, 2025
935478f
set worker mode
giancarloromeo Oct 15, 2025
83fb4fe
fix
giancarloromeo Oct 15, 2025
80dfc30
add redis
giancarloromeo Oct 15, 2025
927fa7b
fix
giancarloromeo Oct 15, 2025
901df8c
fix pool name
giancarloromeo Oct 15, 2025
2409bd3
use worker app settings
giancarloromeo Oct 15, 2025
47fb61c
try
giancarloromeo Oct 15, 2025
91a3f82
set pool
giancarloromeo Oct 15, 2025
b31d2f0
fix
giancarloromeo Oct 16, 2025
6908e7e
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 16, 2025
d0c0085
mock celery app
giancarloromeo Oct 16, 2025
bb25608
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo Oct 16, 2025
c718176
fix
giancarloromeo Oct 16, 2025
8f93ed4
fix mocks
giancarloromeo Oct 16, 2025
02ae9da
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 16, 2025
daf3447
add startup timeout
giancarloromeo Oct 16, 2025
4d04b63
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 16, 2025
5518f24
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo Oct 16, 2025
6713d4d
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo Oct 16, 2025
20b56de
verbose storage
giancarloromeo Oct 16, 2025
bea45b4
restore
giancarloromeo Oct 16, 2025
5970ecb
force shutdown
giancarloromeo Oct 16, 2025
cab7a6a
update
giancarloromeo Oct 16, 2025
18ce2c7
try
giancarloromeo Oct 16, 2025
2ceee86
revert
giancarloromeo Oct 16, 2025
f8a575b
fix test
giancarloromeo Oct 16, 2025
5dcda9c
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo Oct 17, 2025
ce82a27
fix storage
giancarloromeo Oct 17, 2025
945e0e5
fix api server
giancarloromeo Oct 17, 2025
c0cdaa2
fix api server tests
giancarloromeo Oct 17, 2025
436e6dc
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo Oct 17, 2025
20 changes: 2 additions & 18 deletions packages/celery-library/src/celery_library/signals.py
@@ -2,32 +2,19 @@
import logging
import threading

from celery import Celery # type: ignore[import-untyped]
from celery.worker.worker import WorkController # type: ignore[import-untyped]
from servicelib.celery.app_server import BaseAppServer
from servicelib.logging_utils import log_context

from .utils import get_app_server, set_app_server

_logger = logging.getLogger(__name__)


def on_worker_init(
sender: WorkController,
app_server: BaseAppServer,
**_kwargs,
) -> None:
def on_worker_init(app_server: BaseAppServer, **_kwargs) -> None:
startup_complete_event = threading.Event()

def _init(startup_complete_event: threading.Event) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

assert sender.app # nosec
assert isinstance(sender.app, Celery) # nosec

set_app_server(sender.app, app_server)

app_server.event_loop = loop

loop.run_until_complete(app_server.run_until_shutdown(startup_complete_event))
@@ -44,9 +31,6 @@ def _init(startup_complete_event: threading.Event) -> None:
startup_complete_event.wait()


def on_worker_shutdown(sender, **_kwargs) -> None:
def on_worker_shutdown(app_server: BaseAppServer, **_kwargs) -> None:
with log_context(_logger, logging.INFO, "Worker shutdown"):
assert isinstance(sender.app, Celery)
app_server = get_app_server(sender.app)

app_server.shutdown_event.set()
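
With this change the handlers no longer receive the Celery `sender` and no longer look the app server up themselves: `on_worker_init` takes the `BaseAppServer` directly, and `on_worker_shutdown` gets it passed in as well, so registering it on the Celery app via `set_app_server`/`get_app_server` becomes the caller's job. A minimal sketch of the intended wiring, assuming an already-built Celery app and using a bare `FastAPI()` instance as a stand-in for the real application factory (the worker entrypoints further down in this PR do the same thing with their service-specific settings):

```python
# Sketch only: how a service is expected to wire the refactored handlers.
# Celery("sketch-worker") and the bare FastAPI() app are stand-ins, not repo code.
from celery import Celery  # type: ignore[import-untyped]
from celery.signals import worker_init, worker_shutdown  # type: ignore[import-untyped]
from celery_library.signals import on_worker_init, on_worker_shutdown
from celery_library.utils import get_app_server, set_app_server
from fastapi import FastAPI
from servicelib.fastapi.celery.app_server import FastAPIAppServer

the_app = Celery("sketch-worker")


@worker_init.connect
def _worker_init(**kwargs):
    app_server = FastAPIAppServer(app=FastAPI())  # real services pass create_app(settings)
    set_app_server(the_app, app_server)  # stored so the shutdown hook can find it again
    return on_worker_init(app_server, **kwargs)


@worker_shutdown.connect
def _worker_shutdown(**kwargs):
    return on_worker_shutdown(get_app_server(the_app), **kwargs)
```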
@@ -30,6 +30,6 @@ async def run_until_shutdown(
startup_timeout=None, # waits for full app initialization (DB migrations, etc.)
shutdown_timeout=_SHUTDOWN_TIMEOUT,
):
_logger.info("fastapi app initialized")
_logger.info("FastAPI initialized: %s", self.app)
startup_completed_event.set()
await self.shutdown_event.wait() # NOTE: wait here until shutdown is requested
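
The hunk above is the other half of the handshake in `on_worker_init`: the app server runs on its own event loop inside a worker thread, flips a `threading.Event` once startup has finished, and then parks on `shutdown_event` until the shutdown handler sets it. A stripped-down, stand-alone illustration of that pattern (not the repository's classes; the real `FastAPIAppServer` drives the FastAPI lifespan where the comment indicates):

```python
import asyncio
import threading


class TinyAppServer:
    """Minimal stand-in for BaseAppServer/FastAPIAppServer (illustration only)."""

    def __init__(self) -> None:
        self.event_loop: asyncio.AbstractEventLoop | None = None
        self.shutdown_event = asyncio.Event()

    async def run_until_shutdown(self, startup_completed: threading.Event) -> None:
        # a real implementation starts the FastAPI app / lifespan here
        startup_completed.set()           # report "ready" to the waiting thread
        await self.shutdown_event.wait()  # park until shutdown is requested


def start_in_thread(server: TinyAppServer) -> threading.Thread:
    startup_completed = threading.Event()

    def _run() -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        server.event_loop = loop
        loop.run_until_complete(server.run_until_shutdown(startup_completed))

    thread = threading.Thread(target=_run, name="app-server", daemon=True)
    thread.start()
    startup_completed.wait()  # block the caller (the Celery signal handler) until ready
    return thread


if __name__ == "__main__":
    server = TinyAppServer()
    worker_thread = start_in_thread(server)
    # later, from the shutdown handler, request shutdown thread-safely:
    assert server.event_loop is not None
    server.event_loop.call_soon_threadsafe(server.shutdown_event.set)
    worker_thread.join()
```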
9 changes: 8 additions & 1 deletion packages/settings-library/src/settings_library/celery.py
@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import Annotated
from typing import Annotated, Literal

from pydantic import Field
from pydantic_settings import SettingsConfigDict
@@ -35,6 +35,13 @@ class CelerySettings(BaseCustomSettings):
),
] = True

CELERY_POOL: Annotated[
Literal["prefork", "eventlet", "gevent", "solo", "threads"],
Field(
description="Type of pool to use. One of: prefork, eventlet, gevent, solo, threads. See https://docs.celeryq.dev/en/stable/userguide/workers.html#choosing-a-concurrency-implementation for details.",
),
] = "prefork"

model_config = SettingsConfigDict(
json_schema_extra={
"examples": [
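
The new `CELERY_POOL` setting mirrors Celery's `--pool` CLI option and defaults to `prefork`; the boot scripts below read the same name from the environment with a `${CELERY_POOL:-prefork}` fallback, and docker-compose pins `threads` for the I/O-bound workers. A small illustrative helper (not code from this PR) showing that env-to-CLI resolution:

```python
# Illustrative only: resolve CELERY_POOL the way the boot scripts do
# and reject values Celery's --pool option would not accept.
import os

_VALID_POOLS = {"prefork", "eventlet", "gevent", "solo", "threads"}


def resolve_celery_pool(default: str = "prefork") -> str:
    pool = os.environ.get("CELERY_POOL", default)
    if pool not in _VALID_POOLS:
        msg = f"Unsupported CELERY_POOL value: {pool!r}"
        raise ValueError(msg)
    return pool


# e.g. appended to the `celery --app=... worker` invocation:
worker_args = ["worker", f"--pool={resolve_celery_pool()}"]
```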
10 changes: 4 additions & 6 deletions services/api-server/docker/boot.sh
@@ -48,18 +48,16 @@ if [ "${API_SERVER_WORKER_MODE}" = "true" ]; then
--recursive \
-- \
celery \
--app=boot_celery_worker:app \
--workdir=services/api-server/docker \
worker --pool=threads \
--app=simcore_service_api_server.modules.celery.worker.main:the_app \
worker --pool="${CELERY_POOL:-prefork}" \
--loglevel="${API_SERVER_LOGLEVEL}" \
--concurrency="${CELERY_CONCURRENCY}" \
--hostname="${API_SERVER_WORKER_NAME}" \
--queues="${CELERY_QUEUES:-default}"
else
exec celery \
--app=boot_celery_worker:app \
--workdir=services/api-server/docker \
worker --pool=threads \
--app=simcore_service_api_server.modules.celery.worker.main:the_app \
worker --pool="${CELERY_POOL:-prefork}" \
--loglevel="${API_SERVER_LOGLEVEL}" \
--concurrency="${CELERY_CONCURRENCY}" \
--hostname="${API_SERVER_WORKER_NAME}" \
13 changes: 0 additions & 13 deletions services/api-server/docker/boot_celery_worker.py

This file was deleted.

@@ -9,9 +9,9 @@
from servicelib.celery.models import TaskKey
from simcore_service_api_server._service_function_jobs import FunctionJobService

from ...api.dependencies.authentication import Identity
from ...api.dependencies.rabbitmq import get_rabbitmq_rpc_client
from ...api.dependencies.services import (
from ....api.dependencies.authentication import Identity
from ....api.dependencies.rabbitmq import get_rabbitmq_rpc_client
from ....api.dependencies.services import (
get_catalog_service,
get_directorv2_service,
get_function_job_service,
@@ -20,13 +20,16 @@
get_solver_service,
get_storage_service,
)
from ...api.dependencies.webserver_http import get_session_cookie, get_webserver_session
from ...api.dependencies.webserver_rpc import get_wb_api_rpc_client
from ...models.api_resources import JobLinks
from ...models.domain.functions import PreRegisteredFunctionJobData
from ...models.schemas.jobs import JobPricingSpecification
from ...services_http.director_v2 import DirectorV2Api
from ...services_http.storage import StorageApi
from ....api.dependencies.webserver_http import (
get_session_cookie,
get_webserver_session,
)
from ....api.dependencies.webserver_rpc import get_wb_api_rpc_client
from ....models.api_resources import JobLinks
from ....models.domain.functions import PreRegisteredFunctionJobData
from ....models.schemas.jobs import JobPricingSpecification
from ....services_http.director_v2 import DirectorV2Api
from ....services_http.storage import StorageApi


async def _assemble_function_job_service(
@@ -1,27 +1,32 @@
"""Main application to be deployed in for example uvicorn."""

from functools import partial

from celery.signals import ( # type: ignore[import-untyped]
worker_init,
worker_shutdown,
)
from celery_library.common import create_app as create_celery_app
from celery_library.signals import (
on_worker_init,
on_worker_shutdown,
)
from celery_library.utils import get_app_server, set_app_server
from servicelib.fastapi.celery.app_server import FastAPIAppServer
from servicelib.logging_utils import setup_loggers
from servicelib.tracing import TracingConfig

from ..core.application import create_app
from ..core.settings import ApplicationSettings
from .worker_tasks.tasks import setup_worker_tasks
from ....core.application import create_app
from ....core.settings import ApplicationSettings
from .tasks import setup_worker_tasks

_settings = ApplicationSettings.create_from_envs()
_tracing_settings = _settings.API_SERVER_TRACING
_tracing_config = TracingConfig.create(
tracing_settings=_tracing_settings,
service_name="api-server-celery-worker",
)


def get_app():
_settings = ApplicationSettings.create_from_envs()
_tracing_settings = _settings.API_SERVER_TRACING
_tracing_config = TracingConfig.create(
tracing_settings=_tracing_settings,
service_name="api-server-celery-worker",
)
setup_loggers(
log_format_local_dev_enabled=_settings.API_SERVER_LOG_FORMAT_LOCAL_DEV_ENABLED,
logger_filter_mapping=_settings.API_SERVER_LOG_FILTER_MAPPING,
@@ -37,9 +42,19 @@ def get_app():
return app


def worker_init_wrapper(sender, **_kwargs):
the_app = get_app()


@worker_init.connect
def worker_init_wrapper(**kwargs):
_settings = ApplicationSettings.create_from_envs()
assert _settings.API_SERVER_CELERY # nosec
app_server = FastAPIAppServer(app=create_app(_settings))
set_app_server(the_app, app_server)
return on_worker_init(app_server, **kwargs)


return partial(on_worker_init, app_server=app_server)(sender, **_kwargs)
@worker_shutdown.connect
def worker_shutdown_wrapper(**kwargs):
app_server = get_app_server(the_app)
return on_worker_shutdown(app_server, **kwargs)
@@ -7,8 +7,8 @@
from celery_library.types import register_celery_types, register_pydantic_types
from servicelib.logging_utils import log_context

from ...models.domain.celery_models import pydantic_types_to_register
from .functions_tasks import run_function
from ....models.domain.celery_models import pydantic_types_to_register
from ._functions_tasks import run_function

_logger = logging.getLogger(__name__)

@@ -25,7 +25,7 @@
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.fastapi.celery.app_server import FastAPIAppServer
from settings_library.redis import RedisSettings
from simcore_service_api_server.celery_worker.worker_main import setup_worker_tasks
from simcore_service_api_server.celery.main import setup_worker_tasks
from simcore_service_api_server.clients import celery_task_manager
from simcore_service_api_server.core.application import create_app
from simcore_service_api_server.core.settings import ApplicationSettings
@@ -52,7 +52,7 @@
from simcore_service_api_server.api.dependencies.celery import (
get_task_manager,
)
from simcore_service_api_server.celery_worker.worker_tasks.functions_tasks import (
from simcore_service_api_server.celery.worker_tasks.functions_tasks import (
run_function as run_function_task,
)
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
@@ -46,7 +46,7 @@
from servicelib.rabbitmq import RabbitMQRPCClient
from simcore_service_api_server._meta import API_VTAG
from simcore_service_api_server.api.dependencies.authentication import Identity
from simcore_service_api_server.celery_worker.worker_tasks import functions_tasks
from simcore_service_api_server.celery.worker_tasks import functions_tasks
from simcore_service_api_server.models.api_resources import JobLinks
from simcore_service_api_server.models.domain.functions import (
PreRegisteredFunctionJobData,
2 changes: 2 additions & 0 deletions services/docker-compose.yml
@@ -168,6 +168,7 @@ services:
API_SERVER_WORKER_NAME: "api-worker-{{.Node.Hostname}}-{{.Task.Slot}}-{{.Task.ID}}"
API_SERVER_WORKER_MODE: "true"
CELERY_CONCURRENCY: ${API_SERVER_CELERY_CONCURRENCY}
CELERY_POOL: threads
CELERY_QUEUES: "api_worker_queue"
networks: *api_server_networks

@@ -1254,6 +1255,7 @@ services:
STORAGE_WORKER_NAME: "sto-worker-{{.Node.Hostname}}-{{.Task.Slot}}-{{.Task.ID}}"
STORAGE_WORKER_MODE: "true"
CELERY_CONCURRENCY: 100
CELERY_POOL: "threads"
networks: *storage_networks

sto-worker-cpu-bound:
8 changes: 4 additions & 4 deletions services/storage/docker/boot.sh
@@ -55,16 +55,16 @@ if [ "${STORAGE_WORKER_MODE}" = "true" ]; then
--recursive \
-- \
celery \
--app=simcore_service_storage.modules.celery.worker_main:app \
worker --pool=threads \
--app=simcore_service_storage.modules.celery.worker.main:the_app \
worker --pool="${CELERY_POOL:-prefork}" \
--loglevel="${SERVER_LOG_LEVEL}" \
--concurrency="${CELERY_CONCURRENCY}" \
--hostname="${STORAGE_WORKER_NAME}" \
--queues="${CELERY_QUEUES:-default}"
else
exec celery \
--app=simcore_service_storage.modules.celery.worker_main:app \
worker --pool=threads \
--app=simcore_service_storage.modules.celery.worker.main:the_app \
worker --pool="${CELERY_POOL:-prefork}" \
--loglevel="${SERVER_LOG_LEVEL}" \
--concurrency="${CELERY_CONCURRENCY}" \
--hostname="${STORAGE_WORKER_NAME}" \
@@ -0,0 +1,60 @@
from celery.signals import ( # type: ignore[import-untyped]
worker_init,
worker_process_init,
worker_process_shutdown,
worker_shutdown,
)
from celery_library.common import create_app as create_celery_app
from celery_library.signals import (
on_worker_init,
on_worker_shutdown,
)
from celery_library.utils import get_app_server, set_app_server
from servicelib.fastapi.celery.app_server import FastAPIAppServer
from servicelib.logging_utils import setup_loggers
from servicelib.tracing import TracingConfig

from ....api._worker_tasks.tasks import setup_worker_tasks
from ....core.application import create_app
from ....core.settings import ApplicationSettings

_settings = ApplicationSettings.create_from_envs()
_tracing_config = TracingConfig.create(
tracing_settings=_settings.STORAGE_TRACING,
service_name="storage-celery-worker",
)


def get_app():
setup_loggers(
log_format_local_dev_enabled=_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED,
logger_filter_mapping=_settings.STORAGE_LOG_FILTER_MAPPING,
tracing_config=_tracing_config,
log_base_level=_settings.log_level,
noisy_loggers=None,
)

assert _settings.STORAGE_CELERY # nosec
app = create_celery_app(_settings.STORAGE_CELERY)
setup_worker_tasks(app)

return app


the_app = get_app()


@worker_init.connect
@worker_process_init.connect
def worker_init_wrapper(**kwargs):
fastapi_app = create_app(_settings, tracing_config=_tracing_config)
app_server = FastAPIAppServer(app=fastapi_app)
set_app_server(the_app, app_server)
return on_worker_init(app_server, **kwargs)


@worker_shutdown.connect
@worker_process_shutdown.connect
def worker_shutdown_wrapper(**kwargs):
app_server = get_app_server(the_app)
return on_worker_shutdown(app_server, **kwargs)

This file was deleted.
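
Note on the new worker entrypoint above: it connects the app-server lifecycle to both the worker-level and the pool-process-level signals. Under `--pool=prefork`, `worker_process_init`/`worker_process_shutdown` are dispatched in every forked pool process, while `worker_init`/`worker_shutdown` fire only in the main worker process (the only ones delivered by a non-forking pool such as `threads`), so wiring both appears intended to cover whichever pool `CELERY_POOL` selects. A stand-alone sketch (not code from this PR) to observe which process each signal fires in:

```python
# Save as signal_demo.py (assumes Celery is installed; in-memory broker, demo only).
# Run with:  celery -A signal_demo worker --pool=prefork --concurrency=2
# and compare against --pool=threads to see which signals fire where.
import os

from celery import Celery
from celery.signals import (
    worker_init,
    worker_process_init,
    worker_process_shutdown,
    worker_shutdown,
)

app = Celery("signal_demo", broker="memory://")


@worker_init.connect          # main worker process, before the pool starts
@worker_process_init.connect  # each forked pool process under prefork
def _log_init(**_kwargs) -> None:
    print(f"init signal received in pid={os.getpid()}")


@worker_shutdown.connect           # main worker process, on shutdown
@worker_process_shutdown.connect   # each forked pool process, before it exits
def _log_shutdown(**_kwargs) -> None:
    print(f"shutdown signal received in pid={os.getpid()}")
```

With `--pool=prefork --concurrency=2` the init line should appear once for the parent and once per forked child; with `--pool=threads` only the parent line should appear.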
