Skip to content

Commit e6f5a5f

Browse files
author
Andrei Neagu
committed
added base example to render service status
1 parent 3b6ee6f commit e6f5a5f

File tree

2 files changed

+61
-26
lines changed

2 files changed

+61
-26
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/ui/_services.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Annotated, Final
1+
from typing import Annotated, Any, Final
22

33
from fastapi import APIRouter, Depends, FastAPI
44
from fastapi.responses import StreamingResponse
@@ -9,7 +9,7 @@
99

1010
from ..dependencies import get_app
1111
from ._constants import API_ROOT_PATH
12-
from ._sse_utils import AbstractSSERenderer, render_as_sse_items
12+
from ._sse_utils import AbstractSSERenderer, render_items_on_change
1313

1414
_PREFIX: Final[str] = "/services"
1515

@@ -47,16 +47,16 @@ def api_index() -> list[AnyComponent]:
4747

4848
class ServicesSSERenderer(AbstractSSERenderer):
4949
@staticmethod
50-
def render_item(item: str) -> list[AnyComponent]:
51-
return item
50+
def render_item(item: Any) -> AnyComponent:
51+
return c.Paragraph(text=f"{item}")
5252

5353

5454
@router.get(f"{API_ROOT_PATH}{_PREFIX}/sse/")
5555
async def sse_ai_response(
5656
app: Annotated[FastAPI, Depends(get_app)]
5757
) -> StreamingResponse:
5858
return StreamingResponse(
59-
render_as_sse_items(app, renderer_type=ServicesSSERenderer),
59+
render_items_on_change(app, renderer_type=ServicesSSERenderer),
6060
media_type="text/event-stream",
6161
)
6262

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/ui/_sse_utils.py

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,36 @@
77

88
from fastapi import FastAPI
99
from fastui import AnyComponent, FastUI
10-
from pydantic import NonNegativeFloat, TypeAdapter
10+
from pydantic import NonNegativeFloat
1111
from servicelib.fastapi.app_state import SingletonInAppStateMixin
1212

1313
UpdateID: TypeAlias = int
1414

1515

1616
class AbstractSSERenderer(ABC):
17-
def __init__(self) -> None:
17+
def __init__(self, app: FastAPI) -> None:
18+
self.app = app
1819
self._items: list[Any] = []
20+
self._hash = self._get_items_hash()
21+
22+
async def __aenter__(self):
23+
await RendererManager.get_from_app_state(self.app).register_renderer(self)
24+
return self
25+
26+
async def __aexit__(self, *args):
27+
await RendererManager.get_from_app_state(self.app).unregister_renderer(
28+
type(self), self
29+
)
30+
31+
def _get_items_hash(self) -> int:
32+
return hash(json.dumps(self._items))
1933

2034
def update(self, items: list[Any]) -> None:
2135
self._items = items
36+
self._hash = self._get_items_hash()
2237

2338
def _get_update_id(self) -> UpdateID:
24-
return hash(json.dumps(TypeAdapter(list[Any]).validate_python(self._items)))
39+
return self._hash
2540

2641
def changes_detected(self, last_update_id: UpdateID) -> bool:
2742
return last_update_id != self._get_update_id()
@@ -40,48 +55,68 @@ class RendererManager(SingletonInAppStateMixin):
4055
"""Allows to register SSE renderers and distribute data based on type"""
4156

4257
def __init__(self) -> None:
58+
self._lock = asyncio.Lock()
4359
self._renderers: dict[
4460
type[AbstractSSERenderer], WeakSet[AbstractSSERenderer]
4561
] = {}
4662

47-
def register_renderer(self, renderer: AbstractSSERenderer) -> None:
48-
"""NOTE: there is no reason to unregister anything due to WeakSet tracking"""
63+
async def register_renderer(self, renderer: AbstractSSERenderer) -> None:
4964
renderer_type = type(renderer)
5065

5166
if renderer_type not in self._renderers:
5267
self._renderers[renderer_type] = WeakSet()
5368

54-
self._renderers[renderer_type].add(renderer)
69+
async with self._lock:
70+
self._renderers[renderer_type].add(renderer)
71+
72+
async def unregister_renderer(
73+
self, renderer_type: type[AbstractSSERenderer], renderer: AbstractSSERenderer
74+
) -> None:
75+
if renderer_type not in self._renderers:
76+
pass
77+
async with self._lock:
78+
self._renderers[renderer_type].remove(renderer)
5579

56-
def update_renderer(
80+
async def update_renderers(
5781
self, renderer_type: type[AbstractSSERenderer], items: list[Any]
5882
) -> None:
5983
"""propagate updates to all instances of said type SSERenderer"""
60-
for renderer in self._renderers[renderer_type]:
61-
renderer.update(items)
84+
if renderer_type not in self._renderers:
85+
return
86+
87+
async with self._lock:
88+
for renderer in self._renderers[renderer_type]:
89+
renderer.update(items)
6290

6391

64-
async def render_as_sse_items(
92+
async def render_items_on_change(
6593
app: FastAPI,
6694
*,
6795
renderer_type: type[AbstractSSERenderer],
68-
messages_check_interval: NonNegativeFloat = 3,
96+
messages_check_interval: NonNegativeFloat = 1,
6997
) -> AsyncIterable[str]:
7098
"""used by the sse endpoint to render the content as it changes"""
7199

72-
manager = RendererManager.get_from_app_state(app)
73-
renderer = renderer_type()
74-
manager.register_renderer(renderer)
100+
async with renderer_type(app) as renderer:
101+
102+
last_update_id, messages = renderer.get_messages()
103+
104+
# Avoid the browser reconnecting
105+
while True:
106+
await asyncio.sleep(messages_check_interval)
107+
108+
update_id, messages = renderer.get_messages()
109+
110+
if renderer.changes_detected(last_update_id=last_update_id):
111+
yield f"data: {FastUI(root=messages).model_dump_json(by_alias=True, exclude_none=True)}\n\n"
75112

76-
update_id, messages = renderer.get_messages()
113+
last_update_id = update_id
77114

78-
# Avoid the browser reconnecting
79-
while True:
80-
if renderer.changes_detected(last_update_id=update_id):
81-
yield f"data: {FastUI(root=messages).model_dump_json(by_alias=True, exclude_none=True)}\n\n"
82115

83-
await asyncio.sleep(messages_check_interval)
84-
update_id, messages = renderer.get_messages()
116+
async def update_items(
117+
app: FastAPI, *, renderer_type: type[AbstractSSERenderer], items: list[Any]
118+
) -> None:
119+
await RendererManager.get_from_app_state(app).update_renderers(renderer_type, items)
85120

86121

87122
def setup_sse(app: FastAPI) -> None:

0 commit comments

Comments
 (0)