Skip to content

Commit dd8161e

Browse files
🎨 Send Socket.IO events whenever conversation messages are created, updated, or deleted (#7941)
1 parent 4c80da2 commit dd8161e

File tree

5 files changed

+250
-3
lines changed

5 files changed

+250
-3
lines changed

services/web/server/src/simcore_service_webserver/conversations/_conversation_message_service.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,65 @@
1111
ConversationMessagePatchDB,
1212
ConversationMessageType,
1313
)
14+
from models_library.projects import ProjectID
1415
from models_library.rest_ordering import OrderBy, OrderDirection
1516
from models_library.rest_pagination import PageTotalCount
1617
from models_library.users import UserID
1718

19+
from ..projects._groups_repository import list_project_groups
20+
from ..users._users_service import get_users_in_group
21+
22+
# Import or define SocketMessageDict
1823
from ..users.api import get_user_primary_group_id
1924
from . import _conversation_message_repository
25+
from ._socketio import (
26+
notify_conversation_message_created,
27+
notify_conversation_message_deleted,
28+
notify_conversation_message_updated,
29+
)
2030

2131
_logger = logging.getLogger(__name__)
2232

2333

34+
async def _get_recipients(app: web.Application, project_id: ProjectID) -> set[UserID]:
35+
groups = await list_project_groups(app, project_id=project_id)
36+
return {
37+
user
38+
for group in groups
39+
if group.read
40+
for user in await get_users_in_group(app, gid=group.gid)
41+
}
42+
43+
2444
async def create_message(
2545
app: web.Application,
2646
*,
2747
user_id: UserID,
48+
project_id: ProjectID,
2849
conversation_id: ConversationID,
2950
# Creation attributes
3051
content: str,
3152
type_: ConversationMessageType,
3253
) -> ConversationMessageGetDB:
3354
_user_group_id = await get_user_primary_group_id(app, user_id=user_id)
3455

