Skip to content

Commit ca423cf

Browse files
fix: startup
1 parent 92b2714 commit ca423cf

File tree

8 files changed

+76
-17
lines changed

8 files changed

+76
-17
lines changed
Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
from fastapi import FastAPI
1+
from servicelib.celery.task_manager import TaskManager
22
from servicelib.rabbitmq import RPCRouter
33

4-
from ...models.schemas import NotificationMessage
4+
from ...models.schemas import NotificationMessage, Recipient
55
from ...services import notifications_service
66

77
router = RPCRouter()
88

99

1010
@router.expose(reraise_if_error_type=())
1111
async def send_notification_message(
12-
app: FastAPI,
12+
task_manager: TaskManager,
1313
*,
1414
message: NotificationMessage,
15+
recipients: list[Recipient],
1516
) -> None:
16-
await notifications_service.send_notification_message(message=message)
17+
await notifications_service.send_notification(
18+
task_manager, message=message, recipients=recipients
19+
)

services/notifications/src/simcore_service_notifications/api/rpc/routes.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from models_library.api_schemas_notifications import NOTIFICATIONS_RPC_NAMESPACE
66
from servicelib.rabbitmq import RPCRouter
77

8+
from ...clients.celery import get_task_manager_from_app
89
from ...clients.rabbitmq import get_rabbitmq_rpc_server
910
from . import _notifications
1011

@@ -13,10 +14,10 @@
1314

1415
async def rpc_api_routes_lifespan(app: FastAPI) -> AsyncIterator[State]:
1516
rpc_server = get_rabbitmq_rpc_server(app)
16-
17+
task_manager = get_task_manager_from_app(app)
1718
for router in ROUTERS:
1819
await rpc_server.register_router(
19-
router, NOTIFICATIONS_RPC_NAMESPACE, app
20+
router, NOTIFICATIONS_RPC_NAMESPACE, task_manager=task_manager
2021
) # pragma: no cover
2122

2223
yield {}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import logging
2+
from collections.abc import AsyncIterator
3+
4+
from celery_library.common import create_app, create_task_manager
5+
from celery_library.task_manager import CeleryTaskManager
6+
from celery_library.types import register_celery_types, register_pydantic_types
7+
from fastapi import FastAPI
8+
from fastapi_lifespan_manager import State
9+
from settings_library.celery import CelerySettings
10+
11+
from ..core.settings import ApplicationSettings
12+
from ..models.schemas import EmailRecipient, NotificationMessage, SMSRecipient
13+
14+
_logger = logging.getLogger(__name__)
15+
16+
17+
async def celery_lifespan(app: FastAPI) -> AsyncIterator[State]:
18+
settings: ApplicationSettings = app.state.settings
19+
if settings.NOTIFICATIONS_CELERY and not settings.NOTIFICATIONS_WORKER_MODE:
20+
celery_settings: CelerySettings = settings.NOTIFICATIONS_CELERY
21+
22+
app.state.task_manager = await create_task_manager(
23+
create_app(celery_settings), celery_settings
24+
)
25+
26+
register_celery_types()
27+
register_pydantic_types(NotificationMessage, EmailRecipient, SMSRecipient)
28+
yield {}
29+
30+
31+
def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager:
32+
assert hasattr(app.state, "task_manager") # nosec
33+
task_manager = app.state.task_manager
34+
assert isinstance(task_manager, CeleryTaskManager) # nosec
35+
return task_manager

services/notifications/src/simcore_service_notifications/clients/rabbitmq.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async def rabbitmq_lifespan(app: FastAPI) -> AsyncIterator[State]:
1717
await wait_till_rabbitmq_responsive(rabbit_settings.dsn)
1818

1919
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
20-
client_name="dynamic_scheduler_rpc_server", settings=rabbit_settings
20+
client_name="notifications_rpc_server", settings=rabbit_settings
2121
)
2222

2323
yield {}

services/notifications/src/simcore_service_notifications/core/events.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
1515
from ..api.rpc.routes import rpc_api_routes_lifespan
16+
from ..clients.celery import celery_lifespan
1617
from ..clients.postgres import postgres_lifespan
1718
from ..clients.rabbitmq import rabbitmq_lifespan
1819
from .settings import ApplicationSettings
@@ -47,6 +48,9 @@ def create_app_lifespan():
4748
# - rabbitmq
4849
app_lifespan.add(rabbitmq_lifespan)
4950

51+
# - celery
52+
app_lifespan.add(celery_lifespan)
53+
5054
# - rpc api routes
5155
app_lifespan.add(rpc_api_routes_lifespan)
5256

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1+
import logging
2+
3+
from celery import Task # type: ignore[import-untyped]
4+
15
from ...models.schemas import EmailRecipient, NotificationMessage
26

7+
_logger = logging.getLogger(__name__)
8+
39

4-
async def send_email_notification(
5-
message: NotificationMessage, recipient: EmailRecipient
10+
async def send_email(
11+
task: Task,
12+
message: NotificationMessage,
13+
recipient: EmailRecipient,
614
) -> None:
7-
pass
15+
# TODO: render email template with message and recipient details
16+
# and send the email using an email service
17+
_logger.info(f"Sending email notification to {recipient.email}")
Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
import logging
22

33
from celery import Celery # type: ignore[import-untyped]
4-
from celery_library.types import register_celery_types
4+
from celery_library.task import register_task
5+
from celery_library.types import register_celery_types, register_pydantic_types
56
from servicelib.logging_utils import log_context
67

8+
from ...models.schemas import NotificationMessage, SMSRecipient
9+
from ...modules.celery._email_tasks import EmailRecipient, send_email
10+
711
_logger = logging.getLogger(__name__)
812

913

1014
def setup_worker_tasks(app: Celery) -> None:
1115
register_celery_types()
12-
# TODO: add more types as needed
13-
# register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody)
16+
register_pydantic_types(NotificationMessage, EmailRecipient, SMSRecipient)
1417

1518
with log_context(_logger, logging.INFO, msg="worker tasks registration"):
16-
...
17-
# TODO: register tasks here
18-
# register_task(app, send_email_notification)
19+
register_task(app, send_email)
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
from servicelib.celery.task_manager import TaskManager
2+
13
from ..models.schemas import NotificationMessage, Recipient
24

35

46
async def send_notification(
5-
message: NotificationMessage, *recipients: list[Recipient]
7+
task_manager: TaskManager,
8+
*,
9+
message: NotificationMessage,
10+
recipients: list[Recipient],
611
) -> None: ...

0 commit comments

Comments
 (0)