Skip to content

Commit 6f63769

Browse files
author
Andrei Neagu
committed
added reusable sse utils
1 parent ccc3ea7 commit 6f63769

File tree

3 files changed

+106
-18
lines changed

3 files changed

+106
-18
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/ui/_services.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import asyncio
2-
from collections.abc import AsyncIterable
31
from typing import Annotated, Final
42

53
from fastapi import APIRouter, Depends, FastAPI
@@ -11,13 +9,13 @@
119

1210
from ..dependencies import get_app
1311
from ._constants import API_ROOT_PATH
12+
from ._sse_utils import AbstractSSERenderer, render_as_sse_items
1413

1514
_PREFIX: Final[str] = "/services"
1615

1716
router = APIRouter()
1817

1918

20-
# root entrypoint for the application
2119
@router.get(
2220
f"{API_ROOT_PATH}/", response_model=FastUI, response_model_exclude_none=True
2321
)
@@ -47,24 +45,20 @@ def api_index() -> list[AnyComponent]:
4745
]
4846

4947

50-
# SSE endpoint
48+
class ServicesSSERenderer(AbstractSSERenderer):
49+
@staticmethod
50+
def render_item(item: str) -> list[AnyComponent]:
51+
return item
52+
53+
5154
@router.get(f"{API_ROOT_PATH}{_PREFIX}/sse/")
5255
async def sse_ai_response(
5356
app: Annotated[FastAPI, Depends(get_app)]
5457
) -> StreamingResponse:
55-
return StreamingResponse(_render_messages(app), media_type="text/event-stream")
56-
57-
58-
async def _render_messages(app: FastAPI) -> AsyncIterable[str]:
59-
_ = app # TODO: fetch storage and render content from here
60-
messages: list[AnyComponent] = []
61-
# Avoid the browser reconnecting
62-
while True:
63-
# TODO: yield only if content changed, store a hash of the messages
64-
messages.append(c.Markdown(text="# LOL \n this is it!"))
65-
await asyncio.sleep(3)
66-
message = FastUI(root=messages)
67-
yield f"data: {message.model_dump_json(by_alias=True, exclude_none=True)}\n\n"
58+
return StreamingResponse(
59+
render_as_sse_items(app, renderer_type=ServicesSSERenderer),
60+
media_type="text/event-stream",
61+
)
6862

6963

7064
@router.get("/{path:path}", status_code=status.HTTP_404_NOT_FOUND)
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import asyncio
2+
import json
3+
from abc import ABC, abstractmethod
4+
from collections.abc import AsyncIterable
5+
from typing import Any, TypeAlias
6+
from weakref import WeakSet
7+
8+
from fastapi import FastAPI
9+
from fastui import AnyComponent, FastUI
10+
from pydantic import NonNegativeFloat, TypeAdapter
11+
from servicelib.fastapi.app_state import SingletonInAppStateMixin
12+
13+
UpdateID: TypeAlias = int
14+
15+
16+
class AbstractSSERenderer(ABC):
17+
def __init__(self) -> None:
18+
self._items: list[Any] = []
19+
20+
def update(self, items: list[Any]) -> None:
21+
self._items = items
22+
23+
def _get_update_id(self) -> UpdateID:
24+
return hash(json.dumps(TypeAdapter(list[Any]).validate_python(self._items)))
25+
26+
def changes_detected(self, last_update_id: UpdateID) -> bool:
27+
return last_update_id != self._get_update_id()
28+
29+
@staticmethod
30+
@abstractmethod
31+
def render_item(item: Any) -> AnyComponent:
32+
"""return a rendered component to display"""
33+
34+
def get_messages(self) -> tuple[UpdateID, list[AnyComponent]]:
35+
return self._get_update_id(), [self.render_item(x) for x in self._items]
36+
37+
38+
class RendererManager(SingletonInAppStateMixin):
39+
app_state_name: str = "renderer_manager"
40+
"""Allows to register SSE renderers and distribute data based on type"""
41+
42+
def __init__(self) -> None:
43+
self._renderers: dict[
44+
type[AbstractSSERenderer], WeakSet[AbstractSSERenderer]
45+
] = {}
46+
47+
def register_renderer(self, renderer: AbstractSSERenderer) -> None:
48+
"""NOTE: there is no reason to unregister anything due to WeakSet tracking"""
49+
renderer_type = type(renderer)
50+
51+
if renderer_type not in self._renderers:
52+
self._renderers[renderer_type] = WeakSet()
53+
54+
self._renderers[renderer_type].add(renderer)
55+
56+
def update_renderer(
57+
self, renderer_type: type[AbstractSSERenderer], items: list[Any]
58+
) -> None:
59+
"""propagate updates to all instances of said type SSERenderer"""
60+
for renderer in self._renderers[renderer_type]:
61+
renderer.update(items)
62+
63+
64+
async def render_as_sse_items(
65+
app: FastAPI,
66+
*,
67+
renderer_type: type[AbstractSSERenderer],
68+
messages_check_interval: NonNegativeFloat = 3,
69+
) -> AsyncIterable[str]:
70+
"""used by the sse endpoint to render the content as it changes"""
71+
72+
manager = RendererManager.get_from_app_state(app)
73+
renderer = renderer_type()
74+
manager.register_renderer(renderer)
75+
76+
update_id, messages = renderer.get_messages()
77+
78+
# Avoid the browser reconnecting
79+
while True:
80+
if renderer.changes_detected(last_update_id=update_id):
81+
yield f"data: {FastUI(root=messages).model_dump_json(by_alias=True, exclude_none=True)}\n\n"
82+
83+
await asyncio.sleep(messages_check_interval)
84+
update_id, messages = renderer.get_messages()
85+
86+
87+
def setup_sse(app: FastAPI) -> None:
88+
async def on_startup() -> None:
89+
renderer_manager = RendererManager()
90+
renderer_manager.set_to_app_state(app)
91+
92+
app.add_event_handler("startup", on_startup)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/ui/routes.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33

44
from fastapi import FastAPI
55

6-
from . import _index, _services
6+
from . import _index, _services, _sse_utils
77
from ._constants import UI_MOUNT_PREFIX
88

99
_TAGS: Final[list[str | Enum]] = ["FastUI"]
1010

1111

1212
def setup_ui_api(app: FastAPI) -> None:
13+
_sse_utils.setup_sse(app)
14+
1315
app.include_router(_services.router, prefix=UI_MOUNT_PREFIX, tags=_TAGS)
1416

1517
# keep as last entry

0 commit comments

Comments
 (0)