Skip to content

Commit e713c5e

Browse files
authored
Add metrics and event report for Ray channels (#2936)
1 parent a73970b commit e713c5e

File tree

3 files changed

+162
-7
lines changed

3 files changed

+162
-7
lines changed

mars/oscar/backends/ray/communication.py

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import itertools
1818
import logging
1919
import time
20+
import typing
2021
from abc import ABC
2122
from collections import namedtuple
2223
from dataclasses import dataclass
@@ -25,12 +26,14 @@
2526

2627
from ....oscar.profiling import ProfilingData
2728
from ....serialization import serialize, deserialize
29+
from ....metrics import Metrics
2830
from ....utils import lazy_import, implements, classproperty, Timer
2931
from ...debug import debug_async_timeout
3032
from ...errors import ServerClosed
3133
from ..communication.base import Channel, ChannelType, Server, Client
3234
from ..communication.core import register_client, register_server
3335
from ..communication.errors import ChannelClosed
36+
from .utils import report_event
3437

3538
ray = lazy_import("ray")
3639
logger = logging.getLogger(__name__)
@@ -39,6 +42,43 @@
3942
"ChannelID", ["local_address", "client_id", "channel_index", "dest_address"]
4043
)
4144

45+
# Threshold in milliseconds: a serialization that takes longer than this is
# reported as a "SERIALIZATION_TIMEOUT" warning event.
SERIALIZATION_TIMEOUT_MILLS = 1000
46+
# Threshold in milliseconds: a deserialization that takes longer than this is
# reported as a "DESERIALIZATION_TIMEOUT" warning event.
DESERIALIZATION_TIMEOUT_MILLS = 1000
47+
48+
49+
def msg_to_simple_str(msg):  # pragma: no cover
    """Return a short description of a message's structure.

    The result is meant for log/event text, so it avoids building a large
    string: containers are summarised by their first five entries plus their
    total length, and scalars are truncated to 50 characters.

    Parameters
    ----------
    msg
        Any object. An ``_ArgWrapper`` is unwrapped to its ``message`` first.

    Returns
    -------
    str
        A compact description; for unrecognised or empty values this is
        ``str(type(msg))``.
    """
    from ..message import SendMessage, _MessageBase

    if type(msg) is _ArgWrapper:  # pylint: disable=unidiomatic-typecheck
        msg = msg.message
    if isinstance(msg, SendMessage):
        return (
            f"{str(type(msg))}(actor_ref={msg.actor_ref}, "
            f"content={msg_to_simple_str(msg.content)})"
        )
    if isinstance(msg, _MessageBase):
        return str(msg)
    # The type check comes before the truthiness check on purpose: taking the
    # truth value of an arbitrary payload first can raise for objects whose
    # truthiness is ambiguous (e.g. multi-element arrays).
    if isinstance(msg, list) and msg:
        part_str = ", ".join(msg_to_simple_str(item) for item in msg[:5])
        return f"List<{part_str}...{len(msg)}>"
    if isinstance(msg, tuple) and msg:
        part_str = ", ".join(msg_to_simple_str(item) for item in msg[:5])
        return f"Tuple<{part_str}...{len(msg)}>"
    if isinstance(msg, dict) and msg:
        # Only the first five entries are rendered; each entry already carries
        # its own "k=" prefix, so none is added in front of the joined text.
        part_str = ", ".join(
            f"k={msg_to_simple_str(key)}, v={msg_to_simple_str(value)}"
            for key, value in itertools.islice(msg.items(), 5)
        )
        return f"Dict<{part_str}...{len(msg)}>"
    if isinstance(msg, (str, float, int)):
        return "{!s:.50}".format(msg)
    return str(type(msg))
81+
4282

4383
def _argwrapper_unpickler(serialized_message):
    """Rebuild an ``_ArgWrapper`` from a serialized message.

    ``serialized_message`` is unpacked into the positional arguments of
    ``deserialize`` and the result is wrapped again.
    NOTE(review): presumably the reconstructor used by
    ``_ArgWrapper.__reduce__`` -- confirm against the class definition.
    """
    restored_message = deserialize(*serialized_message)
    return _ArgWrapper(restored_message)
@@ -63,20 +103,45 @@ def __reduce__(self):
63103
init_metrics("ray")
64104
_ray_serialize = ray.serialization.SerializationContext.serialize
65105
_ray_deserialize_object = ray.serialization.SerializationContext._deserialize_object
106+
serialized_bytes_counter = Metrics.counter(
107+
"mars.channel_serialized_bytes",
108+
"The bytes serialized by mars ray channel.",
109+
)
110+
deserialized_bytes_counter = Metrics.counter(
111+
"mars.channel_deserialized_bytes",
112+
"The bytes deserialized by mars ray channel.",
113+
)
114+
serialization_time_mills = Metrics.counter(
115+
"mars.channel_serialization_time_mills",
116+
"The time used by mars ray channel serialization.",
117+
)
118+
deserialization_time_mills = Metrics.counter(
119+
"mars.channel_deserialization_time_mills",
120+
"The time used by mars ray channel deserialization.",
121+
)
66122

