Skip to content

Commit 30ff8c9

Browse files
tests: fix initialization
1 parent a26d82c commit 30ff8c9

File tree

8 files changed

+102
-99
lines changed

8 files changed

+102
-99
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/notifications/__init__.py

Whitespace-only changes.

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/notifications/messages.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from models_library.api_schemas_notifications import NOTIFICATIONS_RPC_NAMESPACE
44
from models_library.rabbitmq_basic_types import RPCMethodName
5-
from models_library.rpc.notifications.messages import BaseRecipient, NotificationMessage
5+
from models_library.rpc.notifications.messages import NotificationMessage, Recipient
66
from pydantic import NonNegativeInt, TypeAdapter
77

88
from ... import RabbitMQRPCClient
@@ -14,7 +14,7 @@ async def send_notification_message(
1414
rabbitmq_rpc_client: RabbitMQRPCClient,
1515
*,
1616
message: NotificationMessage,
17-
recipients: list[BaseRecipient],
17+
recipients: list[Recipient],
1818
) -> None:
1919
await rabbitmq_rpc_client.request(
2020
NOTIFICATIONS_RPC_NAMESPACE,

services/notifications/src/simcore_service_notifications/modules/celery/_email_tasks.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
EmailRecipient,
88
NotificationMessage,
99
)
10+
from servicelib.celery.models import TaskID
1011

1112
_logger = logging.getLogger(__name__)
1213

@@ -15,6 +16,7 @@
1516

1617
async def send_email(
1718
task: Task,
19+
task_id: TaskID,
1820
message: NotificationMessage,
1921
recipient: EmailRecipient,
2022
) -> None:

services/notifications/src/simcore_service_notifications/modules/celery/tasks.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@
1616
_logger = logging.getLogger(__name__)
1717

1818

19-
_TASK_QUEUE_PREFIX: str = "notifications"
19+
_NOTIFICATIONS_PREFIX: str = "notifications"
2020

2121

2222
class TaskQueue(StrEnum):
23-
DEFAULT = f"{_TASK_QUEUE_PREFIX}.default"
23+
DEFAULT = f"{_NOTIFICATIONS_PREFIX}.default"
2424

2525

2626
def setup_worker_tasks(app: Celery) -> None:
2727
register_celery_types()
2828
register_pydantic_types(NotificationMessage, EmailRecipient, SMSRecipient)
2929

3030
with log_context(_logger, logging.INFO, msg="worker tasks registration"):
31-
register_task(app, send_email, EMAIL_CHANNEL_NAME)
31+
register_task(
32+
app, send_email, ".".join((_NOTIFICATIONS_PREFIX, EMAIL_CHANNEL_NAME))
33+
)

services/notifications/src/simcore_service_notifications/services/notifications_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from enum import StrEnum
1+
import logging
22

33
from models_library.rpc.notifications.messages import NotificationMessage, Recipient
44
from servicelib.celery.models import TaskContext
55
from servicelib.celery.task_manager import TaskManager
66

7+
from ..modules.celery.tasks import TaskQueue
78

8-
class TaskQueues(StrEnum):
9-
DEFAULT = "notifications.default"
9+
_logger = logging.getLogger(__name__)
1010

1111

