Skip to content

Commit e7969cf

Browse files
authored
✨ log message passes project_id (ITISFoundation#3158)
1 parent 253bdc2 commit e7969cf

File tree

2 files changed

+81
-83
lines changed

2 files changed

+81
-83
lines changed

services/web/server/src/simcore_service_webserver/computation_subscribe.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import os
44
import socket
5-
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List
5+
from typing import Any, AsyncIterator, Awaitable, Callable
66

77
import aio_pika
88
from aiohttp import web
@@ -49,7 +49,7 @@ async def progress_message_parser(app: web.Application, data: bytes) -> None:
4949
progress=rabbit_message.progress,
5050
)
5151
if project:
52-
messages: List[SocketMessageDict] = [
52+
messages: list[SocketMessageDict] = [
5353
{
5454
"event_type": SOCKET_IO_NODE_UPDATED_EVENT,
5555
"data": {
@@ -74,13 +74,11 @@ async def progress_message_parser(app: web.Application, data: bytes) -> None:
7474

7575
async def log_message_parser(app: web.Application, data: bytes) -> None:
7676
rabbit_message = LoggerRabbitMessage.parse_raw(data)
77-
socket_messages: List[SocketMessageDict] = [
77+
78+
socket_messages: list[SocketMessageDict] = [
7879
{
7980
"event_type": SOCKET_IO_LOG_EVENT,
80-
"data": {
81-
"messages": rabbit_message.messages,
82-
"node_id": f"{rabbit_message.node_id}",
83-
},
81+
"data": rabbit_message.dict(exclude={"user_id"}),
8482
}
8583
]
8684
await send_messages(app, f"{rabbit_message.user_id}", socket_messages)
@@ -101,7 +99,7 @@ async def instrumentation_message_parser(app: web.Application, data: bytes) -> N
10199
async def events_message_parser(app: web.Application, data: bytes) -> None:
102100
rabbit_message = EventRabbitMessage.parse_raw(data)
103101

104-
socket_messages: List[SocketMessageDict] = [
102+
socket_messages: list[SocketMessageDict] = [
105103
{
106104
"event_type": SOCKET_IO_EVENT,
107105
"data": {
@@ -163,7 +161,7 @@ async def _get_channel() -> aio_pika.Channel:
163161
async def _exchange_consumer(
164162
exchange_name: str,
165163
parse_handler: Callable[[web.Application, bytes], Awaitable[None]],
166-
consumer_kwargs: Dict[str, Any],
164+
consumer_kwargs: dict[str, Any],
167165
):
168166
while consumer_running:
169167
try:

services/web/server/tests/integration/02/test_rabbit.py

Lines changed: 74 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@
77
import logging
88
from asyncio import sleep
99
from collections import namedtuple
10-
from typing import Any, Awaitable, Callable, Dict, List, NamedTuple, Tuple
10+
from typing import Any, Awaitable, Callable, NamedTuple
1111

1212
import aio_pika
1313
import pytest
1414
import socketio
1515
import sqlalchemy as sa
1616
from faker.proxy import Faker
1717
from models_library.basic_types import UUIDStr
18+
from models_library.projects import ProjectID
19+
from models_library.projects_nodes_io import NodeID
1820
from models_library.projects_state import RunningState
1921
from models_library.rabbitmq_messages import (
2022
InstrumentationRabbitMessage,
@@ -23,9 +25,11 @@
2325
)
2426
from models_library.users import UserID
2527
from pytest_mock import MockerFixture
28+
from pytest_simcore.helpers.utils_login import UserInfoDict
2629
from pytest_simcore.rabbit_service import RabbitExchanges
2730
from servicelib.aiohttp.application import create_safe_application
2831
from settings_library.rabbit import RabbitSettings
32+
from simcore_postgres_database.models.comp_tasks import NodeClass
2933
from simcore_service_webserver._constants import APP_SETTINGS_KEY
3034
from simcore_service_webserver.application_settings import setup_settings
3135
from simcore_service_webserver.computation import setup_computation
@@ -40,6 +44,7 @@
4044
from simcore_service_webserver.security_roles import UserRole
4145
from simcore_service_webserver.session import setup_session
4246
from simcore_service_webserver.socketio.plugin import setup_socketio
47+
from tenacity import retry_if_exception_type
4348
from tenacity._asyncio import AsyncRetrying
4449
from tenacity.before_sleep import before_sleep_log
4550
from tenacity.stop import stop_after_delay
@@ -60,19 +65,19 @@
6065

6166
logger = logging.getLogger(__name__)
6267

63-
RabbitMessage = Dict[str, Any]
64-
LogMessages = List[LoggerRabbitMessage]
65-
InstrumMessages = List[InstrumentationRabbitMessage]
66-
ProgressMessages = List[ProgressRabbitMessage]
68+
RabbitMessage = dict[str, Any]
69+
LogMessages = list[LoggerRabbitMessage]
70+
InstrumMessages = list[InstrumentationRabbitMessage]
71+
ProgressMessages = list[ProgressRabbitMessage]
6772

6873

6974
async def _publish_in_rabbit(
70-
user_id: int,
71-
project_id: UUIDStr,
72-
node_uuid: UUIDStr,
75+
user_id: UserID,
76+
project_id: ProjectID,
77+
node_uuid: NodeID,
7378
num_messages: int,
7479
rabbit_exchanges: RabbitExchanges,
75-
) -> Tuple[LogMessages, ProgressMessages, InstrumMessages]:
80+
) -> tuple[LogMessages, ProgressMessages, InstrumMessages]:
7681

7782
log_messages = [
7883
LoggerRabbitMessage(
@@ -102,7 +107,7 @@ async def _publish_in_rabbit(
102107
project_id=project_id,
103108
node_id=node_uuid,
104109
service_uuid=node_uuid,
105-
service_type="COMPUTATIONAL",
110+
service_type=NodeClass.COMPUTATIONAL,
106111
service_key="some/service/awesome/key",
107112
service_tag="some-awesome-tag",
108113
)
@@ -154,7 +159,7 @@ async def _publish_in_rabbit(
154159
def client(
155160
event_loop: asyncio.AbstractEventLoop,
156161
aiohttp_client: Callable,
157-
app_config: Dict[str, Any], ## waits until swarm with *_services are up
162+
app_config: dict[str, Any], ## waits until swarm with *_services are up
158163
rabbit_service: RabbitSettings, ## waits until rabbit is responsive and set env vars
159164
postgres_db: sa.engine.Engine,
160165
mocker: MockerFixture,
@@ -193,40 +198,34 @@ def client(
193198

194199

195200
@pytest.fixture
196-
def user_id(faker: Faker) -> int:
197-
return faker.pyint(min_value=1)
201+
def client_session_id(client_session_id_factory: Callable[[], str]) -> UUIDStr:
202+
return client_session_id_factory()
198203

199204

200205
@pytest.fixture
201-
def node_uuid(faker: Faker) -> str:
202-
return faker.uuid4()
206+
def not_logged_user_id(faker: Faker, logged_user: dict[str, Any]) -> UserID:
207+
some_user_id = faker.pyint(min_value=logged_user["id"] + 1)
208+
assert logged_user["id"] != some_user_id
209+
return some_user_id
203210

204211

205212
@pytest.fixture
206-
def client_session_id(faker: Faker) -> UUIDStr:
207-
return faker.uuid4()
208-
209-
210-
@pytest.fixture
211-
def other_user_id(user_id: int, logged_user: Dict[str, Any]) -> int:
212-
other = user_id
213-
assert logged_user["id"] != other
214-
return other
215-
216-
217-
@pytest.fixture
218-
def other_project_id(faker: Faker, user_project: Dict[str, Any]) -> UUIDStr:
219-
other_id = faker.uuid4()
220-
assert user_project["uuid"] != other_id
213+
def not_current_project_id(faker: Faker, user_project: dict[str, Any]) -> ProjectID:
214+
other_id = faker.uuid4(cast_to=None)
215+
assert (
216+
ProjectID(user_project["uuid"]) != other_id
217+
), "bad luck... this should not happen very often though"
221218
return other_id
222219

223220

224221
@pytest.fixture
225-
def other_node_uuid(node_uuid: UUIDStr, user_project: Dict[str, Any]) -> UUIDStr:
226-
other = node_uuid
227-
node_uuid = list(user_project["workbench"])[0]
228-
assert node_uuid != other
229-
return other
222+
def not_in_project_node_uuid(faker: Faker, user_project: dict[str, Any]) -> NodeID:
223+
not_in_project_node_uuid = faker.uuid4(cast_to=None)
224+
assert not any(
225+
NodeID(node_id) == not_in_project_node_uuid
226+
for node_id in user_project["workbench"]
227+
), "bad luck... this should not happen very often though"
228+
return not_in_project_node_uuid
230229

231230

232231
@pytest.fixture
@@ -267,15 +266,15 @@ async def socketio_subscriber_handlers(
267266
def publish_some_messages_in_rabbit(
268267
rabbit_exchanges: RabbitExchanges,
269268
) -> Callable[
270-
[UserID, UUIDStr, UUIDStr, int],
271-
Awaitable[Tuple[LogMessages, ProgressMessages, InstrumMessages]],
269+
[UserID, ProjectID, NodeID, int],
270+
Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
272271
]:
273272
"""rabbitMQ PUBLISHER"""
274273

275274
async def go(
276-
user_id: int,
277-
project_id: str,
278-
node_uuid: str,
275+
user_id: UserID,
276+
project_id: ProjectID,
277+
node_uuid: NodeID,
279278
num_messages: int,
280279
):
281280
return await _publish_in_rabbit(
@@ -302,11 +301,12 @@ def user_role() -> UserRole:
302301
#
303302

304303
POLLING_TIME = 0.2
305-
TIMEOUT_S = 5
304+
TIMEOUT_S = 10
306305
RETRY_POLICY = dict(
307306
wait=wait_fixed(POLLING_TIME),
308307
stop=stop_after_delay(TIMEOUT_S),
309308
before_sleep=before_sleep_log(logger, log_level=logging.WARNING),
309+
retry=retry_if_exception_type(AssertionError),
310310
reraise=True,
311311
)
312312
NUMBER_OF_MESSAGES = 1
@@ -319,23 +319,23 @@ def user_role() -> UserRole:
319319

320320
@pytest.mark.parametrize("user_role", USER_ROLES)
321321
async def test_publish_to_other_user(
322-
other_user_id: int,
323-
other_project_id: UUIDStr,
324-
other_node_uuid: str,
322+
not_logged_user_id: UserID,
323+
not_current_project_id: ProjectID,
324+
not_in_project_node_uuid: NodeID,
325325
#
326326
socketio_subscriber_handlers: NamedTuple,
327327
publish_some_messages_in_rabbit: Callable[
328-
[UserID, UUIDStr, UUIDStr, int],
329-
Awaitable[Tuple[LogMessages, ProgressMessages, InstrumMessages]],
328+
[UserID, ProjectID, NodeID, int],
329+
Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
330330
],
331331
):
332332
mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers
333333

334334
# Some other client publishes messages with wrong user id
335335
await publish_some_messages_in_rabbit(
336-
other_user_id,
337-
other_project_id,
338-
other_node_uuid,
336+
not_logged_user_id,
337+
not_current_project_id,
338+
not_in_project_node_uuid,
339339
NUMBER_OF_MESSAGES,
340340
)
341341
await sleep(TIMEOUT_S)
@@ -346,23 +346,23 @@ async def test_publish_to_other_user(
346346

347347
@pytest.mark.parametrize("user_role", USER_ROLES)
348348
async def test_publish_to_user(
349-
logged_user: Dict[str, Any],
350-
other_project_id: UUIDStr,
351-
other_node_uuid: str,
349+
logged_user: UserInfoDict,
350+
not_current_project_id: ProjectID,
351+
not_in_project_node_uuid: NodeID,
352352
#
353353
socketio_subscriber_handlers: NamedTuple,
354354
publish_some_messages_in_rabbit: Callable[
355-
[UserID, UUIDStr, UUIDStr, int],
356-
Awaitable[Tuple[LogMessages, ProgressMessages, InstrumMessages]],
355+
[UserID, ProjectID, NodeID, int],
356+
Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
357357
],
358358
):
359359
mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers
360360

361361
# publish messages with correct user id, but no project
362362
log_messages, _, _ = await publish_some_messages_in_rabbit(
363363
logged_user["id"],
364-
other_project_id,
365-
other_node_uuid,
364+
not_current_project_id,
365+
not_in_project_node_uuid,
366366
NUMBER_OF_MESSAGES,
367367
)
368368

@@ -376,30 +376,30 @@ async def test_publish_to_user(
376376
value = mock_call[0]
377377
deserialized_value = json.loads(value[0])
378378
assert deserialized_value == json.loads(
379-
expected_message.json(include={"node_id", "messages"})
379+
expected_message.json(include={"node_id", "project_id", "messages"})
380380
)
381381
mock_node_update_handler.assert_not_called()
382382

383383

384384
@pytest.mark.parametrize("user_role", USER_ROLES)
385385
async def test_publish_about_users_project(
386-
logged_user: Dict[str, Any],
387-
user_project: Dict[str, Any],
388-
other_node_uuid: str,
386+
logged_user: UserInfoDict,
387+
user_project: dict[str, Any],
388+
not_in_project_node_uuid: NodeID,
389389
#
390390
socketio_subscriber_handlers: NamedTuple,
391391
publish_some_messages_in_rabbit: Callable[
392-
[UserID, UUIDStr, UUIDStr, int],
393-
Awaitable[Tuple[LogMessages, ProgressMessages, InstrumMessages]],
392+
[UserID, ProjectID, NodeID, int],
393+
Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
394394
],
395395
):
396396
mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers
397397

398398
# publish message with correct user id, project but not node
399399
log_messages, _, _ = await publish_some_messages_in_rabbit(
400-
logged_user["id"],
401-
user_project["uuid"],
402-
other_node_uuid,
400+
UserID(logged_user["id"]),
401+
ProjectID(user_project["uuid"]),
402+
not_in_project_node_uuid,
403403
NUMBER_OF_MESSAGES,
404404
)
405405

@@ -413,29 +413,29 @@ async def test_publish_about_users_project(
413413
value = mock_call[0]
414414
deserialized_value = json.loads(value[0])
415415
assert deserialized_value == json.loads(
416-
expected_message.json(include={"node_id", "messages"})
416+
expected_message.json(include={"node_id", "project_id", "messages"})
417417
)
418418
mock_node_update_handler.assert_not_called()
419419

420420

421421
@pytest.mark.parametrize("user_role", USER_ROLES)
422422
async def test_publish_about_users_projects_node(
423-
logged_user: Dict[str, Any],
424-
user_project: Dict[str, Any],
423+
logged_user: UserInfoDict,
424+
user_project: dict[str, Any],
425425
#
426426
socketio_subscriber_handlers: NamedTuple,
427427
publish_some_messages_in_rabbit: Callable[
428-
[UserID, UUIDStr, UUIDStr, int],
429-
Awaitable[Tuple[LogMessages, ProgressMessages, InstrumMessages]],
428+
[UserID, ProjectID, NodeID, int],
429+
Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
430430
],
431431
):
432432
mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers
433433

434434
# publish message with correct user id, project node
435-
node_uuid = list(user_project["workbench"])[0]
435+
node_uuid = NodeID(list(user_project["workbench"])[0])
436436
log_messages, _, _ = await publish_some_messages_in_rabbit(
437-
logged_user["id"],
438-
user_project["uuid"],
437+
UserID(logged_user["id"]),
438+
ProjectID(user_project["uuid"]),
439439
node_uuid,
440440
NUMBER_OF_MESSAGES,
441441
)
@@ -451,7 +451,7 @@ async def test_publish_about_users_projects_node(
451451
value = mock_call[0]
452452
deserialized_value = json.loads(value[0])
453453
assert deserialized_value == json.loads(
454-
expected_message.json(include={"node_id", "messages"})
454+
expected_message.json(include={"node_id", "project_id", "messages"})
455455
)
456456

457457
# mock_log_handler.assert_has_calls(log_calls, any_order=True)

0 commit comments

Comments
 (0)