Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ def routing_key(self) -> str | None:
return None


class WebserverInternalEventRabbitMessageAction(str, Enum):
    """Actions that a webserver internal event message can carry."""

    # fmt: off
    UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE = "UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE"
    # fmt: on


# Message exchanged on the webserver internal events channel: it carries an
# `action` plus a free-form `data` payload for that action.
class WebserverInternalEventRabbitMessage(RabbitMessageBase):
    # fixed channel name; the Literal type rejects any other value
    # fmt: off
    channel_name: Literal["simcore.services.webserver_internal_events"] = "simcore.services.webserver_internal_events"
    # fmt: on

    # which internal event this message represents (required)
    action: WebserverInternalEventRabbitMessageAction

    # action-specific payload; defaults to a fresh empty dict per instance
    data: dict[str, Any] = Field(default_factory=dict)

    def routing_key(self) -> str | None:
        """Return ``None``: this message does not define a routing key."""
        return None


class ProgressType(StrAutoEnum):
COMPUTATION_RUNNING = auto() # NOTE: this is the original only progress report

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from aiohttp import web
from models_library.groups import GroupID
from models_library.projects import ProjectID
from models_library.projects_state import RUNNING_STATE_COMPLETED_STATES
from models_library.rabbitmq_messages import (
ComputationalPipelineStatusMessage,
Expand All @@ -15,6 +16,8 @@
ProgressRabbitMessageProject,
ProgressType,
WalletCreditsMessage,
WebserverInternalEventRabbitMessage,
WebserverInternalEventRabbitMessageAction,
)
from models_library.socketio import SocketMessageDict
from pydantic import TypeAdapter
Expand All @@ -34,6 +37,7 @@
)
from ..socketio.models import WebSocketNodeProgress, WebSocketProjectProgress
from ..wallets import api as wallets_service
from . import project_logs
from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -151,6 +155,44 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool:
return True


async def _webserver_internal_events_message_parser(
    app: web.Application, data: bytes
) -> bool:
    """
    Handles internal webserver events that need to be propagated to other webserver replicas

    Ex. Log unsubscription is triggered by user closing a project, which is a REST API call
    that can reach any webserver replica. Then this event is propagated to all replicas
    so that the one holding the websocket connection can unsubscribe from the logs queue.

    Args:
        app: The web application instance
        data: JSON-encoded ``WebserverInternalEventRabbitMessage``

    Returns:
        Always ``True``, including for unknown actions and for events with a
        missing ``project_id`` (those are only logged).
        NOTE(review): presumably ``True`` marks the message as handled for the
        consumer; confirm against ``subscribe_to_rabbitmq``.
    """
    rabbit_message = WebserverInternalEventRabbitMessage.model_validate_json(data)

    if (
        rabbit_message.action
        != WebserverInternalEventRabbitMessageAction.UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE
    ):
        _logger.warning(
            "Unknown webserver internal event message action %s", rabbit_message.action
        )
        return True

    _project_id = rabbit_message.data.get("project_id")
    if not _project_id:
        # NOTE: no exception is being handled at this point, therefore
        # `_logger.error` is used: `_logger.exception` would append a
        # meaningless "NoneType: None" traceback to the record.
        _logger.error(
            "Missing project_id in UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE event, this should never happen, investigate!"
        )
        return True

    _logger.debug(
        "Received UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE event for project %s",
        _project_id,
    )
    await project_logs.unsubscribe(app, ProjectID(_project_id))
    return True


async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message = TypeAdapter(WalletCreditsMessage).validate_json(data)
wallet_groups = await wallets_service.list_wallet_groups_with_read_access_by_wallet(
Expand Down Expand Up @@ -191,6 +233,11 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b
_events_message_parser,
{},
),
SubcribeArgumentsTuple(
WebserverInternalEventRabbitMessage.get_channel_name(),
_webserver_internal_events_message_parser,
{},
),
SubcribeArgumentsTuple(
WalletCreditsMessage.get_channel_name(),
_osparc_credits_message_parser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
managed_resource,
)
from .._projects_service import (
conditionally_unsubscribe_from_project_logs,
conditionally_unsubscribe_project_logs_across_replicas,
retrieve_and_notify_project_locked_state,
)

Expand Down Expand Up @@ -83,7 +83,7 @@ async def _on_user_disconnected(
)

