Skip to content

Commit 692ff4d

Browse files
committed
Refactor metrics a bit to be easier to process. Switch to using a random hex for the group instead of variable.
1 parent e5446be commit 692ff4d

File tree

3 files changed

+56
-86
lines changed

3 files changed

+56
-86
lines changed

modin/core/storage_formats/base/query_compiler_calculator.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"""
2121

2222
import logging
23+
import random
2324
from types import MappingProxyType
2425
from typing import Any, Optional
2526

@@ -48,10 +49,6 @@ def __init__(self, backend: str, query_compiler: BaseQueryCompiler):
4849
self.max_cost = query_compiler.max_cost()
4950

5051

51-
# Global Variable Used to track groups of metrics
52-
hybrid_metrics_calc_group = 0
53-
54-
5552
class BackendCostCalculator:
5653
"""
5754
Calculate which Backend should be used for an operation.
@@ -143,7 +140,6 @@ def calculate(self) -> str:
143140

144141
min_value = None
145142
for k, v in self._backend_data.items():
146-
emit_metric(f"hybrid.cast.to.{k}.cost.{hybrid_metrics_calc_group}", v.cost)
147143
if v.cost > v.max_cost:
148144
continue
149145
if min_value is None or min_value > v.cost:
@@ -154,10 +150,26 @@ def calculate(self) -> str:
154150
logging.info(
155151
f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}"
156152
)
157-
DECIDED_TO_SWITCH = 1
153+
# Does not need to be secure, should not use system entropy
154+
metrics_group = "%04x" % random.randrange(16**4)
155+
for qc in self._qc_list:
156+
max_shape = qc._max_shape()
157+
backend = qc.get_backend()
158+
emit_metric(
159+
f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows",
160+
max_shape[0],
161+
)
162+
emit_metric(
163+
f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols",
164+
max_shape[1],
165+
)
166+
for k, v in self._backend_data.items():
167+
emit_metric(
168+
f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost
169+
)
158170
emit_metric(
159-
f"hybrid.cast.decision.{self._result_backend}.{hybrid_metrics_calc_group}",
160-
DECIDED_TO_SWITCH,
171+
f"hybrid.merge.decision.{self._result_backend}.group.{metrics_group}",
172+
1,
161173
)
162174

163175
if self._result_backend is None:

modin/core/storage_formats/pandas/query_compiler_caster.py

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import logging
2525
from abc import ABC, abstractmethod
2626
from collections import defaultdict, namedtuple
27+
import random
2728
from types import FunctionType, MappingProxyType, MethodType
2829
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, Union, ValuesView
2930

@@ -542,10 +543,6 @@ def _maybe_switch_backend_post_op(
542543
return result
543544

544545

545-
# Global Variable Used to track groups of metrics
546-
hybrid_metrics_group = 0
547-
548-
549546
def _get_backend_for_auto_switch(
550547
input_qc: BaseQueryCompiler,
551548
class_of_wrapped_fn: str,
@@ -583,6 +580,8 @@ def _get_backend_for_auto_switch(
583580
# backend.
584581
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
585582

583+
# Does not need to be secure, should not use system entropy
584+
metrics_group = "%04x" % random.randrange(16**4)
586585
starting_backend = input_qc.get_backend()
587586

588587
min_move_stay_delta = None
@@ -593,6 +592,23 @@ def _get_backend_for_auto_switch(
593592
operation=function_name,
594593
arguments=arguments,
595594
)
595+
data_max_shape = input_qc._max_shape()
596+
emit_metric(
597+
f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}",
598+
1,
599+
)
600+
emit_metric(
601+
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost",
602+
stay_cost,
603+
)
604+
emit_metric(
605+
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows",
606+
data_max_shape[0],
607+
)
608+
emit_metric(
609+
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols",
610+
data_max_shape[1],
611+
)
596612
for backend in Backend.get_active_backends():
597613
if backend in ("Ray", "Unidist", "Dask"):
598614
# Disable automatically switching to these engines for now, because
@@ -637,53 +653,32 @@ def _get_backend_for_auto_switch(
637653
):
638654
min_move_stay_delta = move_stay_delta
639655
best_backend = backend
640-
global hybrid_metrics_group
641656
emit_metric(
642-
f"hybrid.auto.from.{starting_backend}.to.{backend}.move_to_cost.{hybrid_metrics_group}",
657+
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost",
643658
move_to_cost,
644659
)
645660
emit_metric(
646-
f"hybrid.auto.from.{starting_backend}.to.{backend}.stay_cost.{hybrid_metrics_group}",
647-
stay_cost,
648-
)
649-
emit_metric(
650-
f"hybrid.auto.from.{starting_backend}.to.{backend}.other_execute_cost.{hybrid_metrics_group}",
661+
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost",
651662
other_execute_cost,
652663
)
653664
emit_metric(
654-
f"hybrid.auto.from.{starting_backend}.to.{backend}.delta.{hybrid_metrics_group}",
665+
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta",
655666
move_stay_delta,
656667
)
657-
SINGLE_EVENT = 1
658-
DECIDED_TO_SWITCH = 1
659-
DECIDED_NOT_TO_SWITCH = 0
660-
emit_metric(
661-
f"hybrid.auto.from.{starting_backend}.to.{backend}.decision.{best_backend}.{hybrid_metrics_group}",
662-
(
663-
DECIDED_TO_SWITCH
664-
if starting_backend != backend
665-
else DECIDED_NOT_TO_SWITCH
666-
),
667-
)
668-
emit_metric(
669-
f"hybrid.auto.from.{starting_backend}.to.{backend}.api_cls_name.{class_of_wrapped_fn}.{hybrid_metrics_group}",
670-
SINGLE_EVENT,
671-
)
672-
emit_metric(
673-
f"hybrid.auto.from.{starting_backend}.to.{backend}.function_name.{function_name}.{hybrid_metrics_group}",
674-
SINGLE_EVENT,
675-
)
676-
hybrid_metrics_group += 1
668+
677669
logging.info(
678670
f"After {class_of_wrapped_fn} function {function_name}, "
679671
+ f"considered moving to backend {backend} with "
680672
+ f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) "
681673
+ f", stay_cost {stay_cost}, and move-stay delta "
682674
+ f"{move_stay_delta}"
683675
)
676+
684677
if best_backend == starting_backend:
678+
emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0)
685679
logging.info(f"Chose not to switch backends after operation {function_name}")
686680
else:
681+
emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 1)
687682
logging.info(f"Chose to move to backend {best_backend}")
688683
return best_backend
689684

