Skip to content

Commit b3ee27a

Browse files
author
Andrei Neagu
committed
added user_notifications base
1 parent 1dd6de2 commit b3ee27a

File tree

4 files changed

+114
-15
lines changed

4 files changed

+114
-15
lines changed

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
from models_library.rabbitmq_messages import (
88
EventRabbitMessage,
99
LoggerRabbitMessage,
10+
ProgressMessageMixin,
1011
ProgressRabbitMessageNode,
1112
ProgressRabbitMessageProject,
13+
ProgressRabbitMessageWorkerJob,
1214
ProgressType,
1315
WalletCreditsMessage,
1416
)
@@ -29,7 +31,11 @@
2931
send_message_to_standard_group,
3032
send_message_to_user,
3133
)
32-
from ..socketio.models import WebSocketNodeProgress, WebSocketProjectProgress
34+
from ..socketio.models import (
35+
WebSocketNodeProgress,
36+
WebSocketProjectProgress,
37+
WebSocketWorkerJobProgress,
38+
)
3339
from ..wallets import api as wallets_api
3440
from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq
3541

@@ -65,20 +71,29 @@ async def _convert_to_node_update_event(
6571

6672

6773
async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
68-
rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = (
69-
TypeAdapter(
70-
ProgressRabbitMessageNode | ProgressRabbitMessageProject
71-
).validate_json(data)
74+
rabbit_message: (
75+
ProgressRabbitMessageNode
76+
| ProgressRabbitMessageProject
77+
| ProgressRabbitMessageWorkerJob
78+
) = TypeAdapter(
79+
ProgressRabbitMessageNode
80+
| ProgressRabbitMessageProject
81+
| ProgressRabbitMessageWorkerJob
82+
).validate_json(
83+
data
7284
)
85+
7386
message: SocketMessageDict | None = None
7487
if isinstance(rabbit_message, ProgressRabbitMessageProject):
7588
message = WebSocketProjectProgress.from_rabbit_message(
7689
rabbit_message
7790
).to_socket_dict()
78-
91+
elif isinstance(rabbit_message, ProgressRabbitMessageWorkerJob):
92+
message = WebSocketWorkerJobProgress.from_rabbit_message(
93+
rabbit_message
94+
).to_socket_dict()
7995
elif rabbit_message.progress_type is ProgressType.COMPUTATION_RUNNING:
8096
message = await _convert_to_node_update_event(app, rabbit_message)
81-
8297
else:
8398
message = WebSocketNodeProgress.from_rabbit_message(
8499
rabbit_message
@@ -156,7 +171,7 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b
156171
{"topics": []},
157172
),
158173
SubcribeArgumentsTuple(
159-
ProgressRabbitMessageNode.get_channel_name(),
174+
ProgressMessageMixin.get_channel_name(),
160175
_progress_message_parser,
161176
{"topics": []},
162177
),
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
from typing import Final
3+
4+
from aiohttp import web
5+
from models_library.rabbitmq_messages import (
6+
ProgressRabbitMessageWorkerJob,
7+
RabbitMessageBase,
8+
)
9+
from models_library.users import UserID
10+
from servicelib.logging_utils import log_catch
11+
from servicelib.rabbitmq import RabbitMQClient
12+
13+
from ..rabbitmq import get_rabbitmq_client
14+
15+
_logger = logging.getLogger(__name__)
16+
17+
18+
_SUBSCRIBABLE_EXCHANGES: Final[list[type[RabbitMessageBase]]] = [
19+
ProgressRabbitMessageWorkerJob,
20+
]
21+
22+
23+
async def subscribe(app: web.Application, user_id: UserID) -> None:
24+
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
25+
26+
for exchange in _SUBSCRIBABLE_EXCHANGES:
27+
exchange_name = exchange.get_channel_name()
28+
await rabbit_client.add_topics(exchange_name, topics=[f"{user_id}.*"])
29+
30+
31+
async def unsubscribe(app: web.Application, user_id: UserID) -> None:
32+
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
33+
for exchange in _SUBSCRIBABLE_EXCHANGES:
34+
exchange_name = exchange.get_channel_name()
35+
with log_catch(_logger, reraise=False):
36+
# NOTE: in case something bad happenned with the connection to the RabbitMQ server
37+
# such as a network disconnection. this call can fail.
38+
await rabbit_client.remove_topics(exchange_name, topics=[f"{user_id}"])
39+
40+
41+
# TODO: subscribe when logging in
42+
# TODO: unsubscribe when logging out

services/web/server/src/simcore_service_webserver/socketio/models.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from abc import abstractmethod
2-
from typing import Literal
2+
from typing import Literal, Self
33

44
from models_library.progress_bar import ProgressReport
55
from models_library.projects import ProjectID
66
from models_library.projects_nodes_io import NodeID
77
from models_library.rabbitmq_messages import (
88
ProgressRabbitMessageNode,
99
ProgressRabbitMessageProject,
10+
ProgressRabbitMessageWorkerJob,
1011
ProgressType,
1112
)
1213
from models_library.socketio import SocketMessageDict
@@ -56,9 +57,7 @@ class WebSocketProjectProgress(
5657
event_type: Literal["projectProgress"] = "projectProgress"
5758

5859
@classmethod
59-
def from_rabbit_message(
60-
cls, message: ProgressRabbitMessageProject
61-
) -> "WebSocketProjectProgress":
60+
def from_rabbit_message(cls, message: ProgressRabbitMessageProject) -> Self:
6261
return cls.model_construct(
6362
user_id=message.user_id,
6463
project_id=message.project_id,
@@ -83,9 +82,7 @@ class WebSocketNodeProgress(
8382
event_type: Literal["nodeProgress"] = "nodeProgress"
8483

8584
@classmethod
86-
def from_rabbit_message(
87-
cls, message: ProgressRabbitMessageNode
88-
) -> "WebSocketNodeProgress":
85+
def from_rabbit_message(cls, message: ProgressRabbitMessageNode) -> Self:
8986
return cls.model_construct(
9087
user_id=message.user_id,
9188
project_id=message.project_id,
@@ -99,3 +96,25 @@ def to_socket_dict(self) -> SocketMessageDict:
9996
event_type=self.event_type,
10097
data=jsonable_encoder(self, exclude={"event_type"}),
10198
)
99+
100+
101+
class WebSocketWorkerJobProgress(
102+
_WebSocketUserMixin,
103+
_WebSocketProgressMixin,
104+
WebSocketMessageBase,
105+
):
106+
event_type: Literal["workerJobProgress"] = "workerJobProgress"
107+
108+
@classmethod
109+
def from_rabbit_message(cls, message: ProgressRabbitMessageWorkerJob) -> Self:
110+
return cls.model_construct(
111+
user_id=message.user_id,
112+
progress_type=message.progress_type,
113+
progress_report=message.report,
114+
)
115+
116+
def to_socket_dict(self) -> SocketMessageDict:
117+
return SocketMessageDict(
118+
event_type=self.event_type,
119+
data=jsonable_encoder(self, exclude={"event_type"}),
120+
)

services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from models_library.rabbitmq_messages import (
1313
ProgressRabbitMessageNode,
1414
ProgressRabbitMessageProject,
15+
ProgressRabbitMessageWorkerJob,
1516
ProgressType,
1617
)
1718
from models_library.socketio import SocketMessageDict
@@ -75,6 +76,28 @@
7576
),
7677
id="project_progress",
7778
),
79+
pytest.param(
80+
ProgressRabbitMessageWorkerJob(
81+
user_id=123,
82+
progress_type=ProgressType.WORKER_JOB_EXPORTING,
83+
report=ProgressReport(actual_value=0.4, total=1),
84+
).model_dump_json(),
85+
SocketMessageDict(
86+
event_type=WebSocketNodeProgress.get_event_type(),
87+
data={
88+
"user_id": 123,
89+
"progress_type": ProgressType.WORKER_JOB_EXPORTING.value,
90+
"progress_report": {
91+
"actual_value": 0.4,
92+
"attempt": 0,
93+
"total": 1.0,
94+
"unit": None,
95+
"message": None,
96+
},
97+
},
98+
),
99+
id="worker_job_progress",
100+
),
78101
],
79102
)
80103
async def test_regression_progress_message_parser(

0 commit comments

Comments
 (0)