Skip to content
Merged
26 changes: 25 additions & 1 deletion modin/core/storage_formats/base/query_compiler_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
"""

import logging
import random
from types import MappingProxyType
from typing import Any, Optional

from modin.core.storage_formats.base.query_compiler import (
BaseQueryCompiler,
QCCoercionCost,
)
from modin.logging.metrics import emit_metric


class AggregatedBackendData:
Expand Down Expand Up @@ -102,6 +104,8 @@
"""
if self._result_backend is not None:
return self._result_backend
if len(self._qc_list) == 1:
return self._qc_list[0].get_backend()

Check warning on line 108 in modin/core/storage_formats/base/query_compiler_calculator.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/base/query_compiler_calculator.py#L108

Added line #L108 was not covered by tests
if len(self._qc_list) == 0:
raise ValueError("No query compilers registered")

Expand Down Expand Up @@ -146,12 +150,32 @@
logging.info(
f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}"
)
# Does not need to be secure, should not use system entropy
metrics_group = "%04x" % random.randrange(16**4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was originally a monotonic counter, right? Why is it random now?

for qc in self._qc_list:
max_shape = qc._max_shape()
backend = qc.get_backend()
emit_metric(
f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows",
max_shape[0],
)
emit_metric(
f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols",
max_shape[1],
)
for k, v in self._backend_data.items():
emit_metric(
f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost
)
emit_metric(
f"hybrid.merge.decision.{self._result_backend}.group.{metrics_group}",
1,
)

if self._result_backend is None:
raise ValueError(
f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{','.join(self._backend_data.keys())}]"
)

return self._result_backend

def _add_cost_data(self, backend, cost):
Expand Down
37 changes: 37 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler_caster.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import functools
import inspect
import logging
import random
from abc import ABC, abstractmethod
from collections import defaultdict, namedtuple
from types import FunctionType, MappingProxyType, MethodType
Expand All @@ -42,6 +43,7 @@
)
from modin.error_message import ErrorMessage
from modin.logging import disable_logging
from modin.logging.metrics import emit_metric
from modin.utils import sentinel

Fn = TypeVar("Fn", bound=Any)
Expand Down Expand Up @@ -724,6 +726,8 @@ def _get_backend_for_auto_switch(
# backend.
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

# Does not need to be secure, should not use system entropy
metrics_group = "%04x" % random.randrange(16**4)
starting_backend = input_qc.get_backend()

min_move_stay_delta = None
Expand All @@ -734,6 +738,23 @@ def _get_backend_for_auto_switch(
operation=function_name,
arguments=arguments,
)
data_max_shape = input_qc._max_shape()
emit_metric(
f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}",
1,
)
emit_metric(
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost",
stay_cost,
)
emit_metric(
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows",
data_max_shape[0],
)
emit_metric(
f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols",
data_max_shape[1],
)
for backend in Backend.get_active_backends():
if backend in ("Ray", "Unidist", "Dask"):
# Disable automatically switching to these engines for now, because
Expand Down Expand Up @@ -778,16 +799,32 @@ def _get_backend_for_auto_switch(
):
min_move_stay_delta = move_stay_delta
best_backend = backend
emit_metric(
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost",
move_to_cost,
)
emit_metric(
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost",
other_execute_cost,
)
emit_metric(
f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta",
move_stay_delta,
)

logging.info(
f"After {class_of_wrapped_fn} function {function_name}, "
+ f"considered moving to backend {backend} with "
+ f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) "
+ f", stay_cost {stay_cost}, and move-stay delta "
+ f"{move_stay_delta}"
)

if best_backend == starting_backend:
emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0)
logging.info(f"Chose not to switch backends after operation {function_name}")
else:
emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 1)
logging.info(f"Chose to move to backend {best_backend}")
return best_backend

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
register_function_for_post_op_switch,
register_function_for_pre_op_switch,
)
from modin.logging.metrics import add_metric_handler, clear_metric_handler
from modin.pandas.api.extensions import register_pd_accessor
from modin.tests.pandas.utils import create_test_dfs, df_equals, eval_general

Expand Down Expand Up @@ -1308,3 +1309,48 @@ class AQC(NativeQueryCompiler):

assert qc._engine_max_size() == oldmax
assert qc._transfer_threshold() == oldthresh


def test_cast_metrics(pico_df, cluster_df):
try:
count = 0

def test_handler(metric: str, value) -> None:
nonlocal count
if metric.startswith("modin.hybrid.merge"):
count += 1

add_metric_handler(test_handler)
df3 = pd.concat([pico_df, cluster_df], axis=1)
assert df3.get_backend() == "Cluster" # result should be on cluster
assert count == 7
finally:
clear_metric_handler(test_handler)


def test_switch_metrics(pico_df, cluster_df):
with backend_test_context(
test_backend="Big_Data_Cloud",
choices=("Big_Data_Cloud", "Small_Data_Local"),
):
try:
count = 0

def test_handler(metric: str, value) -> None:
nonlocal count
if metric.startswith("modin.hybrid.auto"):
count += 1

add_metric_handler(test_handler)

register_function_for_pre_op_switch(
class_name="DataFrame",
backend="Big_Data_Cloud",
method="describe",
)
df = pd.DataFrame([1] * 10)
assert df.get_backend() == "Big_Data_Cloud"
df.describe()
assert count == 8
finally:
clear_metric_handler(test_handler)
Loading