10 changes: 6 additions & 4 deletions modin/config/envvars.py
@@ -508,10 +508,12 @@ def set_active_backends(cls, new_choices: tuple) -> None:
         ValueError
             Raises a ValueError when the set of new_choices are not already registered
         """
-        if not all(i in cls._BACKEND_TO_EXECUTION for i in new_choices):
-            raise ValueError(
-                f"Active backend choices {new_choices} are not all registered."
-            )
+        registered_backends = cls._BACKEND_TO_EXECUTION
+        for i in new_choices:
+            if i not in registered_backends:
+                raise ValueError(
+                    f"Active backend choices {new_choices} are not all registered."
+                )
         cls.choices = new_choices
 
     @classmethod
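The refactor above keeps the original behavior: the first unregistered name in new_choices aborts the whole call with a ValueError that names the full tuple. A minimal standalone sketch of that validation pattern, with a hypothetical registry dict standing in for cls._BACKEND_TO_EXECUTION (plain Python, not modin's actual Backend class):

# Hypothetical stand-in for the registry kept in modin/config/envvars.py.
_REGISTERED_BACKENDS = {"Pandas": "pandas", "Ray": "ray", "Dask": "dask"}


def set_active_backends(new_choices: tuple) -> None:
    """Reject the whole tuple as soon as one choice is not registered."""
    for choice in new_choices:
        if choice not in _REGISTERED_BACKENDS:
            raise ValueError(
                f"Active backend choices {new_choices} are not all registered."
            )
    # ... store new_choices as the active set (cls.choices in the real code)


set_active_backends(("Pandas", "Ray"))      # passes validation
try:
    set_active_backends(("Pandas", "Gpu"))  # "Gpu" is not registered
except ValueError as err:
    print(err)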
36 changes: 25 additions & 11 deletions modin/core/storage_formats/base/query_compiler_calculator.py
@@ -233,13 +233,31 @@ def calculate(self) -> str:
                 if cost is not None:
                     self._add_cost_data(backend_to, cost)
 
-        min_value = None
-        for k, v in self._backend_data.items():
-            if v.cost > v.max_cost:
-                continue
-            if min_value is None or min_value > v.cost:
-                min_value = v.cost
-                self._result_backend = k
+        self._result_backend = None
+
+        def get_min_cost_backend(skip_exceeds_max_cost=True) -> str:
+            result = None
+            min_value = None
+            for k, v in self._backend_data.items():
+                if skip_exceeds_max_cost and v.cost > v.max_cost:
+                    continue
+                if min_value is None or min_value > v.cost:
+                    min_value = v.cost
+                    result = k
+            return result
+
+        # Get the best backend, skipping backends where we may exceed
+        # the total cost
+        self._result_backend = get_min_cost_backend(skip_exceeds_max_cost=True)
+
+        # If we still do not have a backend, pick the best backend while
+        # ignoring max_cost
+        if self._result_backend is None:
+            self._result_backend = get_min_cost_backend(skip_exceeds_max_cost=False)
+
+        # This should not happen
+        if self._result_backend is None:
+            raise ValueError("No backends are available to calculate costs.")
 
         if len(self._backend_data) > 1:
             get_logger().info(
@@ -267,10 +285,6 @@ def calculate(self) -> str:
                 1,
             )
 
-        if self._result_backend is None:
-            raise ValueError(
-                f"Cannot cast to any of the available backends, as the estimated cost is too high. Tried these backends: [{', '.join(self._backend_data.keys())}]"
-            )
         return self._result_backend
 
     def _add_cost_data(self, backend, cost):
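To make the new selection rule concrete, the following is a self-contained sketch of the two-pass logic introduced above: prefer the cheapest backend that stays within its max_cost budget, and only when every backend is over budget fall back to the cheapest overall instead of raising. The CostEntry dataclass and the max_cost figures are illustrative stand-ins for modin's internal cost bookkeeping; the 1250 and 1000 costs echo the comments in the test parametrization further down.

from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class CostEntry:
    cost: int      # estimated cost of landing the data on this backend
    max_cost: int  # budget the backend would prefer not to exceed


def choose_backend(backend_data: Dict[str, CostEntry]) -> str:
    """Two-pass selection mirroring the patched calculate() logic."""

    def min_cost_backend(skip_exceeds_max_cost: bool) -> Optional[str]:
        result, min_value = None, None
        for name, entry in backend_data.items():
            if skip_exceeds_max_cost and entry.cost > entry.max_cost:
                continue
            if min_value is None or entry.cost < min_value:
                min_value, result = entry.cost, name
        return result

    # First pass: only backends that stay within budget.
    chosen = min_cost_backend(skip_exceeds_max_cost=True)
    # Second pass: everyone is over budget, so take the cheapest anyway.
    if chosen is None:
        chosen = min_cost_backend(skip_exceeds_max_cost=False)
    if chosen is None:
        raise ValueError("No backends are available to calculate costs.")
    return chosen


# All three backends exceed their budgets; the old code raised here,
# the new code picks the cheapest option ("Cluster").
print(choose_backend({
    "Cloud": CostEntry(cost=1250, max_cost=1000),
    "Cluster": CostEntry(cost=1000, max_cost=900),
    "Local_Machine": CostEntry(cost=5000, max_cost=100),
}))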
@@ -534,13 +534,6 @@ def test_cast_to_first_backend_with___init__(pico_df, cluster_df):
     assert df3.get_backend() == "Cluster"  # result should be on cluster
 
 
-def test_no_solution(pico_df, local_df, cluster_df, cloud_df):
-    # Backends should appear in the order of arguments, followed by any active backends not present
-    # among the arguments.
-    with pytest.raises(ValueError, match=r"Pico, Local_Machine, Cluster, Cloud"):
-        pd.concat(axis=1, objs=[pico_df, local_df, cluster_df, cloud_df])
-
-
 def test_self_cost_causes_move(cloud_high_self_df, cluster_df):
     """
     Test that ``self_cost`` is being properly considered.
@@ -572,9 +565,9 @@ def test_self_cost_causes_move(cloud_high_self_df, cluster_df):
         ("cloud_df", "cloud_df", "cloud_df", "cloud_df", "Cloud"),
         # moving all dfs to cloud is 1250, moving to cluster is 1000
         # regardless of how they are ordered
-        ("pico_df", "local_df", "cluster_df", "cloud_df", None),
-        ("cloud_df", "local_df", "cluster_df", "pico_df", None),
-        ("cloud_df", "cluster_df", "local_df", "pico_df", None),
+        ("pico_df", "local_df", "cluster_df", "cloud_df", "Cluster"),
+        ("cloud_df", "local_df", "cluster_df", "pico_df", "Cluster"),
+        ("cloud_df", "cluster_df", "local_df", "pico_df", "Cluster"),
         ("cloud_df", "cloud_df", "local_df", "pico_df", "Cloud"),
         # Still move everything to cloud
         ("pico_df", "pico_df", "pico_df", "cloud_df", "Cloud"),
@@ -769,6 +762,19 @@ def test_switch_local_to_cloud_with_iloc___setitem__(local_df, cloud_df, pin_loc
     assert local_df.get_backend() == "Local_Machine" if pin_local else "Cloud"
 
 
+# This test should force the creation of a dataframe which
+# is too large for the backend and verify that it stays there
+# because there are no other options
+def test_single_backend_merge_no_good_options():
+    with backend_test_context(
+        test_backend="Small_Data_Local",
+        choices=["Small_Data_Local"],
+    ):
+        df1 = pd.DataFrame({"a": [1] * 100})
+        df1["two"] = pd.to_datetime(df1["a"])
+        assert df1.get_backend() == "Small_Data_Local"
+
+
 def test_stay_or_move_evaluation(cloud_high_self_df, default_df):
     default_cls = type(default_df._get_query_compiler())
     cloud_cls = type(cloud_high_self_df._get_query_compiler())
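The deleted test_no_solution and the None to "Cluster" parametrize updates encode the same behavioral change: an over-budget combination now resolves to the cheapest backend instead of raising. The new test instead pins a single backend and checks that data stays put. Its backend_test_context helper is not shown in this diff, so the sketch below is only a guess at the shape of such a fixture: a context manager that temporarily pins the default backend and the allowed choices, then restores the previous configuration on exit.

from contextlib import contextmanager

# Illustrative stand-in; the real helper lives in modin's test utilities
# and may work differently.
_ACTIVE = {"backend": "Pandas", "choices": ("Pandas",)}


@contextmanager
def backend_test_context(test_backend, choices):
    """Temporarily pin the default backend and the set of allowed choices."""
    saved = dict(_ACTIVE)
    _ACTIVE["backend"] = test_backend
    _ACTIVE["choices"] = tuple(choices)
    try:
        yield
    finally:
        _ACTIVE.update(saved)  # restore whatever was configured before


def demo():
    before = dict(_ACTIVE)
    with backend_test_context(test_backend="Small_Data_Local",
                              choices=["Small_Data_Local"]):
        # Inside the block only "Small_Data_Local" is selectable, so a cost
        # calculator has nowhere else to move a too-large frame.
        assert _ACTIVE["choices"] == ("Small_Data_Local",)
    assert _ACTIVE == before  # configuration is restored on exit


demo()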