35-
return await _conversation_message_repository.create(
56+
created_message = await _conversation_message_repository.create(
3657
app,
3758
conversation_id=conversation_id,
3859
user_group_id=_user_group_id,
3960
content=content,
4061
type_=type_,
4162
)
4263

64+
await notify_conversation_message_created(
65+
app,
66+
recipients=await _get_recipients(app, project_id),
67+
project_id=project_id,
68+
conversation_message=created_message,
69+
)
70+
71+
return created_message
72+
4373

4474
async def get_message(
4575
app: web.Application,
@@ -55,22 +85,33 @@ async def get_message(
5585
async def update_message(
5686
app: web.Application,
5787
*,
88+
project_id: ProjectID,
5889
conversation_id: ConversationID,
5990
message_id: ConversationMessageID,
6091
# Update attributes
6192
updates: ConversationMessagePatchDB,
6293
) -> ConversationMessageGetDB:
63-
return await _conversation_message_repository.update(
94+
updated_message = await _conversation_message_repository.update(
6495
app,
6596
conversation_id=conversation_id,
6697
message_id=message_id,
6798
updates=updates,
6899
)
69100

101+
await notify_conversation_message_updated(
102+
app,
103+
recipients=await _get_recipients(app, project_id),
104+
project_id=project_id,
105+
conversation_message=updated_message,
106+
)
107+
108+
return updated_message
109+
70110

71111
async def delete_message(
72112
app: web.Application,
73113
*,
114+
project_id: ProjectID,
74115
conversation_id: ConversationID,
75116
message_id: ConversationMessageID,
76117
) -> None:
@@ -80,6 +121,14 @@ async def delete_message(
80121
message_id=message_id,
81122
)
82123

124+
await notify_conversation_message_deleted(
125+
app,
126+
recipients=await _get_recipients(app, project_id),
127+
project_id=project_id,
128+
conversation_id=conversation_id,
129+
message_id=message_id,
130+
)
131+
83132

84133
async def list_messages_for_conversation(
85134
app: web.Application,
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import datetime
2+
from typing import Final
3+
4+
from aiohttp import web
5+
from models_library.conversations import (
6+
ConversationID,
7+
ConversationMessageGetDB,
8+
ConversationMessageID,
9+
)
10+
from models_library.projects import ProjectID
11+
from models_library.socketio import SocketMessageDict
12+
from models_library.users import UserID
13+
from pydantic import AliasGenerator, BaseModel, ConfigDict
14+
from pydantic.alias_generators import to_camel
15+
from servicelib.utils import limited_as_completed
16+
17+
from ..socketio.messages import send_message_to_standard_group
18+
19+
_MAX_CONCURRENT_SENDS: Final[int] = 3
20+
21+
SOCKET_IO_CONVERSATION_MESSAGE_CREATED_EVENT: Final[str] = (
22+
"conversation:message:created"
23+
)
24+
SOCKET_IO_CONVERSATION_MESSAGE_DELETED_EVENT: Final[str] = (
25+
"conversation:message:deleted"
26+
)
27+
SOCKET_IO_CONVERSATION_MESSAGE_UPDATED_EVENT: Final[str] = (
28+
"conversation:message:updated"
29+
)
30+
31+
32+
class BaseConversationMessage(BaseModel):
33+
conversation_id: ConversationID
34+
message_id: ConversationMessageID
35+
36+
model_config = ConfigDict(
37+
populate_by_name=True,
38+
from_attributes=True,
39+
alias_generator=AliasGenerator(
40+
serialization_alias=to_camel,
41+
),
42+
)
43+
44+
45+
class ConversationMessageCreated(BaseConversationMessage):
46+
content: str
47+
created: datetime.datetime
48+
49+
50+
class ConversationMessageUpdated(BaseConversationMessage):
51+
content: str
52+
modified: datetime.datetime
53+
54+
55+
class ConversationMessageDeleted(BaseConversationMessage): ...
56+
57+
58+
async def _send_message_to_recipients(
59+
app: web.Application,
60+
recipients: set[UserID],
61+
notification_message: SocketMessageDict,
62+
):
63+
async for _ in limited_as_completed(
64+
(
65+
send_message_to_standard_group(app, recipient, notification_message)
66+
for recipient in recipients
67+
),
68+
limit=_MAX_CONCURRENT_SENDS,
69+
):
70+
...
71+
72+
73+
async def notify_conversation_message_created(
74+
app: web.Application,
75+
*,
76+
recipients: set[UserID],
77+
project_id: ProjectID,
78+
conversation_message: ConversationMessageGetDB,
79+
) -> None:
80+
notification_message = SocketMessageDict(
81+
event_type=SOCKET_IO_CONVERSATION_MESSAGE_CREATED_EVENT,
82+
data={
83+
"projectId": project_id,
84+
**ConversationMessageCreated(
85+
**conversation_message.model_dump()
86+
).model_dump(mode="json", by_alias=True),
87+
},
88+
)
89+
90+
await _send_message_to_recipients(app, recipients, notification_message)
91+
92+
93+
async def notify_conversation_message_updated(
94+
app: web.Application,
95+
*,
96+
recipients: set[UserID],
97+
project_id: ProjectID,
98+
conversation_message: ConversationMessageGetDB,
99+
) -> None:
100+
101+
notification_message = SocketMessageDict(
102+
event_type=SOCKET_IO_CONVERSATION_MESSAGE_UPDATED_EVENT,
103+
data={
104+
"projectId": project_id,
105+
**ConversationMessageUpdated(
106+
**conversation_message.model_dump()
107+
).model_dump(mode="json", by_alias=True),
108+
},
109+
)
110+
111+
await _send_message_to_recipients(app, recipients, notification_message)
112+
113+
114+
async def notify_conversation_message_deleted(
115+
app: web.Application,
116+
*,
117+
recipients: set[UserID],
118+
project_id: ProjectID,
119+
conversation_id: ConversationID,
120+
message_id: ConversationMessageID,
121+
) -> None:
122+
123+
notification_message = SocketMessageDict(
124+
event_type=SOCKET_IO_CONVERSATION_MESSAGE_DELETED_EVENT,
125+
data={
126+
"projectId": project_id,
127+
**ConversationMessageDeleted(
128+
conversation_id=conversation_id, message_id=message_id
129+
).model_dump(mode="json", by_alias=True),
130+
},
131+
)
132+
133+
await _send_message_to_recipients(app, recipients, notification_message)

services/web/server/src/simcore_service_webserver/projects/_conversations_service.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ async def create_project_conversation_message(
169169
return await conversations_service.create_message(
170170
app,
171171
user_id=user_id,
172+
project_id=project_uuid,
172173
conversation_id=conversation_id,
173174
content=content,
174175
type_=message_type,
@@ -221,6 +222,7 @@ async def update_project_conversation_message(
221222
)
222223
return await conversations_service.update_message(
223224
app,
225+
project_id=project_uuid,
224226
conversation_id=conversation_id,
225227
message_id=message_id,
226228
updates=ConversationMessagePatchDB(content=content),
@@ -244,7 +246,10 @@ async def delete_project_conversation_message(
244246
permission="read",
245247
)
246248
await conversations_service.delete_message(
247-
app, conversation_id=conversation_id, message_id=message_id
249+
app,
250+
project_id=project_uuid,
251+
conversation_id=conversation_id,
252+
message_id=message_id,
248253
)
249254

250255

services/web/server/src/simcore_service_webserver/socketio/messages.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
SOCKET_IO_LOG_EVENT: Final[str] = "logger"
2828

2929
SOCKET_IO_NODE_UPDATED_EVENT: Final[str] = "nodeUpdated"
30+
3031
SOCKET_IO_PROJECT_UPDATED_EVENT: Final[str] = "projectStateUpdated"
32+
3133
SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT: Final[str] = "walletOsparcCreditsUpdated"
3234

3335

0 commit comments

Comments
 (0)