-
Couldn't load subscription status.
- Fork 32
🐛 Use Celery prefork pool for CPU-bound tasks
#8500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
giancarloromeo
merged 78 commits into
ITISFoundation:master
from
giancarloromeo:is8496/use-prefork-pool-for-cpu-bound-celery-tasks
Oct 17, 2025
Merged
Changes from 36 commits
Commits
Show all changes
78 commits
Select commit
Hold shift + click to select a range
fefdae2
prefork experiments
giancarloromeo 3ebb954
fix shutdown
giancarloromeo 9fc1e3d
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo 97e50d4
remove logs
giancarloromeo 29083b9
fix typecheck
giancarloromeo b3636fb
typecheck
giancarloromeo 27f4925
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo 76e886f
add settings
giancarloromeo fe05e66
unify worker setup
giancarloromeo e9371cb
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo 33d23bc
remove log
giancarloromeo 4ab42d8
remove unused
giancarloromeo c8e957b
fix api-server
giancarloromeo 1a8d135
update docker-compose
giancarloromeo 959fa79
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo d624966
fix test
giancarloromeo c97c75e
fix test
giancarloromeo d660041
fix test
giancarloromeo c4d33c4
fix test
giancarloromeo 012ea99
fix test
giancarloromeo fb1ac05
fix
giancarloromeo e6d01f3
fix tests
giancarloromeo 08e6c1a
fix tests
giancarloromeo 4f839b8
fix tests
giancarloromeo 1a86d71
typecheck
giancarloromeo ec1c828
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo fb4a847
fix tests
giancarloromeo 54ee4ca
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo d8c148b
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo da302cb
update description
giancarloromeo d1e342e
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo d5aa656
execute only once init in prefork
giancarloromeo 88ad319
refactoring
giancarloromeo ca12f4b
fix import
giancarloromeo 377d334
typecheck
giancarloromeo 3e218cd
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo d766256
use new signal handlers
giancarloromeo ade0d43
update name
giancarloromeo 41ad946
fix signals
giancarloromeo 1d8bb47
fix signals
giancarloromeo aaf0f12
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo db5d2d8
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo ce67003
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo f20badd
add rabbit
giancarloromeo 056954c
fix service selection
giancarloromeo c95d706
fix test
giancarloromeo 4ea087e
set worker mode
giancarloromeo 935478f
set worker mode
giancarloromeo 83fb4fe
fix
giancarloromeo 80dfc30
add redis
giancarloromeo 927fa7b
fix
giancarloromeo 901df8c
fix pool name
giancarloromeo 2409bd3
use wprker app settings
giancarloromeo 47fb61c
try
giancarloromeo 91a3f82
set pool
giancarloromeo b31d2f0
fix
giancarloromeo 6908e7e
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo d0c0085
mock celery app
giancarloromeo bb25608
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo c718176
fix
giancarloromeo 8f93ed4
fix mocks
giancarloromeo 02ae9da
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo daf3447
add startup timeout
giancarloromeo 4d04b63
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo 5518f24
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo 6713d4d
Merge branch 'is8496/use-prefork-pool-for-cpu-bound-celery-tasks' of …
giancarloromeo 20b56de
verbose storage
giancarloromeo bea45b4
restore
giancarloromeo 5970ecb
force shutdown
giancarloromeo cab7a6a
update
giancarloromeo 18ce2c7
try
giancarloromeo 2ceee86
revert
giancarloromeo f8a575b
fix test
giancarloromeo 5dcda9c
Merge remote-tracking branch 'upstream/master' into is8496/use-prefor…
giancarloromeo ce82a27
fix storage
giancarloromeo 945e0e5
fix api server
giancarloromeo c0cdaa2
fix api server tests
giancarloromeo 436e6dc
Merge branch 'master' into is8496/use-prefork-pool-for-cpu-bound-cele…
giancarloromeo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from collections.abc import Callable | ||
|
|
||
| from celery import Celery # type: ignore[import-untyped] | ||
| from servicelib.celery.app_server import BaseAppServer | ||
| from settings_library.celery import CelerySettings | ||
|
|
||
| from ..app import create_app | ||
| from .signals import register_worker_signals | ||
|
|
||
|
|
||
| def create_worker_app( | ||
| settings: CelerySettings, | ||
| register_worker_tasks_cb: Callable[[Celery], None], | ||
| app_server_factory_cb: Callable[[], BaseAppServer], | ||
| ) -> Celery: | ||
| app = create_app(settings) | ||
| register_worker_tasks_cb(app) | ||
| register_worker_signals(app, settings, app_server_factory_cb) | ||
|
|
||
| return app |
File renamed without changes.
59 changes: 59 additions & 0 deletions
59
packages/celery-library/src/celery_library/worker/signals.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| import asyncio | ||
| import threading | ||
| from collections.abc import Callable | ||
|
|
||
| from celery import Celery # type: ignore[import-untyped] | ||
| from celery.signals import ( # type: ignore[import-untyped] | ||
| worker_init, | ||
| worker_process_init, | ||
| worker_process_shutdown, | ||
| worker_shutdown, | ||
| ) | ||
| from servicelib.celery.app_server import BaseAppServer | ||
| from settings_library.celery import CelerySettings | ||
|
|
||
| from .app_server import get_app_server, set_app_server | ||
|
|
||
|
|
||
| def register_worker_signals( | ||
| app: Celery, | ||
| settings: CelerySettings, | ||
| app_server_factory: Callable[[], BaseAppServer], | ||
| ) -> None: | ||
| def _worker_init_wrapper(**_kwargs) -> None: | ||
| startup_complete_event = threading.Event() | ||
|
|
||
| def _init(startup_complete_event: threading.Event) -> None: | ||
| loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(loop) | ||
|
|
||
| app_server = app_server_factory() | ||
| app_server.event_loop = loop | ||
|
|
||
| set_app_server(app, app_server) | ||
|
|
||
| loop.run_until_complete( | ||
| app_server.run_until_shutdown(startup_complete_event) | ||
| ) | ||
|
|
||
| thread = threading.Thread( | ||
| group=None, | ||
| target=_init, | ||
| name="app_server_init", | ||
| args=(startup_complete_event,), | ||
| daemon=True, | ||
| ) | ||
| thread.start() | ||
|
|
||
| startup_complete_event.wait() | ||
|
|
||
| def _worker_shutdown_wrapper(**_kwargs) -> None: | ||
| get_app_server(app).shutdown_event.set() | ||
|
|
||
| match settings.CELERY_POOL: | ||
| case "prefork": | ||
| worker_process_init.connect(_worker_init_wrapper, weak=False) | ||
| worker_process_shutdown.connect(_worker_shutdown_wrapper, weak=False) | ||
| case _: | ||
| worker_init.connect(_worker_init_wrapper, weak=False) | ||
| worker_shutdown.connect(_worker_shutdown_wrapper, weak=False) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
45 changes: 0 additions & 45 deletions
45
services/api-server/src/simcore_service_api_server/celery_worker/worker_main.py
This file was deleted.
Oops, something went wrong.
2 changes: 1 addition & 1 deletion
2
services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
Empty file.
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
39 changes: 39 additions & 0 deletions
39
services/api-server/src/simcore_service_api_server/modules/celery/worker/main.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| from celery_library.worker.app import create_worker_app | ||
| from servicelib.fastapi.celery.app_server import FastAPIAppServer | ||
| from servicelib.logging_utils import setup_loggers | ||
| from servicelib.tracing import TracingConfig | ||
|
|
||
| from ....core.application import create_app | ||
| from ....core.settings import ApplicationSettings | ||
| from .tasks import register_worker_tasks | ||
|
|
||
| _settings = ApplicationSettings.create_from_envs() | ||
| _tracing_settings = _settings.API_SERVER_TRACING | ||
| _tracing_config = TracingConfig.create( | ||
| tracing_settings=_tracing_settings, | ||
| service_name="api-server-celery-worker", | ||
| ) | ||
|
|
||
|
|
||
| def get_app(): | ||
| setup_loggers( | ||
| log_format_local_dev_enabled=_settings.API_SERVER_LOG_FORMAT_LOCAL_DEV_ENABLED, | ||
| logger_filter_mapping=_settings.API_SERVER_LOG_FILTER_MAPPING, | ||
| tracing_config=_tracing_config, | ||
| log_base_level=_settings.log_level, | ||
| noisy_loggers=None, | ||
| ) | ||
|
|
||
| def _app_server_factory() -> FastAPIAppServer: | ||
| fastapi_app = create_app(_settings, tracing_config=_tracing_config) | ||
| return FastAPIAppServer(app=fastapi_app) | ||
|
|
||
| assert _settings.API_SERVER_CELERY # nosec | ||
| return create_worker_app( | ||
| _settings.API_SERVER_CELERY, | ||
| register_worker_tasks, | ||
| _app_server_factory, | ||
| ) | ||
|
|
||
|
|
||
| app = get_app() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.