Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.api_schemas_webserver.socketio import SocketIORoomStr
from models_library.users import UserID
from models_library.projects import ProjectID
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.services_utils import get_status_as_dict

Expand All @@ -23,20 +23,22 @@ def __init__(self, sio_manager: socketio.AsyncAioPikaManager):
self._sio_manager = sio_manager

async def notify_service_status(
self, user_id: UserID, status: NodeGet | DynamicServiceGet | NodeGetIdle
self, project_id: ProjectID, status: NodeGet | DynamicServiceGet | NodeGetIdle
) -> None:
await self._sio_manager.emit(
SOCKET_IO_SERVICE_STATUS_EVENT,
data=jsonable_encoder(get_status_as_dict(status)),
room=SocketIORoomStr.from_user_id(user_id),
room=SocketIORoomStr.from_project_id(project_id),
)


async def notify_service_status_change(
app: FastAPI, user_id: UserID, status: NodeGet | DynamicServiceGet | NodeGetIdle
app: FastAPI,
project_id: ProjectID,
status: NodeGet | DynamicServiceGet | NodeGetIdle,
) -> None:
notifier: Notifier = Notifier.get_from_app_state(app)
await notifier.notify_service_status(user_id=user_id, status=status)
await notifier.notify_service_status(project_id=project_id, status=status)


async def lifespan(app: FastAPI) -> AsyncIterator[State]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from ._api import (
NORMAL_RATE_POLL_INTERVAL,
get_all_tracked_services,
get_project_id_for_service,
get_tracked_service,
get_user_id_for_service,
remove_tracked_service,
set_frontend_notified_for_service,
set_if_status_changed_for_service,
Expand All @@ -17,11 +17,11 @@

__all__: tuple[str, ...] = (
"get_all_tracked_services",
"get_project_id_for_service",
"get_tracked_service",
"get_user_id_for_service",
"service_tracker_lifespan",
"NORMAL_RATE_POLL_INTERVAL",
"remove_tracked_service",
"service_tracker_lifespan",
"set_frontend_notified_for_service",
"set_if_status_changed_for_service",
"set_request_as_running",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_enums import ServiceState
from models_library.users import UserID
from servicelib.deferred_tasks import TaskUID

from ._models import SchedulerServiceState, TrackedServiceModel, UserRequestedState
from ._models import (
SchedulerServiceState,
TrackedServiceModel,
UserRequestedState,
)
from ._setup import get_tracker

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -242,7 +246,7 @@ async def get_all_tracked_services(app: FastAPI) -> dict[NodeID, TrackedServiceM
return await get_tracker(app).all()


async def get_user_id_for_service(app: FastAPI, node_id: NodeID) -> UserID | None:
"""returns user_id for the service"""
async def get_project_id_for_service(app: FastAPI, node_id: NodeID) -> ProjectID | None:
"""returns project_id for the service"""
model: TrackedServiceModel | None = await get_tracker(app).load(node_id)
return model.user_id if model else None
return model.project_id if model else None
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
RunningDynamicServiceDetails,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID
from servicelib.deferred_tasks._base_deferred_handler import DeferredContext

Expand Down Expand Up @@ -69,15 +69,15 @@ async def on_result(
if await service_tracker.should_notify_frontend_for_service(
app, node_id, status_changed=status_changed
):
user_id: UserID | None = await service_tracker.get_user_id_for_service(
app, node_id
project_id: ProjectID | None = (
await service_tracker.get_project_id_for_service(app, node_id)
)
if user_id:
await notify_service_status_change(app, user_id, result)
if project_id:
await notify_service_status_change(app, project_id, result)
await service_tracker.set_frontend_notified_for_service(app, node_id)
else:
_logger.info(
"Did not find a user for '%s', skipping status delivery of: %s",
"Did not find a project for '%s', skipping status delivery of: %s",
node_id,
result,
)
Loading