Skip to content

Commit c406846

Browse files
authored
🐛 Bring back the logs (ITISFoundation#2637)
1 parent 524ba04 commit c406846

File tree

27 files changed

+584
-398
lines changed

27 files changed

+584
-398
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from typing import List, Optional, Union
2+
3+
from models_library.projects import ProjectID
4+
from models_library.projects_nodes import NodeID
5+
from models_library.projects_state import RunningState
6+
from models_library.users import UserID
7+
from pydantic import BaseModel
8+
from pydantic.types import NonNegativeFloat
9+
from simcore_postgres_database.models.comp_tasks import NodeClass
10+
11+
12+
class RabbitMessageBase(BaseModel):
13+
node_id: NodeID
14+
user_id: UserID
15+
project_id: ProjectID
16+
17+
18+
class LoggerRabbitMessage(RabbitMessageBase):
19+
messages: List[str]
20+
21+
22+
class ProgressRabbitMessage(RabbitMessageBase):
23+
progress: NonNegativeFloat
24+
25+
26+
class InstrumentationRabbitMessage(RabbitMessageBase):
27+
metrics: str
28+
service_uuid: NodeID
29+
service_type: NodeClass
30+
service_key: str
31+
service_tag: str
32+
result: Optional[RunningState] = None
33+
34+
35+
RabbitMessageTypes = Union[
36+
LoggerRabbitMessage, ProgressRabbitMessage, InstrumentationRabbitMessage
37+
]

packages/pytest-simcore/src/pytest_simcore/rabbit_service.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import json
77
import logging
88
import socket
9-
from typing import Any, AsyncIterator, Dict, Iterator, Optional, Tuple
9+
from dataclasses import dataclass
10+
from typing import Any, AsyncIterator, Dict, Iterator, Optional
1011

1112
import aio_pika
1213
import pytest
@@ -118,11 +119,18 @@ def _channel_close_callback(sender: Any, exc: Optional[BaseException] = None):
118119
await channel.close()
119120

120121

122+
@dataclass
123+
class RabbitExchanges:
124+
logs: aio_pika.Exchange
125+
progress: aio_pika.Exchange
126+
instrumentation: aio_pika.Exchange
127+
128+
121129
@pytest.fixture(scope="function")
122-
async def rabbit_exchange(
130+
async def rabbit_exchanges(
123131
rabbit_config: RabbitConfig,
124132
rabbit_channel: aio_pika.Channel,
125-
) -> Tuple[aio_pika.Exchange, aio_pika.Exchange]:
133+
) -> RabbitExchanges:
126134
"""
127135
Declares and returns 'log' and 'instrumentation' exchange channels with rabbit
128136
"""
@@ -134,27 +142,33 @@ async def rabbit_exchange(
134142
)
135143
assert logs_exchange
136144

145+
# declare progress exchange
146+
PROGRESS_EXCHANGE_NAME: str = rabbit_config.channels["progress"]
147+
progress_exchange = await rabbit_channel.declare_exchange(
148+
PROGRESS_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT
149+
)
150+
assert progress_exchange
151+
137152
# declare instrumentation exchange
138153
INSTRUMENTATION_EXCHANGE_NAME: str = rabbit_config.channels["instrumentation"]
139154
instrumentation_exchange = await rabbit_channel.declare_exchange(
140155
INSTRUMENTATION_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT
141156
)
142157
assert instrumentation_exchange
143158

144-
return logs_exchange, instrumentation_exchange
159+
return RabbitExchanges(logs_exchange, progress_exchange, instrumentation_exchange)
145160

146161

147162
@pytest.fixture(scope="function")
148163
async def rabbit_queue(
149164
rabbit_channel: aio_pika.Channel,
150-
rabbit_exchange: Tuple[aio_pika.Exchange, aio_pika.Exchange],
165+
rabbit_exchanges: RabbitExchanges,
151166
) -> AsyncIterator[aio_pika.Queue]:
152-
logs_exchange, instrumentation_exchange = rabbit_exchange
153-
154167
queue = await rabbit_channel.declare_queue(exclusive=True)
155168
assert queue
156169

157170
# Binding queue to exchange
158-
await queue.bind(logs_exchange)
159-
await queue.bind(instrumentation_exchange)
171+
await queue.bind(rabbit_exchanges.logs)
172+
await queue.bind(rabbit_exchanges.progress)
173+
await queue.bind(rabbit_exchanges.instrumentation)
160174
yield queue

packages/service-library/src/servicelib/aiohttp/rest_codecs.py

Lines changed: 0 additions & 26 deletions
This file was deleted.

packages/service-library/src/servicelib/aiohttp/rest_responses.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
33
"""
44
import inspect
5+
import json
56
from typing import Any, Dict, List, Mapping, Optional, Tuple, Type, Union
67

78
import attr
89
from aiohttp import web, web_exceptions
910
from aiohttp.web_exceptions import HTTPError, HTTPException
1011

11-
from .rest_codecs import json, jsonify
12+
from ..json_serialization import json_dumps
1213
from .rest_models import ErrorItemType, ErrorType, LogMessageType
1314

1415
ENVELOPE_KEYS = ("data", "error")
@@ -77,7 +78,7 @@ def create_data_response(
7778
else:
7879
payload = data
7980

80-
response = web.json_response(payload, dumps=jsonify)
81+
response = web.json_response(payload, dumps=json_dumps)
8182
except (TypeError, ValueError) as err:
8283
response = create_error_response(
8384
[
@@ -123,7 +124,7 @@ def create_error_response(
123124
payload = wrap_as_envelope(error=attr.asdict(error))
124125

125126
response = http_error_cls(
126-
reason=reason, text=jsonify(payload), content_type=JSON_CONTENT_TYPE
127+
reason=reason, text=json_dumps(payload), content_type=JSON_CONTENT_TYPE
127128
)
128129

129130
return response
@@ -136,7 +137,9 @@ def create_log_response(msg: str, level: str) -> web.Response:
136137
"""
137138
# TODO: DEPRECATE
138139
msg = LogMessageType(msg, level)
139-
response = web.json_response(data={"data": attr.asdict(msg), "error": None})
140+
response = web.json_response(
141+
data={"data": attr.asdict(msg), "error": None}, dumps=json_dumps
142+
)
140143
return response
141144

142145

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
11
# FIXME: move to settings-library or refactor
22

33
import logging
4-
from typing import Optional
4+
from typing import Final, Optional
55

6-
from tenacity import before_sleep_log, stop_after_attempt, wait_fixed
6+
from tenacity.before_sleep import before_sleep_log
7+
from tenacity.stop import stop_after_delay
8+
from tenacity.wait import wait_fixed
79

810
log = logging.getLogger(__file__)
911

1012

13+
_MINUTE: Final[int] = 60
14+
15+
1116
class RabbitMQRetryPolicyUponInitialization:
1217
"""Retry policy upon service initialization"""
1318

14-
WAIT_SECS = 2
15-
ATTEMPTS_COUNT = 60
16-
1719
def __init__(self, logger: Optional[logging.Logger] = None):
1820
logger = logger or log
1921

2022
self.kwargs = dict(
21-
wait=wait_fixed(self.WAIT_SECS),
22-
stop=stop_after_attempt(self.ATTEMPTS_COUNT),
23-
before_sleep=before_sleep_log(logger, logging.INFO),
23+
wait=wait_fixed(2),
24+
stop=stop_after_delay(3 * _MINUTE),
25+
before_sleep=before_sleep_log(logger, logging.WARNING),
2426
reraise=True,
2527
)

packages/service-library/tests/aiohttp/tutils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
import asyncio
77
import json
8+
from dataclasses import dataclass
89

9-
import attr
1010
from aiohttp import web
11-
from servicelib.aiohttp.rest_codecs import DataEncoder
11+
from servicelib.json_serialization import json_dumps
1212

1313

14-
@attr.s(auto_attribs=True)
14+
@dataclass
1515
class Data:
1616
x: int = 3
1717
y: str = "foo"
@@ -66,4 +66,4 @@ def get(cls, suffix, process=True):
6666
loop = asyncio.get_event_loop()
6767
data = loop.run_until_complete(coro(None))
6868

69-
return json.loads(json.dumps(data, cls=DataEncoder)) if process else data
69+
return json.loads(json_dumps(data)) if process else data

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import json
32
import logging
43
from dataclasses import dataclass
54
from typing import Callable, Dict, List, Tuple
@@ -13,6 +12,12 @@
1312
from models_library.projects import ProjectID
1413
from models_library.projects_nodes_io import NodeID
1514
from models_library.projects_state import RunningState
15+
from models_library.rabbitmq_messages import (
16+
InstrumentationRabbitMessage,
17+
LoggerRabbitMessage,
18+
ProgressRabbitMessage,
19+
)
20+
from simcore_postgres_database.models.comp_tasks import NodeClass
1621

1722
from ...core.settings import DaskSchedulerSettings
1823
from ...models.domains.comp_tasks import CompTaskAtDB, Image
@@ -114,19 +119,18 @@ async def _on_task_completed(self, event: TaskStateEvent) -> None:
114119
project_id, [node_id], event.state
115120
)
116121
# instrumentation
117-
message = {
118-
"metrics": "service_stopped",
119-
"user_id": user_id,
120-
"project_id": f"{project_id}",
121-
"service_uuid": f"{node_id}",
122-
"service_type": "COMPUTATIONAL",
123-
"service_key": service_key,
124-
"service_tag": service_version,
125-
"result": "SUCCESS" if event.state == RunningState.SUCCESS else "FAILURE",
126-
}
127-
await self.rabbitmq_client.publish_message(
128-
"instrumentation", json.dumps(message)
122+
message = InstrumentationRabbitMessage(
123+
metrics="service_stopped",
124+
user_id=user_id,
125+
project_id=project_id,
126+
node_id=node_id,
127+
service_uuid=node_id,
128+
service_type=NodeClass.COMPUTATIONAL,
129+
service_key=service_key,
130+
service_tag=service_version,
131+
result=event.state,
129132
)
133+
await self.rabbitmq_client.publish_message(message)
130134
self._wake_up_scheduler_now()
131135

132136
async def _task_state_change_handler(self, event: str) -> None:
@@ -140,18 +144,17 @@ async def _task_state_change_handler(self, event: str) -> None:
140144
)
141145

