Skip to content

Commit bde43bb

Browse files
fyrestone and 刘宝 authored
[Ray] Basic slow subtask detection (#3305)
* Basic slow subtask detection
* Replace sklearn.utils._testing.assert_warns with pytest.warns
* Fix load_boston
* Fix rerun_time type
* Remove rerun_time from Task and Subtask
* Fix
* Fix
* Add UT
* Fix lint
* Fix lint
* Refine logs
* Fix
* Refine code
* Add check_slow_subtasks_iqr_ratio config

Co-authored-by: 刘宝 <[email protected]>
1 parent 16843aa commit bde43bb

File tree

11 files changed

+274
-53
lines changed

11 files changed

+274
-53
lines changed

mars/contrib/dask/tests/test_dask.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ def test_unpartitioned_dataframe(setup_cluster):
7575
from dask import dataframe as dd
7676
from pandas._testing import assert_frame_equal
7777
import pandas as pd
78-
from sklearn.datasets import load_boston
78+
from sklearn.datasets import load_iris
7979

80-
boston = load_boston()
80+
boston = load_iris()
8181
pd.DataFrame(boston.data, columns=boston["feature_names"]).to_csv(
8282
"./boston_housing_data.csv"
8383
)
8484

8585
df = dd.read_csv(r"./boston_housing_data.csv")
86-
df["CRIM"] = df["CRIM"] / 2
86+
df["sepal length (cm)"] = df["sepal length (cm)"] / 2
8787

8888
dask_res = df.compute()
8989
assert_frame_equal(dask_res, df.compute(scheduler=mars_scheduler))

mars/learn/cluster/tests/test_k_means.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
try:
2323
from sklearn.datasets import make_blobs
2424
from sklearn.metrics.cluster import v_measure_score
25-
from sklearn.utils._testing import assert_raise_message, assert_warns
25+
from sklearn.utils._testing import assert_raise_message
2626
except ImportError:
2727
pass
2828

@@ -485,14 +485,13 @@ def test_k_means_function(setup):
485485
assert inertia > 0.0
486486

487487
# check warning when centers are passed
488-
assert_warns(
489-
RuntimeWarning,
490-
k_means,
491-
X,
492-
n_clusters=n_clusters,
493-
sample_weight=None,
494-
init=centers,
495-
)
488+
with pytest.warns(RuntimeWarning):
489+
k_means(
490+
X,
491+
n_clusters=n_clusters,
492+
sample_weight=None,
493+
init=centers,
494+
)
496495

497496
# too many clusters desired
498497
with pytest.raises(ValueError):

mars/learn/metrics/pairwise/tests/test_pariwise_distances.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from sklearn.metrics import pairwise_distances as sk_pairwise_distances
1818
from sklearn.neighbors import NearestNeighbors as SkNearestNeighbors
1919
from sklearn.exceptions import DataConversionWarning
20-
from sklearn.utils._testing import assert_warns
20+
2121

2222
from ..... import tensor as mt
2323
from .....session import execute, fetch
@@ -63,7 +63,8 @@ def test_pairwise_distances_execution(setup):
6363
expected = sk_pairwise_distances(raw_x, raw_y, metric=m)
6464
np.testing.assert_almost_equal(result, expected)
6565

66-
assert_warns(DataConversionWarning, pairwise_distances, x, y, metric="jaccard")
66+
with pytest.warns(DataConversionWarning):
67+
pairwise_distances(x, y, metric="jaccard")
6768

6869
with pytest.raises(ValueError):
6970
_ = pairwise_distances(x, y, metric="unknown")

mars/learn/metrics/tests/test_ranking.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from sklearn.exceptions import UndefinedMetricWarning
2828
from sklearn.utils import check_random_state
2929
from sklearn.utils._testing import (
30-
assert_warns,
3130
assert_almost_equal,
3231
assert_array_almost_equal,
3332
)
@@ -166,14 +165,16 @@ def test_roc_curve_one_label(setup):
166165
y_pred = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
167166
# assert there are warnings
168167
w = UndefinedMetricWarning
169-
fpr, tpr, thresholds = assert_warns(w, roc_curve, y_true, y_pred)
168+
with pytest.warns(w):
169+
fpr, tpr, thresholds = roc_curve(y_true, y_pred)
170170
# all true labels, all fpr should be nan
171171
np.testing.assert_array_equal(fpr.fetch(), np.full(len(thresholds), np.nan))
172172
assert fpr.shape == tpr.shape
173173
assert fpr.shape == thresholds.shape
174174

175175
# assert there are warnings
176-
fpr, tpr, thresholds = assert_warns(w, roc_curve, [1 - x for x in y_true], y_pred)
176+
with pytest.warns(w):
177+
fpr, tpr, thresholds = roc_curve([1 - x for x in y_true], y_pred)
177178
# all negative labels, all tpr should be nan
178179
np.testing.assert_array_equal(tpr.fetch(), np.full(len(thresholds), np.nan))
179180
assert fpr.shape == tpr.shape

mars/learn/neighbors/tests/test_nearest_neighbors.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from sklearn.neighbors import NearestNeighbors as SkNearestNeighbors
2525
from sklearn.neighbors import BallTree as SkBallTree
2626
from sklearn.neighbors import KDTree as SkKDTree
27-
from sklearn.utils._testing import assert_warns
2827
except ImportError: # pragma: no cover
2928
SkNearestNeighbors = None
3029

@@ -66,7 +65,8 @@ def test_nearest_neighbors(setup):
6665
with pytest.raises(ValueError):
6766
_ = NearestNeighbors(algorithm="auto", metric="unknown")
6867

69-
assert_warns(SyntaxWarning, NearestNeighbors, metric_params={"p": 1})
68+
with pytest.warns(SyntaxWarning):
69+
NearestNeighbors(metric_params={"p": 1})
7070

7171
with pytest.raises(ValueError):
7272
_ = NearestNeighbors(metric="wminkowski", p=0)
@@ -105,7 +105,8 @@ def test_nearest_neighbors(setup):
105105
nn.fit(np.random.rand(0, 10))
106106

107107
nn = NearestNeighbors(algorithm="ball_tree")
108-
assert_warns(UserWarning, nn.fit, X_sparse)
108+
with pytest.warns(UserWarning):
109+
nn.fit(X_sparse)
109110

110111
nn = NearestNeighbors(metric="haversine")
111112
with pytest.raises(ValueError):

mars/learn/semi_supervised/tests/test_label_propagation.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from sklearn.datasets import make_classification
1818
from sklearn.model_selection import train_test_split
1919
from sklearn.exceptions import ConvergenceWarning
20-
from sklearn.utils._testing import assert_no_warnings, assert_warns
20+
from sklearn.utils._testing import assert_no_warnings
2121

2222
from .... import tensor as mt
2323
from ...metrics.pairwise import rbf_kernel
@@ -103,7 +103,8 @@ def test_convergence_warning(setup):
103103
y = np.array([0, 1, -1])
104104

105105
mdl = LabelPropagation(kernel="rbf", max_iter=1)
106-
assert_warns(ConvergenceWarning, mdl.fit, X, y)
106+
with pytest.warns(ConvergenceWarning):
107+
mdl.fit(X, y)
107108
assert mdl.n_iter_ == mdl.max_iter
108109

109110
mdl = LabelPropagation(kernel="rbf", max_iter=500)

mars/services/subtask/core.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def is_done(self) -> bool:
5252

5353

5454
class Subtask(Serializable):
55-
__slots__ = ("_repr", "_pure_depend_keys")
55+
__slots__ = ("_repr", "_pure_depend_keys", "runtime")
5656

5757
subtask_id: str = StringField("subtask_id")
5858
subtask_name: str = StringField("subtask_name")
@@ -65,7 +65,6 @@ class Subtask(Serializable):
6565
virtual: bool = BoolField("virtual")
6666
retryable: bool = BoolField("retryable")
6767
priority: Tuple[int, int] = TupleField("priority", FieldTypes.int32)
68-
rerun_time: int = Int32Field("rerun_time")
6968
extra_config: dict = DictField("extra_config")
7069
stage_id: str = StringField("stage_id")
7170
# chunks that need meta updated
@@ -95,7 +94,6 @@ def __init__(
9594
priority: Tuple[int, int] = None,
9695
virtual: bool = False,
9796
retryable: bool = True,
98-
rerun_time: int = 0,
9997
extra_config: dict = None,
10098
stage_id: str = None,
10199
update_meta_chunks: List[ChunkType] = None,
@@ -116,7 +114,6 @@ def __init__(
116114
priority=priority,
117115
virtual=virtual,
118116
retryable=retryable,
119-
rerun_time=rerun_time,
120117
extra_config=extra_config,
121118
stage_id=stage_id,
122119
update_meta_chunks=update_meta_chunks,
@@ -129,11 +126,13 @@ def __init__(
129126
)
130127
self._pure_depend_keys = None
131128
self._repr = None
129+
self.runtime = None
132130

133131
def __on_deserialize__(self):
134132
super(Subtask, self).__on_deserialize__()
135133
self._pure_depend_keys = None
136134
self._repr = None
135+
self.runtime = None
137136

138137
@property
139138
def expect_band(self):

mars/services/task/core.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
Serializable,
2323
StringField,
2424
ReferenceField,
25-
Int32Field,
2625
BoolField,
2726
AnyField,
2827
DictField,
@@ -41,7 +40,6 @@ class Task(Serializable):
4140
session_id: str = StringField("session_id")
4241
tileable_graph: TileableGraph = ReferenceField("tileable_graph", TileableGraph)
4342
fuse_enabled: bool = BoolField("fuse_enabled")
44-
rerun_time: int = Int32Field("rerun_time")
4543
extra_config: dict = DictField("extra_config")
4644

4745
def __init__(
@@ -50,15 +48,13 @@ def __init__(
5048
session_id: str = None,
5149
tileable_graph: TileableGraph = None,
5250
fuse_enabled: bool = True,
53-
rerun_time: int = 0,
5451
extra_config: dict = None,
5552
):
5653
super().__init__(
5754
task_id=task_id,
5855
session_id=session_id,
5956
tileable_graph=tileable_graph,
6057
fuse_enabled=fuse_enabled,
61-
rerun_time=rerun_time,
6258
extra_config=extra_config,
6359
)
6460

mars/services/task/execution/ray/config.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
# The default interval seconds to update progress and collect garbage.
2929
DEFAULT_MONITOR_INTERVAL_SECONDS = 0 if IN_RAY_CI else 1
3030
DEFAULT_LOG_INTERVAL_SECONDS = 60
31+
DEFAULT_CHECK_SLOW_SUBTASKS_INTERVAL_SECONDS = 120
3132

3233

3334
@register_config_cls
@@ -77,6 +78,19 @@ def get_log_interval_seconds(self):
7778
"log_interval_seconds", DEFAULT_LOG_INTERVAL_SECONDS
7879
)
7980

81+
def get_check_slow_subtasks_interval_seconds(self) -> float:
82+
return self._ray_execution_config.get(
83+
"check_slow_subtasks_interval_seconds",
84+
DEFAULT_CHECK_SLOW_SUBTASKS_INTERVAL_SECONDS,
85+
)
86+
87+
def get_check_slow_subtask_iqr_ratio(self) -> float:
88+
# https://en.wikipedia.org/wiki/Box_plot
89+
# iqr = q3 - q1
90+
# duration_threshold = q3 + check_slow_subtasks_iqr_ratio * (q3 - q1)
91+
# So a value of 3 flags only extremely slow (probably hanging) subtasks; a value of 1.5 flags slow ones.
92+
return self._ray_execution_config.get("check_slow_subtasks_iqr_ratio", 3)
93+
8094
def get_shuffle_fetch_type(self) -> ShuffleFetchType:
8195
return ShuffleFetchType.FETCH_BY_INDEX
8296

0 commit comments

Comments
 (0)