modin/tests/pandas/native_df_interoperability/test_compiler_caster.py

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,24 +1262,17 @@ def test_concat_with_pin(pin_backends, expected_backend):
12621262

12631263
def test_cast_metrics(pico_df, cluster_df):
12641264
try:
1265-
errors = 0
1265+
count = 0
12661266

12671267
def test_handler(metric: str, value) -> None:
1268-
nonlocal errors
1269-
if metric.startswith("modin.hybrid.cast"):
1270-
tokens = metric.split(".")
1271-
if tokens[4] == "Pico" and value == 750:
1272-
return
1273-
if tokens[4] == "Cluster" and value == 250:
1274-
return
1275-
if tokens[3] == "decision" and tokens[4] == "Cluster" and value == 1:
1276-
return
1277-
errors += 1
1268+
nonlocal count
1269+
if metric.startswith("modin.hybrid.merge"):
1270+
count += 1
12781271

12791272
add_metric_handler(test_handler)
12801273
df3 = pd.concat([pico_df, cluster_df], axis=1)
12811274
assert df3.get_backend() == "Cluster" # result should be on cluster
1282-
assert errors == 0
1275+
assert count == 7
12831276
finally:
12841277
clear_metric_handler(test_handler)
12851278

@@ -1290,43 +1283,13 @@ def test_switch_metrics(pico_df, cluster_df):
12901283
choices=("Big_Data_Cloud", "Small_Data_Local"),
12911284
):
12921285
try:
1293-
errors = 0
1286+
count = 0
12941287

12951288
def test_handler(metric: str, value) -> None:
1296-
nonlocal errors
1289+
nonlocal count
12971290
if metric.startswith("modin.hybrid.auto"):
1298-
tokens = metric.split(".")
12991291
assert "from.Big_Data_Cloud.to.Small_Data_Local" in metric
1300-
if (
1301-
tokens[7] == "stay_cost"
1302-
and value == QCCoercionCost.COST_IMPOSSIBLE
1303-
):
1304-
return
1305-
if tokens[7] == "other_execute_cost" and value == 1000:
1306-
return
1307-
if tokens[7] == "move_to_cost" and value == 0:
1308-
return
1309-
if tokens[7] == "delta" and value == 0:
1310-
return
1311-
if (
1312-
tokens[7] == "decision"
1313-
and tokens[8] == "Big_Data_Cloud"
1314-
and value == 1
1315-
):
1316-
return
1317-
if (
1318-
tokens[7] == "api_cls_name"
1319-
and tokens[8] == "DataFrame"
1320-
and value == 1
1321-
):
1322-
return
1323-
if (
1324-
tokens[7] == "function_name"
1325-
and tokens[8] == "describe"
1326-
and value == 1
1327-
):
1328-
return
1329-
errors += 1
1292+
count += 1
13301293

13311294
add_metric_handler(test_handler)
13321295

@@ -1338,7 +1301,7 @@ def test_handler(metric: str, value) -> None:
13381301
df = pd.DataFrame([1] * 10)
13391302
assert df.get_backend() == "Big_Data_Cloud"
13401303
df.describe()
1341-
assert errors == 0
1304+
assert count == 9
13421305
finally:
13431306
clear_metric_handler(test_handler)
13441307

0 commit comments

Comments
 (0)