22# pylint: disable=unused-argument
33
44
5+ import datetime
6+ from collections .abc import AsyncIterator
7+ from functools import partial
58from pathlib import Path
9+ from typing import Any
610
711import pytest
12+ from celery import Celery # type: ignore[import-untyped]
13+ from celery .contrib .testing .worker import ( # type: ignore[import-untyped]
14+ TestWorkController ,
15+ start_worker ,
16+ )
17+ from celery .signals import worker_init , worker_shutdown # type: ignore[import-untyped]
18+ from celery .worker .worker import WorkController # type: ignore[import-untyped]
19+ from celery_library .signals import on_worker_init , on_worker_shutdown
820from models_library .basic_types import BootModeEnum
921from pytest_simcore .helpers .monkeypatch_envs import EnvVarsDict , setenvs_from_dict
22+ from servicelib .fastapi .celery .app_server import FastAPIAppServer
23+ from simcore_service_notifications .core .application import create_app
24+ from simcore_service_notifications .core .settings import ApplicationSettings
25+ from simcore_service_notifications .modules .celery .tasks import (
26+ TaskQueue , # type: ignore[import-untyped]
27+ )
28+ from simcore_service_notifications .modules .celery .tasks import (
29+ setup_worker_tasks ,
30+ )
1031
1132pytest_plugins = [
1233 "pytest_simcore.docker_compose" ,
1334 "pytest_simcore.docker_swarm" ,
1435 "pytest_simcore.environment_configs" ,
1536 "pytest_simcore.postgres_service" ,
1637 "pytest_simcore.rabbit_service" ,
38+ "pytest_simcore.redis_service" ,
1739 "pytest_simcore.repository_paths" ,
1840]
1941
@@ -40,3 +62,53 @@ def mock_environment(
4062 "SC_BOOT_MODE" : BootModeEnum .DEBUG ,
4163 },
4264 )
65+
66+
67+ @pytest .fixture (scope = "session" )
68+ def celery_config () -> dict [str , Any ]:
69+ return {
70+ "broker_connection_retry_on_startup" : True ,
71+ "broker_url" : "memory://localhost//" ,
72+ "result_backend" : "cache+memory://localhost//" ,
73+ "result_expires" : datetime .timedelta (days = 7 ),
74+ "result_extended" : True ,
75+ "pool" : "threads" ,
76+ "task_default_queue" : "default" ,
77+ "task_send_sent_event" : True ,
78+ "task_track_started" : True ,
79+ "worker_send_task_events" : True ,
80+ }
81+
82+
83+ @pytest .fixture
84+ async def with_celery_worker (
85+ mock_environment : EnvVarsDict ,
86+ celery_app : Celery ,
87+ monkeypatch : pytest .MonkeyPatch ,
88+ ) -> AsyncIterator [TestWorkController ]:
89+ # Signals must be explicitily connected
90+ monkeypatch .setenv ("NOTIFICATIONS_WORKER_MODE" , "true" )
91+ app_settings = ApplicationSettings .create_from_envs ()
92+
93+ def _on_worker_init_wrapper (sender : WorkController , ** _kwargs ):
94+ assert app_settings .NOTIFICATIONS_CELERY # nosec
95+ return partial (
96+ on_worker_init ,
97+ FastAPIAppServer (app = create_app (app_settings )),
98+ app_settings .NOTIFICATIONS_CELERY ,
99+ )(sender , ** _kwargs )
100+
101+ worker_init .connect (_on_worker_init_wrapper )
102+ worker_shutdown .connect (on_worker_shutdown )
103+
104+ setup_worker_tasks (celery_app )
105+
106+ with start_worker (
107+ celery_app ,
108+ pool = "threads" ,
109+ concurrency = 1 ,
110+ loglevel = "info" ,
111+ perform_ping_check = False ,
112+ queues = "," .join (queue .value for queue in TaskQueue ),
113+ ) as worker :
114+ yield worker
0 commit comments