for _project_id in projects: # At the moment, only 1 is expected
await conditionally_unsubscribe_from_project_logs(
await conditionally_unsubscribe_project_logs_across_replicas(
app, ProjectID(_project_id), user_id
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from ...users import users_service
from ...utils_aiohttp import envelope_json_response, get_api_base_url
from .. import _projects_service, projects_wallets_service
from .._projects_service import conditionally_unsubscribe_from_project_logs
from .._projects_service import conditionally_unsubscribe_project_logs_across_replicas
from ..exceptions import ProjectStartsTooManyDynamicNodesError
from ._rest_exceptions import handle_plugin_requests_exceptions
from ._rest_schemas import AuthenticatedRequestContext, ProjectPathParams
Expand Down Expand Up @@ -223,7 +223,7 @@ async def close_project(request: web.Request) -> web.Response:
),
)

await conditionally_unsubscribe_from_project_logs(
await conditionally_unsubscribe_project_logs_across_replicas(
request.app, path_params.project_id, req_ctx.user_id
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
ProjectStatus,
RunningState,
)
from models_library.rabbitmq_messages import (
WebserverInternalEventRabbitMessage,
WebserverInternalEventRabbitMessageAction,
)
from models_library.resource_tracker import (
HardwareInfo,
PricingAndHardwareInfoTuple,
Expand Down Expand Up @@ -115,9 +119,8 @@
from ..director_v2 import director_v2_service
from ..dynamic_scheduler import api as dynamic_scheduler_service
from ..models import ClientSessionID
from ..notifications import project_logs
from ..products import products_web
from ..rabbitmq import get_rabbitmq_rpc_client
from ..rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client
from ..redis import get_redis_lock_manager_client_sdk
from ..resource_manager.models import UserSession
from ..resource_manager.registry import get_registry
Expand Down Expand Up @@ -187,19 +190,26 @@
_logger = logging.getLogger(__name__)


async def conditionally_unsubscribe_from_project_logs(
async def conditionally_unsubscribe_project_logs_across_replicas(
app: web.Application, project_id: ProjectID, user_id: UserID
) -> None:
"""
Unsubscribes from project logs only if no active socket connections remain for the project.

This function checks for actual socket connections rather than just user sessions,
ensuring logs are only unsubscribed when truly no users are connected.
ensuring logs are only unsubscribed when truly no users are connected. When no active
connections are detected, an unsubscribe event is sent via RabbitMQ to all webserver
replicas to coordinate the unsubscription across the entire cluster.

Note: With multiple webserver replicas, this ensures we don't unsubscribe until
the last socket is closed. However, another replica may still maintain an active
subscription even if no users are connected to it, as the RabbitMQ event ensures
all replicas receive the unsubscription notification.

Args:
app: The web application instance
project_id: The project ID to check
user_id: Optional user ID to use for the resource session (defaults to 0 if None)
project_id: The project ID to check for active connections
user_id: User ID to use for the resource session management
"""
redis_resource_registry = get_registry(app)
with managed_resource(user_id, None, app) as user_session:
Expand All @@ -221,7 +231,17 @@ async def conditionally_unsubscribe_from_project_logs(
# the last socket is closed, though another replica may still maintain an active
# subscription even if no users are connected to it.
if actually_used_sockets_on_project == 0:
await project_logs.unsubscribe(app, project_id)
rabbitmq_client = get_rabbitmq_client(app)
message = WebserverInternalEventRabbitMessage(
action=WebserverInternalEventRabbitMessageAction.UNSUBSCRIBE_FROM_PROJECT_LOGS_RABBIT_QUEUE,
data={"project_id": f"{project_id}"},
)
_logger.debug(
"No active socket connections detected for project %s by user %s. Sending unsubscribe event to all replicas.",
project_id,
user_id,
)
await rabbitmq_client.publish(message.channel_name, message)


async def patch_project_and_notify_users(
Expand Down Expand Up @@ -1569,7 +1589,7 @@ async def _leave_project_room(
sio = get_socket_server(app)
await sio.leave_room(socket_id, SocketIORoomStr.from_project_id(project_uuid))
else:
_logger.error(
_logger.warning(
"User %s/%s has no socket_id, cannot leave project room %s",
user_id,
client_session_id,
Expand Down
Loading