Skip to content

Commit f81aa73

Browse files
Adds Redis lock to support message creation for sync
Introduces Redis-based exclusive locking to ensure atomicity between message creation and first-message checking, preventing race conditions in concurrent environments and maintaining data consistency.
1 parent 91420d2 commit f81aa73

File tree

2 files changed

+57
-23
lines changed

2 files changed

+57
-23
lines changed

services/web/server/src/simcore_service_webserver/conversations/_conversation_message_service.py

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
from models_library.rest_ordering import OrderBy, OrderDirection
1616
from models_library.rest_pagination import PageTotalCount
1717
from models_library.users import UserID
18+
from servicelib.redis import exclusive
1819

19-
# Import or define SocketMessageDict
20+
from ..redis import get_redis_lock_manager_client_sdk
2021
from ..users import users_service
2122
from . import _conversation_message_repository
2223
from ._conversation_service import _get_recipients
@@ -28,6 +29,9 @@
2829

2930
_logger = logging.getLogger(__name__)
3031

32+
# Redis lock key for conversation message operations
33+
CONVERSATION_MESSAGE_REDIS_LOCK_KEY = "conversation_message_update:{}"
34+
3135

3236
async def create_message(
3337
app: web.Application,
@@ -70,30 +74,61 @@ async def create_support_message_and_check_if_it_is_first_message(
7074
content: str,
7175
type_: ConversationMessageType,
7276
) -> tuple[ConversationMessageGetDB, bool]:
73-
created_message = await create_message(
74-
app,
75-
user_id=user_id,
76-
project_id=project_id,
77-
conversation_id=conversation_id,
78-
content=content,
79-
type_=type_,
80-
)
81-
_, messages = await _conversation_message_repository.list_(
82-
app,
83-
conversation_id=conversation_id,
84-
offset=0,
85-
limit=1,
86-
order_by=OrderBy(
87-
field=IDStr("created"), direction=OrderDirection.ASC
88-
), # NOTE: ASC - first/oldest message first
77+
"""Create a message and check if it's the first one with Redis lock protection.
78+
79+
This function is protected by Redis exclusive lock because:
80+
- the message creation and first message check must be kept in sync
81+
82+
Args:
83+
app: The web application instance
84+
user_id: ID of the user creating the message
85+
project_id: ID of the project (optional)
86+
conversation_id: ID of the conversation
87+
content: Content of the message
88+
type_: Type of the message
89+
90+
Returns:
91+
Tuple containing the created message and whether it's the first message
92+
"""
93+
94+
@exclusive(
95+
get_redis_lock_manager_client_sdk(app),
96+
lock_key=CONVERSATION_MESSAGE_REDIS_LOCK_KEY.format(conversation_id),
97+
blocking=True,
98+
blocking_timeout=None, # NOTE: this is a blocking call, a timeout has undefined effects
8999
)
100+
async def _create_support_message_and_check_if_it_is_first_message() -> (
101+
tuple[ConversationMessageGetDB, bool]
102+
):
103+
"""This function is protected because
104+
- the message creation and first message check must be kept in sync
105+
"""
106+
created_message = await create_message(
107+
app,
108+
user_id=user_id,
109+
project_id=project_id,
110+
conversation_id=conversation_id,
111+
content=content,
112+
type_=type_,
113+
)
114+
_, messages = await _conversation_message_repository.list_(
115+
app,
116+
conversation_id=conversation_id,
117+
offset=0,
118+
limit=1,
119+
order_by=OrderBy(
120+
field=IDStr("created"), direction=OrderDirection.ASC
121+
), # NOTE: ASC - first/oldest message first
122+
)
123+
124+
is_first_message = False
125+
if messages:
126+
first_message = messages[0]
127+
is_first_message = first_message.message_id == created_message.message_id
90128

91-
is_first_message = False
92-
if messages:
93-
first_message = messages[0]
94-
is_first_message = first_message.message_id == created_message.message_id
129+
return created_message, is_first_message
95130

96-
return created_message, is_first_message
131+
return await _create_support_message_and_check_if_it_is_first_message()
97132

98133

99134
async def get_message(

services/web/server/tests/unit/with_dbs/04/conversations/test_conversations_messages_rest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,6 @@ async def test_conversation_messages_nonexistent_resources(
450450
await assert_status(resp, status.HTTP_404_NOT_FOUND)
451451

452452

453-
### Test with a database
454453
@pytest.mark.parametrize("user_role", [UserRole.USER])
455454
async def test_conversation_messages_with_database(
456455
client: TestClient,

0 commit comments

Comments
 (0)