2424from celery import Celery
2525from celery .contrib .testing .worker import TestWorkController , start_worker
2626from celery .signals import worker_init , worker_shutdown
27- from celery_library .worker .signals import _worker_init_wrapper , _worker_shutdown_wrapper
27+ from celery .worker .worker import WorkController
28+ from celery_library .signals import on_worker_init , on_worker_shutdown
2829from faker import Faker
2930from fakeredis .aioredis import FakeRedis
3031from fastapi import FastAPI
6970from settings_library .rabbit import RabbitSettings
7071from simcore_postgres_database .models .tokens import tokens
7172from simcore_postgres_database .storage_models import file_meta_data , projects , users
72- from simcore_service_storage .api ._worker_tasks .tasks import register_worker_tasks
73+ from simcore_service_storage .api ._worker_tasks .tasks import setup_worker_tasks
7374from simcore_service_storage .core .application import create_app
7475from simcore_service_storage .core .settings import ApplicationSettings
7576from simcore_service_storage .datcore_dsm import DatCoreDataManager
@@ -219,9 +220,9 @@ def app_settings(
219220 enabled_rabbitmq : RabbitSettings ,
220221 sqlalchemy_async_engine : AsyncEngine ,
221222 postgres_host_config : dict [str , str ],
223+ mocked_s3_server_envs : EnvVarsDict ,
222224 datcore_adapter_service_mock : respx .MockRouter ,
223225 mocked_redis_server : None ,
224- mocked_s3_server_envs : EnvVarsDict , # Moved to end to ensure S3_ENDPOINT override happens last
225226) -> ApplicationSettings :
226227 test_app_settings = ApplicationSettings .create_from_envs ()
227228 print (f"{ test_app_settings .model_dump_json (indent = 2 )= } " )
@@ -999,7 +1000,7 @@ def mock_celery_app(mocker: MockerFixture, celery_config: dict[str, Any]) -> Cel
9991000
10001001
10011002@pytest .fixture
1002- def register_test_tasks () -> Callable [[Celery ], None ]:
1003+ def register_celery_tasks () -> Callable [[Celery ], None ]:
10031004 """override if tasks are needed"""
10041005
10051006 def _ (celery_app : Celery ) -> None : ...
@@ -1008,47 +1009,32 @@ def _(celery_app: Celery) -> None: ...
10081009
10091010
10101011@pytest .fixture
1011- def app_server_factory_with_worker_mode (
1012+ async def with_storage_celery_worker (
10121013 app_environment : EnvVarsDict ,
1014+ celery_app : Celery ,
10131015 monkeypatch : pytest .MonkeyPatch ,
1014- ) -> Callable [[], FastAPIAppServer ]:
1016+ register_celery_tasks : Callable [[Celery ], None ],
1017+ ) -> AsyncIterator [TestWorkController ]:
1018+ # Signals must be explicitily connected
10151019 monkeypatch .setenv ("STORAGE_WORKER_MODE" , "true" )
1020+ app_settings = ApplicationSettings .create_from_envs ()
1021+ tracing_config = TracingConfig .create (
1022+ tracing_settings = None , # disable tracing in tests
1023+ service_name = "storage-api" ,
1024+ )
10161025
1017- # Cache to ensure we create the app server only once per test
1018- _app_server_cache : list [FastAPIAppServer ] = []
1019-
1020- def _app_server_factory () -> FastAPIAppServer :
1021- if not _app_server_cache :
1022- app_settings = ApplicationSettings .create_from_envs ()
1023- assert app_settings .STORAGE_WORKER_MODE is True
1024-
1025- tracing_config = TracingConfig .create (
1026- tracing_settings = None , # disable tracing in tests
1027- service_name = "storage-worker" ,
1028- )
1029- _app_server_cache .append (
1030- FastAPIAppServer (app = create_app (app_settings , tracing_config ))
1031- )
1032- return _app_server_cache [0 ]
1033-
1034- return _app_server_factory
1035-
1026+ app_server = FastAPIAppServer (
1027+ app = create_app (app_settings , tracing_config = tracing_config )
1028+ )
10361029
1037- @pytest .fixture
1038- async def with_storage_celery_worker (
1039- celery_app : Celery ,
1040- app_server_factory_with_worker_mode : Callable [[], FastAPIAppServer ],
1041- register_test_tasks : Callable [[Celery ], None ],
1042- ) -> AsyncIterator [TestWorkController ]:
1030+ def _on_worker_init_wrapper (sender : WorkController , ** _kwargs ):
1031+ return on_worker_init (sender , app_server , ** _kwargs )
10431032
1044- worker_init .connect (
1045- _worker_init_wrapper (celery_app , app_server_factory_with_worker_mode ),
1046- weak = False ,
1047- )
1048- worker_shutdown .connect (_worker_shutdown_wrapper (celery_app ), weak = False )
1033+ worker_init .connect (_on_worker_init_wrapper )
1034+ worker_shutdown .connect (on_worker_shutdown )
10491035
1050- register_worker_tasks (celery_app )
1051- register_test_tasks (celery_app )
1036+ setup_worker_tasks (celery_app )
1037+ register_celery_tasks (celery_app )
10521038
10531039 with start_worker (
10541040 celery_app ,
@@ -1060,8 +1046,6 @@ async def with_storage_celery_worker(
10601046 ) as worker :
10611047 yield worker
10621048
1063- _worker_shutdown_wrapper (celery_app )()
1064-
10651049
10661050@pytest .fixture
10671051async def storage_rabbitmq_rpc_client (
0 commit comments