From 268954106cb8efee11afdf6418be5fa823f02f78 Mon Sep 17 00:00:00 2001 From: John Kew Date: Fri, 25 Jul 2025 00:00:19 -0700 Subject: [PATCH 1/2] NativeQueryCompiler performance improvements WIP for discussion which does two things: - allows for fewer deep copies - overrides default_to_pandas with a version which reduces copies - comment out various metrics ( probably not a huge factor here ) --- .../base/query_compiler_calculator.py | 32 +++---- .../pandas/native_query_compiler.py | 13 ++- .../pandas/query_compiler_caster.py | 87 ++++++++++--------- 3 files changed, 69 insertions(+), 63 deletions(-) diff --git a/modin/core/storage_formats/base/query_compiler_calculator.py b/modin/core/storage_formats/base/query_compiler_calculator.py index c6353027d66..4c0476daeba 100644 --- a/modin/core/storage_formats/base/query_compiler_calculator.py +++ b/modin/core/storage_formats/base/query_compiler_calculator.py @@ -162,22 +162,22 @@ def calculate(self) -> str: f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}" ) # Does not need to be secure, should not use system entropy - metrics_group = "%04x" % random.randrange(16**4) - for qc in self._qc_list: - max_shape = qc._max_shape() - backend = qc.get_backend() - emit_metric( - f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows", - max_shape[0], - ) - emit_metric( - f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols", - max_shape[1], - ) - for k, v in self._backend_data.items(): - emit_metric( - f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost - ) + metrics_group = 1 #"%04x" % random.randrange(16**4) + #for qc in self._qc_list: + # max_shape = qc._max_shape() + # backend = qc.get_backend() + # emit_metric( + # f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows", + # max_shape[0], + # ) + # emit_metric( + # f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols", + # max_shape[1], + # ) + #for k, v in self._backend_data.items(): + # emit_metric( + # f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost + # ) emit_metric( f"hybrid.merge.decision.{self._result_backend}.group.{metrics_group}", 1, diff --git a/modin/core/storage_formats/pandas/native_query_compiler.py b/modin/core/storage_formats/pandas/native_query_compiler.py index 7d89b50e5f6..713467150ab 100644 --- a/modin/core/storage_formats/pandas/native_query_compiler.py +++ b/modin/core/storage_formats/pandas/native_query_compiler.py @@ -102,7 +102,7 @@ class NativeQueryCompiler(BaseQueryCompiler): _modin_frame: pandas.DataFrame _should_warn_on_default_to_pandas: bool = False - def __init__(self, pandas_frame): + def __init__(self, pandas_frame, in_place=False): if hasattr(pandas_frame, "_to_pandas"): pandas_frame = pandas_frame._to_pandas() if is_scalar(pandas_frame): @@ -112,10 +112,10 @@ def __init__(self, pandas_frame): # so that we don't modify it. # TODO(https://github.com/modin-project/modin/issues/7435): Look # into avoiding this copy. - pandas_frame = pandas_frame.copy() + if not in_place: + pandas_frame = pandas_frame.copy() else: pandas_frame = pandas.DataFrame(pandas_frame) - self._modin_frame = pandas_frame storage_format = property( @@ -126,6 +126,11 @@ def __init__(self, pandas_frame): def execute(self): pass + + def default_to_pandas(self, pandas_op, *args, **kwargs): + #print("new default to pandas") + return type(self)(pandas_op(self._modin_frame, *args, **kwargs), in_place=True) + @property def frame_has_materialized_dtypes(self) -> bool: """ @@ -202,7 +207,7 @@ def to_pandas(self): @classmethod def from_pandas(cls, df, data_cls): - return cls(df) + return cls(df, in_place=True) @classmethod def from_arrow(cls, at, data_cls): diff --git a/modin/core/storage_formats/pandas/query_compiler_caster.py b/modin/core/storage_formats/pandas/query_compiler_caster.py index 68a80ddbdff..0cfd2fa9abd 100644 --- a/modin/core/storage_formats/pandas/query_compiler_caster.py +++ b/modin/core/storage_formats/pandas/query_compiler_caster.py @@ -768,7 +768,7 @@ def _get_backend_for_auto_switch( from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher # Does not need to be secure, should not use system entropy - metrics_group = "%04x" % random.randrange(16**4) + metrics_group = 1 #"%04x" % random.randrange(16**4) starting_backend = input_qc.get_backend() min_move_stay_delta = None @@ -779,23 +779,23 @@ def _get_backend_for_auto_switch( operation=function_name, arguments=arguments, ) - data_max_shape = input_qc._max_shape() - emit_metric( - f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}", - 1, - ) - emit_metric( - f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost", - stay_cost, - ) - emit_metric( - f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows", - data_max_shape[0], - ) - emit_metric( - f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols", - data_max_shape[1], - ) + #data_max_shape = input_qc._max_shape() + #emit_metric( + # f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}", + # 1, + #) + #emit_metric( + # f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost", + # stay_cost, + #) + #emit_metric( + # f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows", + # data_max_shape[0], + #) + #emit_metric( + # f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols", + # data_max_shape[1], + #) for backend in Backend.get_active_backends(): if backend in ("Ray", "Unidist", "Dask"): # Disable automatically switching to these engines for now, because @@ -840,35 +840,36 @@ def _get_backend_for_auto_switch( ): min_move_stay_delta = move_stay_delta best_backend = backend - emit_metric( - f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost", - move_to_cost, - ) - emit_metric( - f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost", - other_execute_cost, - ) - emit_metric( - f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta", - move_stay_delta, - ) - - get_logger().info( - f"After {_normalize_class_name(class_of_wrapped_fn)} function {function_name}, " - + f"considered moving to backend {backend} with " - + f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) " - + f", stay_cost {stay_cost}, and move-stay delta " - + f"{move_stay_delta}" - ) + #emit_metric( + # f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost", + # move_to_cost, + #) + #emit_metric( + # f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost", + # other_execute_cost, + #) + #emit_metric( + # f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta", + # move_stay_delta, + #) + + #get_logger().info( + # f"After {_normalize_class_name(class_of_wrapped_fn)} function {function_name}, " + # + f"considered moving to backend {backend} with " + # + f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) " + # + f", stay_cost {stay_cost}, and move-stay delta " + # + f"{move_stay_delta}" + #) if best_backend == starting_backend: - emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0) - get_logger().info( - f"Chose not to switch backends after operation {function_name}" - ) + pass + #emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0) + #get_logger().info( + # f"Chose not to switch backends after operation {function_name}" + #) else: emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 1) - get_logger().info(f"Chose to move to backend {best_backend}") + #get_logger().info(f"Chose to move to backend {best_backend}") return best_backend From e9da120959a93d13388fe3627262eb3629bf40c7 Mon Sep 17 00:00:00 2001 From: John Kew Date: Fri, 25 Jul 2025 13:49:57 -0700 Subject: [PATCH 2/2] re-add metrics --- .../base/query_compiler_calculator.py | 32 +++---- .../pandas/query_compiler_caster.py | 87 +++++++++---------- 2 files changed, 59 insertions(+), 60 deletions(-) diff --git a/modin/core/storage_formats/base/query_compiler_calculator.py b/modin/core/storage_formats/base/query_compiler_calculator.py index 4c0476daeba..c6353027d66 100644 --- a/modin/core/storage_formats/base/query_compiler_calculator.py +++ b/modin/core/storage_formats/base/query_compiler_calculator.py @@ -162,22 +162,22 @@ def calculate(self) -> str: f"BackendCostCalculator Results: {self._calc_result_log(self._result_backend)}" ) # Does not need to be secure, should not use system entropy - metrics_group = 1 #"%04x" % random.randrange(16**4) - #for qc in self._qc_list: - # max_shape = qc._max_shape() - # backend = qc.get_backend() - # emit_metric( - # f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows", - # max_shape[0], - # ) - # emit_metric( - # f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols", - # max_shape[1], - # ) - #for k, v in self._backend_data.items(): - # emit_metric( - # f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost - # ) + metrics_group = "%04x" % random.randrange(16**4) + for qc in self._qc_list: + max_shape = qc._max_shape() + backend = qc.get_backend() + emit_metric( + f"hybrid.merge.candidate.{backend}.group.{metrics_group}.rows", + max_shape[0], + ) + emit_metric( + f"hybrid.merge.candidate.{backend}.group.{metrics_group}.cols", + max_shape[1], + ) + for k, v in self._backend_data.items(): + emit_metric( + f"hybrid.merge.candidate.{k}.group.{metrics_group}.cost", v.cost + ) emit_metric( f"hybrid.merge.decision.{self._result_backend}.group.{metrics_group}", 1, diff --git a/modin/core/storage_formats/pandas/query_compiler_caster.py b/modin/core/storage_formats/pandas/query_compiler_caster.py index 0cfd2fa9abd..68a80ddbdff 100644 --- a/modin/core/storage_formats/pandas/query_compiler_caster.py +++ b/modin/core/storage_formats/pandas/query_compiler_caster.py @@ -768,7 +768,7 @@ def _get_backend_for_auto_switch( from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher # Does not need to be secure, should not use system entropy - metrics_group = 1 #"%04x" % random.randrange(16**4) + metrics_group = "%04x" % random.randrange(16**4) starting_backend = input_qc.get_backend() min_move_stay_delta = None @@ -779,23 +779,23 @@ def _get_backend_for_auto_switch( operation=function_name, arguments=arguments, ) - #data_max_shape = input_qc._max_shape() - #emit_metric( - # f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}", - # 1, - #) - #emit_metric( - # f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost", - # stay_cost, - #) - #emit_metric( - # f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows", - # data_max_shape[0], - #) - #emit_metric( - # f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols", - # data_max_shape[1], - #) + data_max_shape = input_qc._max_shape() + emit_metric( + f"hybrid.auto.api.{class_of_wrapped_fn}.{function_name}.group.{metrics_group}", + 1, + ) + emit_metric( + f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.stay_cost", + stay_cost, + ) + emit_metric( + f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.rows", + data_max_shape[0], + ) + emit_metric( + f"hybrid.auto.current.{starting_backend}.group.{metrics_group}.cols", + data_max_shape[1], + ) for backend in Backend.get_active_backends(): if backend in ("Ray", "Unidist", "Dask"): # Disable automatically switching to these engines for now, because @@ -840,36 +840,35 @@ def _get_backend_for_auto_switch( ): min_move_stay_delta = move_stay_delta best_backend = backend - #emit_metric( - # f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost", - # move_to_cost, - #) - #emit_metric( - # f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost", - # other_execute_cost, - #) - #emit_metric( - # f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta", - # move_stay_delta, - #) - - #get_logger().info( - # f"After {_normalize_class_name(class_of_wrapped_fn)} function {function_name}, " - # + f"considered moving to backend {backend} with " - # + f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) " - # + f", stay_cost {stay_cost}, and move-stay delta " - # + f"{move_stay_delta}" - #) + emit_metric( + f"hybrid.auto.candidate.{backend}.group.{metrics_group}.move_to_cost", + move_to_cost, + ) + emit_metric( + f"hybrid.auto.candidate.{backend}.group.{metrics_group}.other_execute_cost", + other_execute_cost, + ) + emit_metric( + f"hybrid.auto.candidate.{backend}.group.{metrics_group}.delta", + move_stay_delta, + ) + + get_logger().info( + f"After {_normalize_class_name(class_of_wrapped_fn)} function {function_name}, " + + f"considered moving to backend {backend} with " + + f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) " + + f", stay_cost {stay_cost}, and move-stay delta " + + f"{move_stay_delta}" + ) if best_backend == starting_backend: - pass - #emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0) - #get_logger().info( - # f"Chose not to switch backends after operation {function_name}" - #) + emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 0) + get_logger().info( + f"Chose not to switch backends after operation {function_name}" + ) else: emit_metric(f"hybrid.auto.decision.{best_backend}.group.{metrics_group}", 1) - #get_logger().info(f"Chose to move to backend {best_backend}") + get_logger().info(f"Chose to move to backend {best_backend}") return best_backend