142146
if task_state_event.state == RunningState.STARTED:
143-
message = {
144-
"metrics": "service_started",
145-
"user_id": user_id,
146-
"project_id": f"{project_id}",
147-
"service_uuid": f"{node_id}",
148-
"service_type": "COMPUTATIONAL",
149-
"service_key": service_key,
150-
"service_tag": service_version,
151-
}
152-
await self.rabbitmq_client.publish_message(
153-
"instrumentation", json.dumps(message)
147+
message = InstrumentationRabbitMessage(
148+
metrics="service_started",
149+
user_id=user_id,
150+
project_id=project_id,
151+
node_id=node_id,
152+
service_uuid=node_id,
153+
service_type=NodeClass.COMPUTATIONAL,
154+
service_key=service_key,
155+
service_tag=service_version,
154156
)
157+
await self.rabbitmq_client.publish_message(message)
155158

156159
await CompTasksRepository(self.db_engine).set_project_tasks_state(
157160
project_id, [node_id], task_state_event.state
@@ -161,28 +164,23 @@ async def _task_progress_change_handler(self, event: str) -> None:
161164
task_progress_event = TaskProgressEvent.parse_raw(event)
162165
logger.debug("received task progress update: %s", task_progress_event)
163166
*_, user_id, project_id, node_id = parse_dask_job_id(task_progress_event.job_id)
164-
message = {
165-
"user_id": user_id,
166-
"project_id": f"{project_id}",
167-
"node_id": f"{node_id}",
168-
"progress": task_progress_event.progress,
169-
"channel": "progress",
170-
}
171-
await self.rabbitmq_client.publish_message(
172-
task_progress_event.topic_name(), json.dumps(message)
167+
message = ProgressRabbitMessage(
168+
user_id=user_id,
169+
project_id=project_id,
170+
node_id=node_id,
171+
progress=task_progress_event.progress,
173172
)
173+
await self.rabbitmq_client.publish_message(message)
174174

175175
async def _task_log_change_handler(self, event: str) -> None:
176176
task_log_event = TaskLogEvent.parse_raw(event)
177177
logger.debug("received task log update: %s", task_log_event)
178178
*_, user_id, project_id, node_id = parse_dask_job_id(task_log_event.job_id)
179-
message = {
180-
"user_id": user_id,
181-
"project_id": f"{project_id}",
182-
"node_id": f"{node_id}",
183-
"messages": [task_log_event.log],
184-
"channel": "logger",
185-
}
186-
await self.rabbitmq_client.publish_message(
187-
task_log_event.topic_name(), json.dumps(message)
179+
message = LoggerRabbitMessage(
180+
user_id=user_id,
181+
project_id=project_id,
182+
node_id=node_id,
183+
messages=[task_log_event.log],
188184
)
185+
186+
await self.rabbitmq_client.publish_message(message)

0 commit comments

Comments
 (0)