Skip to content

Commit 11dc135

Browse files
authored
Add metrics on operand and subtask executions (#2947)
1 parent 21b1c1a commit 11dc135

File tree

6 files changed: 104 additions and 1 deletion.

mars/core/operand/core.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
except ImportError: # pragma: no cover
2323
UFuncTypeError = None
2424

25+
from ...metrics import Metrics
2526
from ...typing import TileableType, ChunkType, OperandType
2627
from ...utils import calc_data_size
2728
from ..context import Context
@@ -39,6 +40,11 @@
3940
_op_type_to_size_estimator: Dict[Type[OperandType], Callable] = dict()
4041

4142

43+
op_executed_number = Metrics.counter(
44+
"mars.operand.executed_number", "The number of executed operands.", ("op",)
45+
)
46+
47+
4248
class TileableOperandMixin:
4349
__slots__ = ()
4450

@@ -484,6 +490,7 @@ def execute(results: Dict[str, Any], op: OperandType):
484490
try:
485491
result = executor(results, op)
486492
succeeded = True
493+
op_executed_number.record(1, {"op": op.__class__.__name__})
487494
return result
488495
except UFuncTypeError as e: # pragma: no cover
489496
raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None

mars/metrics/backends/metric.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,9 +29,15 @@ class AbstractMetric(ABC):
2929
def __init__(
3030
self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None
3131
):
32+
assert isinstance(name, str), "Argument name should be a str"
33+
assert isinstance(description, str), "Argument description should be a str"
34+
if tag_keys is not None:
35+
assert isinstance(tag_keys, tuple) and all(
36+
isinstance(tag, str) for tag in tag_keys
37+
), "Argument tag_keys should be a tuple and its elements should be str"
3238
self._name = name
3339
self._description = description
34-
self._tag_keys = tuple(tag_keys) if tag_keys else tuple()
40+
self._tag_keys = tag_keys or tuple()
3541
self._init()
3642

3743
@property

mars/metrics/backends/tests/test_metric.py

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import pytest
16+
1517
from ..metric import (
1618
AbstractMetric,
1719
AbstractCounter,
@@ -21,6 +23,24 @@
2123
)
2224

2325

26+
def test_illegal_arguments():
27+
class DummyMetric(AbstractMetric):
28+
pass
29+
30+
DummyMetric.__abstractmethods__ = set()
31+
with pytest.raises(AssertionError):
32+
DummyMetric(1)
33+
34+
with pytest.raises(AssertionError):
35+
DummyMetric("dummy_metric", 1)
36+
37+
with pytest.raises(AssertionError):
38+
DummyMetric("dummy_metric", "A test metric", "service")
39+
40+
with pytest.raises(AssertionError):
41+
DummyMetric("dummy_metric", "A test metric", ("service", 1))
42+
43+
2444
def test_dummy_metric():
2545
class DummyMetric(AbstractMetric):
2646
pass

mars/services/scheduling/supervisor/manager.py

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121

2222
from .... import oscar as mo
2323
from ....lib.aio import alru_cache
24+
from ....metrics import Metrics
2425
from ....oscar.backends.context import ProfilingContext
2526
from ....oscar.errors import MarsError
2627
from ....oscar.profiling import ProfilingData, MARS_ENABLE_PROFILING
@@ -82,6 +83,21 @@ def __init__(
8283
self._speculation_config = speculation_config or {}
8384
self._queueing_ref = None
8485
self._global_resource_ref = None
86+
self._submitted_subtask_count = Metrics.counter(
87+
"mars.scheduling.submitted_subtask_count",
88+
"The count of submitted subtasks to all bands.",
89+
("session_id", "task_id", "stage_id"),
90+
)
91+
self._finished_subtask_count = Metrics.counter(
92+
"mars.scheduling.finished_subtask_count",
93+
"The count of finished subtasks of all bands.",
94+
("session_id", "task_id", "stage_id"),
95+
)
96+
self._canceled_subtask_count = Metrics.counter(
97+
"mars.scheduling.canceled_subtask_count",
98+
"The count of canceled subtasks of all bands.",
99+
("session_id", "task_id", "stage_id"),
100+
)
85101
logger.info(
86102
"Created SubtaskManager with subtask_max_reschedules %s, "
87103
"speculation_config %s",
@@ -167,6 +183,14 @@ async def finish_subtasks(
167183
for subtask_id, subtask_band in zip(subtask_ids, bands):
168184
subtask_info = self._subtask_infos.get(subtask_id, None)
169185
if subtask_info is not None:
186+
self._finished_subtask_count.record(
187+
1,
188+
{
189+
"session_id": self._session_id,
190+
"task_id": subtask_info.subtask.task_id,
191+
"stage_id": subtask_info.subtask.stage_id,
192+
},
193+
)
170194
self._subtask_summaries[subtask_id] = subtask_info.to_summary(
171195
is_finished=True
172196
)
@@ -236,6 +260,14 @@ async def submit_subtask_to_band(self, subtask_id: str, band: BandType):
236260
if enable_profiling
237261
else None
238262
)
263+
self._submitted_subtask_count.record(
264+
1,
265+
{
266+
"session_id": self._session_id,
267+
"task_id": subtask_info.subtask.task_id,
268+
"stage_id": subtask_info.subtask.stage_id,
269+
},
270+
)
239271
logger.debug("Start run subtask %s in band %s.", subtask_id, band)
240272
with Timer() as timer:
241273
task = asyncio.create_task(
@@ -388,6 +420,14 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks):
388420
self._subtask_summaries[subtask_id] = subtask_info.to_summary(
389421
is_finished=True, is_cancelled=True
390422
)
423+
self._canceled_subtask_count.record(
424+
1,
425+
{
426+
"session_id": self._session_id,
427+
"task_id": subtask_info.subtask.task_id,
428+
"stage_id": subtask_info.subtask.stage_id,
429+
},
430+
)
391431
await self._queueing_ref.submit_subtasks.tell()
392432
logger.info("Subtasks %s canceled.", subtask_ids)
393433

mars/services/scheduling/supervisor/queueing.py

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222

2323
from .... import oscar as mo
2424
from ....lib.aio import alru_cache
25+
from ....metrics import Metrics
2526
from ....resource import ZeroResource
2627
from ....utils import dataslots
2728
from ...subtask import Subtask
@@ -70,6 +71,16 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None):
7071

7172
self._periodical_submit_task = None
7273
self._submit_period = submit_period or _DEFAULT_SUBMIT_PERIOD
74+
self._submitted_subtask_number = Metrics.gauge(
75+
"mars.band.submitted_subtask_number",
76+
"The number of submitted subtask to a band.",
77+
("session_id", "band"),
78+
)
79+
self._unsubmitted_subtask_number = Metrics.gauge(
80+
"mars.band.unsubmitted_subtask_number",
81+
"The number of unsubmitted subtask to a band.",
82+
("session_id", "band"),
83+
)
7384

7485
async def __post_create__(self):
7586
from ...cluster import ClusterAPI
@@ -246,6 +257,12 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
246257
self, [item.subtask for item in submit_items.values()]
247258
):
248259
non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
260+
tags = {
261+
"session_id": self._session_id,
262+
"band": band[0] if band else "",
263+
}
264+
self._submitted_subtask_number.record(len(submitted_ids), tags)
265+
self._unsubmitted_subtask_number.record(len(non_submitted_ids), tags)
249266
if submitted_ids:
250267
for stid in subtask_ids:
251268
if stid not in submitted_ids:

mars/services/scheduling/worker/execution.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
from ....core.graph import DAG
2727
from ....core.operand import Fetch, FetchShuffle
2828
from ....lib.aio import alru_cache
29+
from ....metrics import Metrics
2930
from ....oscar.errors import MarsError
3031
from ....storage import StorageLevel
3132
from ....utils import dataslots, get_chunk_key_to_data_keys, wrap_exception
@@ -145,6 +146,16 @@ def __init__(
145146
self._data_prepare_timeout = data_prepare_timeout
146147

147148
self._subtask_info = dict()
149+
self._submitted_subtask_count = Metrics.counter(
150+
"mars.band.submitted_subtask_count",
151+
"The count of submitted subtasks to the current band.",
152+
("band",),
153+
)
154+
self._finished_subtask_count = Metrics.counter(
155+
"mars.band.finished_subtask_count",
156+
"The count of finished subtasks of the current band.",
157+
("band",),
158+
)
148159

149160
async def __post_create__(self):
150161
self._cluster_api = await ClusterAPI.create(self.address)
@@ -499,6 +510,7 @@ async def run_subtask(
499510
logger.debug(
500511
"Start to schedule subtask %s on %s.", subtask.subtask_id, self.address
501512
)
513+
self._submitted_subtask_count.record(1, {"band": self.address})
502514
with mo.debug.no_message_trace():
503515
task = asyncio.create_task(
504516
self.ref().internal_run_subtask(subtask, band_name)
@@ -519,6 +531,7 @@ async def run_subtask(
519531
)
520532
result = await task
521533
self._subtask_info.pop(subtask.subtask_id, None)
534+
self._finished_subtask_count.record(1, {"band": self.address})
522535
logger.debug("Subtask %s finished with result %s", subtask.subtask_id, result)
523536
return result
524537

0 commit comments

Comments
 (0)