|
1 | | -import asyncio |
| 1 | +import logging |
| 2 | +from asyncio import Task |
| 3 | +from datetime import timedelta |
2 | 4 | from typing import Annotated, Any, Final |
3 | 5 |
|
4 | 6 | from fastapi import APIRouter, Depends, FastAPI |
5 | 7 | from fastapi.responses import StreamingResponse |
6 | 8 | from fastui import AnyComponent, FastUI |
7 | 9 | from fastui import components as c |
8 | 10 | from fastui.events import PageEvent |
| 11 | +from servicelib.background_task import start_periodic_task, stop_periodic_task |
9 | 12 | from servicelib.fastapi.app_state import SingletonInAppStateMixin |
10 | 13 | from starlette import status |
11 | 14 |
|
| 15 | +from ...services.service_tracker import get_all_tracked_services |
12 | 16 | from ..dependencies import get_app |
13 | 17 | from ._constants import API_ROOT_PATH |
14 | 18 | from ._sse_utils import ( |
|
17 | 21 | update_renderer_items, |
18 | 22 | ) |
19 | 23 |
|
| 24 | +_logger = logging.getLogger(__name__) |
| 25 | + |
20 | 26 | _PREFIX: Final[str] = "/services" |
21 | 27 |
|
22 | 28 | router = APIRouter() |
@@ -72,38 +78,46 @@ async def not_found(): |
72 | 78 | return {"message": "Not Found"} |
73 | 79 |
|
74 | 80 |
|
75 | | -class MockMessagesProvider(SingletonInAppStateMixin): |
76 | | - app_state_name: str = "mock_messages_provider" |
| 81 | +class ServicesStatusRetriever(SingletonInAppStateMixin): |
| 82 | + app_state_name: str = "services_status_retriever" |
77 | 83 |
|
78 | | - def __init__(self, app: FastAPI) -> None: |
| 84 | + def __init__(self, app: FastAPI, poll_interval: timedelta) -> None: |
79 | 85 | self.app = app |
80 | | - self._task: asyncio.Task | None = None |
| 86 | + self.poll_interval = poll_interval |
| 87 | + |
| 88 | + self._task: Task | None = None |
81 | 89 |
|
82 | | - async def _publish_mock_data(self) -> None: |
83 | | - messages: list[Any] = [] |
84 | | - while True: |
85 | | - await asyncio.sleep(3) |
| 90 | + async def _task_service_state_retrieval(self) -> None: |
| 91 | + all_tracked_services = await get_all_tracked_services(self.app) |
86 | 92 |
|
87 | | - messages.append({"name": "a", "surname": "b"}) |
88 | | - await update_renderer_items( |
89 | | - self.app, renderer_type=ServicesSSERenderer, items=messages |
90 | | - ) |
| 93 | + items = sorted(all_tracked_services.items(), reverse=True) |
| 94 | + _logger.error(f"PROPAGATING: {items=}") |
| 95 | + await update_renderer_items( |
| 96 | + self.app, renderer_type=ServicesSSERenderer, items=items |
| 97 | + ) |
91 | 98 |
|
92 | 99 | def startup(self) -> None: |
93 | | - self._task = asyncio.create_task(self._publish_mock_data()) |
| 100 | + self._task = start_periodic_task( |
| 101 | + self._task_service_state_retrieval, |
| 102 | + interval=self.poll_interval, |
| 103 | + task_name="sse_periodic_status_poll", |
| 104 | + ) |
94 | 105 |
|
95 | 106 | async def shutdown(self) -> None: |
96 | 107 | if self._task: |
97 | | - self._task.cancel() |
98 | | - await self._task |
| 108 | + await stop_periodic_task(self._task, timeout=5) |
99 | 109 |
|
100 | 110 |
|
101 | 111 | def setup_services(app: FastAPI) -> None: |
102 | 112 | async def on_startup() -> None: |
103 | | - MockMessagesProvider.get_from_app_state(app).startup() |
| 113 | + status_retriever = ServicesStatusRetriever( |
| 114 | + app, poll_interval=timedelta(seconds=1) |
| 115 | + ) |
| 116 | + status_retriever.set_to_app_state(app) |
| 117 | + status_retriever.startup() |
104 | 118 |
|
105 | 119 | async def on_shutdown() -> None: |
106 | | - await MockMessagesProvider.get_from_app_state(app).shutdown() |
| 120 | + await ServicesStatusRetriever.get_from_app_state(app).shutdown() |
107 | 121 |
|
108 | 122 | app.add_event_handler("startup", on_startup) |
109 | 123 | app.add_event_handler("shutdown", on_shutdown) |
0 commit comments