1212
async def send_notification(
@@ -19,7 +19,7 @@ async def send_notification(
1919
await task_manager.send_task(
2020
name=f"notifications.{recipient.type}",
2121
context=TaskContext(),
22-
queue=TaskQueues.DEFAULT,
22+
queue=TaskQueue.DEFAULT,
2323
message=message,
2424
recipient=recipient,
2525
)
Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,11 @@
11
# pylint: disable=redefined-outer-name
22
# pylint: disable=unused-argument
33

4-
5-
import datetime
6-
from collections.abc import AsyncIterator, Awaitable, Callable
7-
from functools import partial
84
from pathlib import Path
9-
from typing import Any
105

116
import pytest
12-
from celery import Celery # type: ignore[import-untyped]
13-
from celery.contrib.testing.worker import ( # type: ignore[import-untyped]
14-
TestWorkController,
15-
start_worker,
16-
)
17-
from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped]
18-
from celery.worker.worker import WorkController
19-
from celery_library.signals import on_worker_init, on_worker_shutdown
207
from models_library.basic_types import BootModeEnum
218
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
22-
from servicelib.fastapi.celery.app_server import FastAPIAppServer
23-
from servicelib.rabbitmq import RabbitMQRPCClient
24-
from simcore_service_notifications.core.application import create_app
25-
from simcore_service_notifications.core.settings import ApplicationSettings
26-
from simcore_service_notifications.modules.celery.tasks import (
27-
TaskQueue,
28-
setup_worker_tasks,
29-
)
309

3110
pytest_plugins = [
3211
"pytest_simcore.docker_compose",
@@ -60,62 +39,3 @@ def mock_environment(
6039
"SC_BOOT_MODE": BootModeEnum.DEBUG,
6140
},
6241
)
63-
64-
65-
@pytest.fixture(scope="session")
66-
def celery_config() -> dict[str, Any]:
67-
return {
68-
"broker_connection_retry_on_startup": True,
69-
"broker_url": "memory://localhost//",
70-
"result_backend": "cache+memory://localhost//",
71-
"result_expires": datetime.timedelta(days=7),
72-
"result_extended": True,
73-
"pool": "threads",
74-
"task_default_queue": "default",
75-
"task_send_sent_event": True,
76-
"task_track_started": True,
77-
"worker_send_task_events": True,
78-
}
79-
80-
81-
@pytest.fixture
82-
async def with_celery_worker(
83-
app_environment: EnvVarsDict,
84-
celery_app: Celery,
85-
monkeypatch: pytest.MonkeyPatch,
86-
) -> AsyncIterator[TestWorkController]:
87-
# Signals must be explicitily connected
88-
monkeypatch.setenv("NOTIFICATIONS_WORKER_MODE", "true")
89-
app_settings = ApplicationSettings.create_from_envs()
90-
91-
def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
92-
assert app_settings.NOTIFICATIONS_CELERY # nosec
93-
return partial(
94-
on_worker_init,
95-
FastAPIAppServer(app=create_app(app_settings)),
96-
app_settings.NOTIFICATIONS_CELERY,
97-
)(sender, **_kwargs)
98-
99-
worker_init.connect(_on_worker_init_wrapper)
100-
worker_shutdown.connect(on_worker_shutdown)
101-
102-
setup_worker_tasks(celery_app)
103-
104-
with start_worker(
105-
celery_app,
106-
pool="threads",
107-
concurrency=1,
108-
loglevel="info",
109-
perform_ping_check=False,
110-
queues=",".join(queue.value for queue in TaskQueue),
111-
) as worker:
112-
yield worker
113-
114-
115-
@pytest.fixture
116-
async def notifications_rabbitmq_rpc_client(
117-
rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
118-
) -> RabbitMQRPCClient:
119-
rpc_client = await rabbitmq_rpc_client("pytest_notifications_rpc_client")
120-
assert rpc_client
121-
return rpc_client

services/notifications/tests/unit/conftest.py

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,31 @@
11
# pylint: disable=redefined-outer-name
22
# pylint: disable=unused-argument
33

4-
from collections.abc import AsyncIterator
5-
from typing import Final
4+
import datetime
5+
from collections.abc import AsyncIterator, Awaitable, Callable
6+
from functools import partial
7+
from typing import Any, Final
68

79
import pytest
810
from asgi_lifespan import LifespanManager
11+
from celery import Celery
12+
from celery.contrib.testing.worker import start_worker
13+
from celery.signals import worker_init, worker_shutdown
14+
from celery.worker.worker import WorkController
15+
from celery_library.signals import on_worker_init, on_worker_shutdown
916
from fastapi import FastAPI
10-
from fastapi.testclient import TestClient
17+
from pytest_mock import MockerFixture
1118
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
19+
from servicelib.fastapi.celery.app_server import FastAPIAppServer
20+
from servicelib.rabbitmq import RabbitMQRPCClient
1221
from settings_library.rabbit import RabbitSettings
1322
from settings_library.redis import RedisSettings
1423
from simcore_service_notifications.core.application import create_app
1524
from simcore_service_notifications.core.settings import ApplicationSettings
25+
from simcore_service_notifications.modules.celery.tasks import (
26+
TaskQueue,
27+
setup_worker_tasks,
28+
)
1629

1730
_LIFESPAN_TIMEOUT: Final[int] = 30
1831

@@ -56,13 +69,77 @@ def app_settings(
5669

5770

5871
@pytest.fixture
59-
async def initialized_app(app_settings: ApplicationSettings) -> AsyncIterator[FastAPI]:
72+
async def fastapi_app(app_settings: ApplicationSettings) -> AsyncIterator[FastAPI]:
6073
app: FastAPI = create_app(app_settings)
6174

6275
async with LifespanManager(app, startup_timeout=30, shutdown_timeout=30):
6376
yield app
6477

6578

79+
@pytest.fixture(scope="session")
80+
def celery_config() -> dict[str, Any]:
81+
return {
82+
"broker_connection_retry_on_startup": True,
83+
"broker_url": "memory://localhost//",
84+
"result_backend": "cache+memory://localhost//",
85+
"result_expires": datetime.timedelta(days=7),
86+
"result_extended": True,
87+
"pool": "threads",
88+
"task_default_queue": "default",
89+
"task_send_sent_event": True,
90+
"task_track_started": True,
91+
"worker_send_task_events": True,
92+
}
93+
94+
95+
@pytest.fixture
96+
def mock_celery_app(mocker: MockerFixture, celery_config: dict[str, Any]) -> Celery:
97+
celery_app = Celery(**celery_config)
98+
99+
for module in ("simcore_service_notifications.clients.celery.create_app",):
100+
mocker.patch(module, return_value=celery_app)
101+
102+
return celery_app
103+
104+
105+
@pytest.fixture
106+
async def mock_celery_worker(
107+
app_environment: EnvVarsDict,
108+
celery_app: Celery,
109+
fastapi_app: FastAPI,
110+
monkeypatch: pytest.MonkeyPatch,
111+
) -> AsyncIterator[Any]:
112+
monkeypatch.setenv("NOTIFICATIONS_WORKER_MODE", "true")
113+
app_settings = ApplicationSettings.create_from_envs()
114+
115+
def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
116+
assert app_settings.NOTIFICATIONS_CELERY # nosec
117+
return partial(
118+
on_worker_init,
119+
FastAPIAppServer(app=fastapi_app),
120+
app_settings.NOTIFICATIONS_CELERY,
121+
)(sender, **_kwargs)
122+
123+
worker_init.connect(_on_worker_init_wrapper)
124+
worker_shutdown.connect(on_worker_shutdown)
125+
126+
setup_worker_tasks(celery_app)
127+
128+
with start_worker(
129+
celery_app,
130+
pool="threads",
131+
concurrency=1,
132+
loglevel="debug",
133+
perform_ping_check=False,
134+
queues=",".join(queue.value for queue in TaskQueue),
135+
) as worker:
136+
yield worker
137+
138+
66139
@pytest.fixture
67-
def test_client(initialized_app: FastAPI) -> TestClient:
68-
return TestClient(initialized_app)
140+
async def notifications_rabbitmq_rpc_client(
141+
rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
142+
) -> RabbitMQRPCClient:
143+
rpc_client = await rabbitmq_rpc_client("pytest_notifications_rpc_client")
144+
assert rpc_client
145+
return rpc_client

services/notifications/tests/unit/test_tasks.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
from celery.contrib.testing.worker import TestWorkController
2-
from fastapi import FastAPI
1+
import pytest
32
from models_library.rpc.notifications.messages import NotificationMessage
43
from servicelib.rabbitmq import RabbitMQRPCClient
54
from servicelib.rabbitmq.rpc_interfaces.notifications.messages import (
@@ -13,10 +12,13 @@
1312
]
1413

1514

15+
@pytest.mark.usefixtures(
16+
"mock_celery_app",
17+
"mock_celery_worker",
18+
"fastapi_app",
19+
)
1620
async def test_send_email(
17-
initialized_app: FastAPI,
1821
notifications_rabbitmq_rpc_client: RabbitMQRPCClient,
19-
with_celery_worker: TestWorkController,
2022
):
2123
await send_notification_message(
2224
notifications_rabbitmq_rpc_client,

0 commit comments

Comments
 (0)