67123
def _serialize(self, value):
68124
if type(value) is _ArgWrapper: # pylint: disable=unidiomatic-typecheck
69125
message = value.message
70126
with Timer() as timer:
71127
serialized_object = _ray_serialize(self, value)
128+
bytes_length = serialized_object.total_bytes
129+
serialized_bytes_counter.record(bytes_length)
130+
serialization_time_mills.record(timer.duration * 1000)
131+
if timer.duration * 1000 > SERIALIZATION_TIMEOUT_MILLS: # pragma: no cover
132+
report_event(
133+
"WARNING",
134+
"SERIALIZATION_TIMEOUT",
135+
f"Serialization took {timer.duration} seconds for {bytes_length} sized message {msg_to_simple_str(message)}.",
136+
)
72137
try:
73138
if message.profiling_context is not None:
74139
task_id = message.profiling_context.task_id
75140
ProfilingData[task_id, "serialization"].inc(
76141
"serialize", timer.duration
77142
)
78-
except AttributeError:
79-
logger.debug(
143+
except AttributeError: # pragma: no cover
144+
logger.info(
80145
"Profiling serialization got error, the send "
81146
"message %s may not be an instance of message",
82147
type(message),
@@ -87,7 +152,20 @@ def _serialize(self, value):
87152

88153
def _deserialize_object(self, data, metadata, object_ref):
89154
start_time = time.time()
155+
bytes_length = 0
156+
if data:
157+
bytes_length = len(data)
158+
deserialized_bytes_counter.record(bytes_length)
90159
value = _ray_deserialize_object(self, data, metadata, object_ref)
160+
duration = time.time() - start_time
161+
deserialization_time_mills.record(duration * 1000)
162+
if duration * 1000 > DESERIALIZATION_TIMEOUT_MILLS: # pragma: no cover
163+
report_event(
164+
"WARNING",
165+
"DESERIALIZATION_TIMEOUT",
166+
f"Deserialization took {duration} seconds for "
167+
f"{bytes_length} sized msg {msg_to_simple_str(value)}",
168+
)
91169
if type(value) is _ArgWrapper: # pylint: disable=unidiomatic-typecheck
92170
message = value.message
93171
try:
@@ -96,8 +174,8 @@ def _deserialize_object(self, data, metadata, object_ref):
96174
ProfilingData[task_id, "serialization"].inc(
97175
"deserialize", time.time() - start_time
98176
)
99-
except AttributeError:
100-
logger.debug(
177+
except AttributeError: # pragma: no cover
178+
logger.info(
101179
"Profiling serialization got error, the recv "
102180
"message %s may not be an instance of message",
103181
type(message),
@@ -187,9 +265,22 @@ def _submit_task(self, message: Any, object_ref: "ray.ObjectRef"):
187265
async def handle_task(message: Any, object_ref: "ray.ObjectRef"):
188266
# use `%.500` to avoid print too long messages
189267
with debug_async_timeout(
190-
"ray_object_retrieval_timeout", "Client sent message is %.500s", message
268+
"ray_object_retrieval_timeout",
269+
"Message that client sent to actor %s is %.500s and object_ref is %s",
270+
self.dest_address,
271+
message,
272+
object_ref,
191273
):
192-
result = await object_ref
274+
try:
275+
result = await object_ref
276+
except Exception as e: # pragma: no cover
277+
logger.exception(
278+
"Get object %s from %s failed, got exception %s.",
279+
object_ref,
280+
self.dest_address,
281+
e,
282+
)
283+
raise
193284
if isinstance(result, RayChannelException):
194285
raise result.exc_value.with_traceback(result.exc_traceback)
195286
return result.message
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 1999-2021 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from .....tests.core import require_ray, mock
16+
from .....utils import lazy_import
17+
from ..utils import report_event
18+
19+
ray = lazy_import("ray")
20+
21+
22+
@require_ray
@mock.patch("ray.report_event")
def test_report_event(fake_report_event, ray_start_regular):
    """Check that ``report_event`` forwards its arguments to ``ray.report_event``.

    The severity is passed as a string and must arrive as the matching
    ``ray.EventSeverity`` member; label and message must arrive unchanged.
    """
    captured = []
    # Record every positional argument the patched ``ray.report_event`` gets.
    fake_report_event.side_effect = lambda *call_args: captured.extend(call_args)

    label = "test_label"
    message = "test_message"
    report_event("WARNING", label, message)

    assert captured == [ray.EventSeverity.WARNING, label, message]

mars/oscar/backends/ray/utils.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import enum
1515
import os
1616
import asyncio
1717
import logging
@@ -166,3 +166,34 @@ async def kill_and_wait(
166166
raise Exception(
167167
f"The actor {actor_handle} is not died after ray.kill {timeout} seconds."
168168
)
169+
170+
171+
if ray and not hasattr(ray, "report_event"):  # pragma: no cover
    # Older Ray versions have no event support, so stand-ins are installed
    # on the ``ray`` module below.

    def _report_event(severity, label, message):
        """Stand-in for ``ray.report_event`` that writes the event to the log."""
        logger.warning(
            "severity: %s, label: %s, message: %s.", severity, label, message
        )

    class EventSeverity(enum.Enum):
        """Stand-in for ``ray.EventSeverity``."""

        INFO = 0
        WARNING = 1
        ERROR = 2
        FATAL = 3

    # The lazily imported module object cannot override attributes of the
    # real module, so import the real one before patching it.
    import ray

    ray.EventSeverity = EventSeverity
    ray.report_event = _report_event
190+
191+
192+
def report_event(severity, label, message):
    """Forward an event to ``ray.report_event``.

    Nothing is reported when ``ray`` is falsy or Ray is not initialized.

    Parameters
    ----------
    severity
        Either a severity object passed through unchanged, or a ``str``
        naming an attribute of ``ray.EventSeverity`` (e.g. ``"WARNING"``).
    label
        Event label, passed through unchanged.
    message
        Event message, passed through unchanged.
    """
    if not ray or not ray.is_initialized():
        return
    if isinstance(severity, str):
        # Resolve the name to the corresponding ``ray.EventSeverity`` member.
        severity = getattr(ray.EventSeverity, severity)
    ray.report_event(severity, label, message)

0 commit comments

Comments
 (0)