Skip to content

Commit 94324cf

Browse files
committed
cleanup fixtures
1 parent 8b3ed88 commit 94324cf

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from collections.abc import AsyncIterator, Callable
2+
from functools import partial
3+
4+
import pytest
5+
from celery import Celery
6+
from celery.contrib.testing.worker import TestWorkController, start_worker
7+
from celery.signals import worker_init, worker_shutdown
8+
from celery.worker.worker import WorkController
9+
from celery_library.signals import on_worker_init, on_worker_shutdown
10+
from pytest_simcore.helpers.monkeypatch_envs import delenvs_from_dict, setenvs_from_dict
11+
from pytest_simcore.helpers.typing_env import EnvVarsDict
12+
from servicelib.fastapi.celery.app_server import FastAPIAppServer
13+
from simcore_service_api_server.celery.worker_main import setup_worker_tasks
14+
from simcore_service_api_server.core.application import create_app
15+
from simcore_service_api_server.core.settings import ApplicationSettings
16+
17+
pytest_plugins = [
18+
"pytest_simcore.rabbit_service",
19+
]
20+
21+
22+
@pytest.fixture
23+
def app_environment(
24+
monkeypatch: pytest.MonkeyPatch,
25+
app_environment: EnvVarsDict,
26+
rabbit_env_vars_dict: EnvVarsDict,
27+
) -> EnvVarsDict:
28+
# do not init other services
29+
delenvs_from_dict(monkeypatch, ["API_SERVER_RABBITMQ"])
30+
return setenvs_from_dict(
31+
monkeypatch,
32+
{
33+
**rabbit_env_vars_dict,
34+
"API_SERVER_POSTGRES": "null",
35+
"API_SERVER_HEALTH_CHECK_TASK_PERIOD_SECONDS": "3",
36+
"API_SERVER_HEALTH_CHECK_TASK_TIMEOUT_SECONDS": "1",
37+
},
38+
)
39+
40+
41+
@pytest.fixture
42+
def register_celery_tasks() -> Callable[[Celery], None]:
43+
"""override if tasks are needed"""
44+
45+
def _(celery_app: Celery) -> None: ...
46+
47+
return _
48+
49+
50+
@pytest.fixture
51+
async def with_storage_celery_worker(
52+
app_environment: EnvVarsDict,
53+
celery_app: Celery,
54+
monkeypatch: pytest.MonkeyPatch,
55+
register_celery_tasks: Callable[[Celery], None],
56+
) -> AsyncIterator[TestWorkController]:
57+
# Signals must be explicitily connected
58+
monkeypatch.setenv("API_SERVER_WORKER_MODE", "true")
59+
app_settings = ApplicationSettings.create_from_envs()
60+
61+
app_server = FastAPIAppServer(app=create_app(app_settings))
62+
63+
def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
64+
assert app_settings.API_SERVER_CELERY # nosec
65+
return partial(on_worker_init, app_server, app_settings.API_SERVER_CELERY)(
66+
sender, **_kwargs
67+
)
68+
69+
worker_init.connect(_on_worker_init_wrapper)
70+
worker_shutdown.connect(on_worker_shutdown)
71+
72+
setup_worker_tasks(celery_app)
73+
register_celery_tasks(celery_app)
74+
75+
with start_worker(
76+
celery_app,
77+
pool="threads",
78+
concurrency=1,
79+
loglevel="info",
80+
perform_ping_check=False,
81+
queues="default",
82+
) as worker:
83+
yield worker
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from celery.contrib.testing.worker import TestWorkController
2+
3+
4+
async def test_with_fake_run_function(with_storage_celery_worker: TestWorkController):
5+
pass

0 commit comments

Comments
 (0)