Skip to content

Commit a1de0be

Browse files
continue
1 parent d5cdf8c commit a1de0be

File tree

4 files changed

+19
-12
lines changed

4 files changed

+19
-12
lines changed

packages/celery-library/src/celery_library/signals.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@
2626
_STARTUP_TIMEOUT: Final[float] = datetime.timedelta(minutes=1).total_seconds()
2727

2828

29-
def on_worker_init(sender, **_kwargs) -> None:
29+
def on_worker_init(
30+
app_factory,
31+
celery_settings,
32+
sender,
33+
**_kwargs,
34+
) -> None:
3035
startup_complete_event = threading.Event()
3136

3237
def _init(startup_complete_event: threading.Event) -> None:
3338
loop = asyncio.new_event_loop()
3439
asyncio.set_event_loop(loop)
3540
shutdown_event = asyncio.Event()
3641

37-
fastapi_app = create_app(app_settings)
38-
39-
assert app_settings.STORAGE_CELERY
40-
celery_settings = app_settings.STORAGE_CELERY
42+
fastapi_app = app_factory()
4143

4244
async def setup_task_worker():
4345
redis_client_sdk = RedisClientSDK(

services/storage/src/simcore_service_storage/modules/celery/__init__.py

Whitespace-only changes.

services/storage/src/simcore_service_storage/modules/celery/worker_main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Main application to be deployed in for example uvicorn."""
22

33
import logging
4+
from functools import partial
45

56
from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped]
67
from celery_library.common import create_app as create_celery_app
@@ -11,6 +12,7 @@
1112
from servicelib.logging_utils import config_all_loggers
1213
from simcore_service_storage.api._worker_tasks.tasks import setup_worker_tasks
1314

15+
from ...core.application import create_app
1416
from ...core.settings import ApplicationSettings
1517

1618
_settings = ApplicationSettings.create_from_envs()
@@ -26,7 +28,8 @@
2628

2729
assert _settings.STORAGE_CELERY
2830
app = create_celery_app(_settings.STORAGE_CELERY)
29-
worker_init.connect(on_worker_init)
31+
app_factory = partial(create_app(_settings))
32+
worker_init.connect(partial(on_worker_init, app_factory, _settings.STORAGE_CELERY))
3033
worker_shutdown.connect(on_worker_shutdown)
3134

3235

services/storage/tests/conftest.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import random
1313
import sys
1414
from collections.abc import AsyncIterator, Awaitable, Callable
15+
from functools import partial
1516
from pathlib import Path
1617
from typing import Any, Final, cast
1718

@@ -974,10 +975,7 @@ def celery_config() -> dict[str, Any]:
974975
def mock_celery_app(mocker: MockerFixture, celery_config: dict[str, Any]) -> Celery:
975976
celery_app = Celery(**celery_config)
976977

977-
for module in (
978-
"simcore_service_storage.modules.celery._common.create_app",
979-
"simcore_service_storage.modules.celery.create_app",
980-
):
978+
for module in ("celery_library.create_app",):
981979
mocker.patch(module, return_value=celery_app)
982980

983981
return celery_app
@@ -1000,13 +998,17 @@ async def with_storage_celery_worker_controller(
1000998
register_celery_tasks: Callable[[Celery], None],
1001999
) -> AsyncIterator[TestWorkController]:
10021000
# Signals must be explicitily connected
1003-
worker_init.connect(on_worker_init)
1001+
monkeypatch.setenv("STORAGE_WORKER_MODE", "true")
1002+
app_settings = ApplicationSettings.create_from_envs()
1003+
app_factory = partial(create_app, app_settings)
1004+
worker_init.connect(
1005+
partial(on_worker_init, app_factory, app_settings.STORAGE_CELERY)
1006+
)
10041007
worker_shutdown.connect(on_worker_shutdown)
10051008

10061009
setup_worker_tasks(celery_app)
10071010
register_celery_tasks(celery_app)
10081011

1009-
monkeypatch.setenv("STORAGE_WORKER_MODE", "true")
10101012
with start_worker(
10111013
celery_app,
10121014
pool="threads",

0 commit comments

Comments
 (0)