Skip to content

Commit c8363c7

Browse files
fyrestone and 刘宝 authored
[Ray] Add metrics for Ray executor (#3295)
* Add metrics for Ray executor * Pin faiss-cpu<1.7.3 * Fix * Fix * Fix asv * Pin cryptography<38.0.3 * Debug CI * Debug CI * Debug CI * Install ray for asv benchmark * Fallback to dump dot file if dot executable is not found * Add MARS_DUMP_SUBTASK_GRAPH_DIR * Create dump dir if not exist * Fix asv benchmark * Fix * Force pin setuptools before any installations * Revert "Force pin setuptools before any installations" This reverts commit 472b7c9. * Force pin setuptools<64 * Refine metrics name * Change merge logs from debug to info * Change fetch meta log of ray executor from info to debug * Refine merge logs * Fix merge log Co-authored-by: 刘宝 <[email protected]>
1 parent 5b6cce9 commit c8363c7

File tree

11 files changed

+108
-53
lines changed

11 files changed

+108
-53
lines changed

.github/workflows/benchmark-ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
shell: bash
3636
run: |
3737
source ./ci/install-conda.sh
38-
python -m pip install --upgrade pip setuptools wheel coverage;
38+
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
3939
4040
- name: Install dependencies
4141
id: build
@@ -57,7 +57,7 @@ jobs:
5757
git fetch upstream
5858
git merge upstream/master
5959
asv machine --yes
60-
asv continuous -f 1.1 --strict upstream/master HEAD
60+
asv continuous -e -f 1.1 --strict upstream/master HEAD
6161
if: ${{ steps.build.outcome == 'success' }}
6262

6363
- name: Publish benchmarks artifact

.github/workflows/core-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
shell: bash
3636
run: |
3737
source ./ci/install-conda.sh
38-
python -m pip install --upgrade pip setuptools wheel coverage;
38+
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
3939
4040
- name: Install dependencies
4141
env:

.github/workflows/os-compat-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
shell: bash
3333
run: |
3434
source ./ci/install-conda.sh
35-
python -m pip install --upgrade pip setuptools wheel coverage;
35+
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
3636
3737
- name: Install dependencies
3838
env:

.github/workflows/platform-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
shell: bash
4848
run: |
4949
source ./ci/install-conda.sh
50-
python -m pip install --upgrade pip setuptools wheel coverage;
50+
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
5151
5252
- name: Start minikube
5353
if: ${{ matrix.with-kubernetes }}

benchmarks/asv_bench/benchmarks/execution.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import dataclasses
16+
import unittest.mock as mock
1617

1718
import mars.tensor as mt
1819
from mars import new_session
@@ -24,7 +25,7 @@
2425
)
2526
from mars.serialization import serialize
2627
from mars.services.task import new_task_id
27-
from mars.services.task.execution.ray.executor import execute_subtask
28+
from mars.services.task.execution.ray import executor as ray_executor
2829

2930

3031
def _gen_subtask_chunk_graph(t):
@@ -83,10 +84,11 @@ def time_numexpr_execution(self):
8384
c.execute(show_progress=False)
8485

8586
def time_numexpr_subtask_execution(self):
86-
for asv_subtask_info in self.asv_subtasks:
87-
execute_subtask(
88-
asv_subtask_info.subtask_id,
89-
asv_subtask_info.serialized_subtask_chunk_graph,
90-
set(),
91-
False,
92-
)
87+
with mock.patch.object(ray_executor, "ray"):
88+
for asv_subtask_info in self.asv_subtasks:
89+
ray_executor.execute_subtask(
90+
asv_subtask_info.subtask_id,
91+
asv_subtask_info.serialized_subtask_chunk_graph,
92+
0,
93+
False,
94+
)

mars/dataframe/merge/merge.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -740,22 +740,29 @@ def tile(cls, op: "DataFrameMerge"):
740740
and len(left.chunks) + len(right.chunks) > auto_merge_threshold
741741
):
742742
yield TileStatus([left, right] + left.chunks + right.chunks, progress=0.2)
743+
left_chunk_size = len(left.chunks)
744+
right_chunk_size = len(right.chunks)
743745
left = auto_merge_chunks(ctx, left)
744746
right = auto_merge_chunks(ctx, right)
745-
logger.debug(
746-
"Before merge %s, left data count: %d, chunk size: %d, "
747-
"right data count: %d, chunk_size: %d",
747+
logger.info(
748+
"Auto merge before %s, left data shape: %s, chunk count: %s -> %s, "
749+
"right data shape: %s, chunk count: %s -> %s.",
748750
op,
749-
left.shape[0],
751+
left.shape,
752+
left_chunk_size,
750753
len(left.chunks),
751-
right.shape[0],
754+
right.shape,
755+
right_chunk_size,
752756
len(right.chunks),
753757
)
754758
else:
755-
logger.debug(
756-
"Skip auto merge before %s, left chunk size: %d, right chunk size: %d",
759+
logger.info(
760+
"Skip auto merge before %s, left data shape: %s, chunk count: %d, "
761+
"right data shape: %s, chunk count: %d.",
757762
op,
763+
left.shape,
758764
len(left.chunks),
765+
right.shape,
759766
len(right.chunks),
760767
)
761768

@@ -766,7 +773,7 @@ def tile(cls, op: "DataFrameMerge"):
766773
left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
767774
right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)
768775
small_one = right if len(left.chunks) > len(right.chunks) else left
769-
logger.debug(
776+
logger.info(
770777
"Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
771778
op,
772779
small_one,
@@ -782,7 +789,7 @@ def tile(cls, op: "DataFrameMerge"):
782789
if op.method == "auto":
783790
# if method is auto, select new method after auto merge
784791
method = cls._choose_merge_method(op, left, right)
785-
logger.info("Choose %s method for merge operand %s", method, op)
792+
logger.info("Choose %s method for merge operand %s.", method, op)
786793
if method == MergeMethod.one_chunk:
787794
ret = cls._tile_one_chunk(op, left, right)
788795
elif method == MergeMethod.broadcast:
@@ -802,16 +809,20 @@ def tile(cls, op: "DataFrameMerge"):
802809
ret[0].chunks, progress=0.8
803810
) # trigger execution for chunks
804811
merged = auto_merge_chunks(get_context(), ret[0])
805-
logger.debug(
806-
"After merge %s, data size: %d, chunk size: %d",
812+
logger.info(
813+
"Auto merge after %s, data shape: %s, chunk count: %s -> %s.",
807814
op,
808-
merged.shape[0],
815+
merged.shape,
816+
len(ret[0].chunks),
809817
len(merged.chunks),
810818
)
811819
return [merged]
812820
else:
813-
logger.debug(
814-
"Skip auto merge after %s, chunk size: %d", op, len(ret[0].chunks)
821+
logger.info(
822+
"Skip auto merge after %s, data shape: %s, chunk count: %d.",
823+
op,
824+
ret[0].shape,
825+
len(ret[0].chunks),
815826
)
816827
return ret
817828

mars/dataframe/utils.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,6 @@ def decide_dataframe_chunk_sizes(shape, chunk_size, memory_usage):
165165
:return: the calculated chunk size for each dimension
166166
:rtype: tuple
167167
"""
168-
from ..config import options
169-
170168
chunk_size = dictify_chunk_size(shape, chunk_size)
171169
average_memory_usage = memory_usage / shape[0]
172170

@@ -238,8 +236,6 @@ def decide_dataframe_chunk_sizes(shape, chunk_size, memory_usage):
238236

239237

240238
def decide_series_chunk_size(shape, chunk_size, memory_usage):
241-
from ..config import options
242-
243239
chunk_size = dictify_chunk_size(shape, chunk_size)
244240
average_memory_usage = memory_usage / shape[0] if shape[0] != 0 else memory_usage
245241

mars/metrics/api.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def __init__(
8989
self,
9090
name: str,
9191
description: str = "",
92-
tag_keys: Optional[Tuple[str]] = None,
92+
tag_keys: Optional[Tuple[str, ...]] = None,
9393
metric_type: str = "Counter",
9494
):
9595
self._name = name
@@ -125,7 +125,9 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None):
125125

126126

127127
def gen_metric(func):
128-
def wrapper(name, descriptions: str = "", tag_keys: Optional[Tuple[str]] = None):
128+
def wrapper(
129+
name, descriptions: str = "", tag_keys: Optional[Tuple[str, ...]] = None
130+
):
129131
if _init is True:
130132
return func(name, descriptions, tag_keys)
131133
else:
@@ -168,7 +170,9 @@ class Metrics:
168170

169171
@staticmethod
170172
@gen_metric
171-
def counter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
173+
def counter(
174+
name, description: str = "", tag_keys: Optional[Tuple[str, ...]] = None
175+
):
172176
logger.info(
173177
"Initializing a counter with name: %s, tag keys: %s, backend: %s",
174178
name,
@@ -179,7 +183,7 @@ def counter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
179183

180184
@staticmethod
181185
@gen_metric
182-
def gauge(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
186+
def gauge(name, description: str = "", tag_keys: Optional[Tuple[str, ...]] = None):
183187
logger.info(
184188
"Initializing a gauge whose name: %s, tag keys: %s, backend: %s",
185189
name,
@@ -190,7 +194,7 @@ def gauge(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
190194

191195
@staticmethod
192196
@gen_metric
193-
def meter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
197+
def meter(name, description: str = "", tag_keys: Optional[Tuple[str, ...]] = None):
194198
logger.info(
195199
"Initializing a meter whose name: %s, tag keys: %s, backend: %s",
196200
name,
@@ -201,7 +205,9 @@ def meter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
201205

202206
@staticmethod
203207
@gen_metric
204-
def histogram(name, description: str = "", tag_keys: Optional[Tuple[str]] = None):
208+
def histogram(
209+
name, description: str = "", tag_keys: Optional[Tuple[str, ...]] = None
210+
):
205211
logger.info(
206212
"Initializing a histogram whose name: %s, tag keys: %s, backend: %s",
207213
name,

mars/services/task/execution/ray/executor.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from .....core.operand.fetch import FetchShuffle
3535
from .....lib.aio import alru_cache
3636
from .....lib.ordered_set import OrderedSet
37-
from .....metrics.api import init_metrics
37+
from .....metrics.api import init_metrics, Metrics
3838
from .....resource import Resource
3939
from .....serialization import serialize, deserialize
4040
from .....typing import BandType
@@ -67,6 +67,24 @@
6767
logger = logging.getLogger(__name__)
6868

6969

70+
# Metrics
71+
submitted_subtask_number = Metrics.counter(
72+
"mars.ray_dag.submitted_subtask_number",
73+
"The number of submitted subtask.",
74+
("session_id", "task_id", "stage_id"),
75+
)
76+
started_subtask_number = Metrics.counter(
77+
"mars.ray_dag.started_subtask_number",
78+
"The number of started subtask.",
79+
("subtask_id",),
80+
)
81+
completed_subtask_number = Metrics.counter(
82+
"mars.ray_dag.completed_subtask_number",
83+
"The number of completed subtask.",
84+
("subtask_id",),
85+
)
86+
87+
7088
@dataclass
7189
class _RayChunkMeta:
7290
memory_size: int
@@ -165,8 +183,11 @@ def execute_subtask(
165183
subtask outputs and meta for outputs if `output_meta_keys` is provided.
166184
"""
167185
init_metrics("ray")
186+
metrics_tags = {"subtask_id": subtask_id}
187+
started_subtask_number.record(1, metrics_tags)
188+
ray_task_id = ray.get_runtime_context().task_id
168189
subtask_chunk_graph = deserialize(*subtask_chunk_graph)
169-
logger.info("Start subtask: %s.", subtask_id)
190+
logger.info("Start subtask: %s, ray task id: %s.", subtask_id, ray_task_id)
170191
# Optimize chunk graph.
171192
subtask_chunk_graph = _optimize_subtask_graph(subtask_chunk_graph)
172193
fetch_chunks, shuffle_fetch_chunk = _get_fetch_chunks(subtask_chunk_graph)
@@ -255,7 +276,8 @@ def execute_subtask(
255276
output_values.append(output_meta)
256277
output_values.extend(normal_output.values())
257278
output_values.extend(mapper_output.values())
258-
logger.info("Complete subtask: %s.", subtask_id)
279+
logger.info("Complete subtask: %s, ray task id: %s.", subtask_id, ray_task_id)
280+
completed_subtask_number.record(1, metrics_tags)
259281
return output_values[0] if len(output_values) == 1 else output_values
260282

261283

@@ -554,6 +576,11 @@ async def _execute_subtask_graph(
554576
)
555577
subtask_max_retries = self._config.get_subtask_max_retries()
556578
subtask_num_cpus = self._config.get_subtask_num_cpus()
579+
metrics_tags = {
580+
"session_id": self._task.session_id,
581+
"task_id": self._task.task_id,
582+
"stage_id": stage_id,
583+
}
557584
for subtask in subtask_graph.topological_iter():
558585
if subtask.virtual:
559586
continue
@@ -592,6 +619,7 @@ async def _execute_subtask_graph(
592619
await asyncio.sleep(0)
593620
if output_count == 1:
594621
output_object_refs = [output_object_refs]
622+
submitted_subtask_number.record(1, metrics_tags)
595623
monitor_context.submitted_subtasks.add(subtask)
596624
monitor_context.object_ref_to_subtask[output_object_refs[0]] = subtask
597625
if subtask.stage_n_outputs:
@@ -750,7 +778,7 @@ async def _load_subtask_inputs(
750778
shuffle_object_refs = list(shuffle_manager.get_reducer_input_refs(subtask))
751779

752780
if key_to_get_meta:
753-
logger.info(
781+
logger.debug(
754782
"Fetch %s metas and update context of stage %s.",
755783
len(key_to_get_meta),
756784
stage_id,
@@ -867,11 +895,13 @@ def gc():
867895
stage_id,
868896
),
869897
_RayExecutionStage.WAITING: lambda: logger.info(
870-
"Completed [%s/%s] subtasks of stage %s, one of waiting object refs: %s",
898+
"Completed [%s/%s] subtasks of stage %s, one of waiting ray tasks: %s",
871899
len(completed_subtasks),
872900
total,
873901
stage_id,
874-
next(iter(object_ref_to_subtask)) if object_ref_to_subtask else None,
902+
next(iter(object_ref_to_subtask)).task_id()
903+
if object_ref_to_subtask
904+
else None,
875905
),
876906
}
877907

mars/services/task/execution/ray/tests/test_ray_execution_backend.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ async def test_ray_executor_destroy():
176176

177177

178178
@require_ray
179-
def test_ray_execute_subtask_basic():
179+
@mock.patch("ray.get_runtime_context")
180+
def test_ray_execute_subtask_basic(_):
180181
raw = np.ones((10, 10))
181182
raw_expect = raw + 1
182183
a = mt.ones((10, 10), chunk_size=10)
@@ -610,7 +611,7 @@ async def _wait_gc_execute_subtask_graph(
610611

611612
with mock.patch.object(
612613
executor, "_execute_subtask_graph", _wait_gc_execute_subtask_graph
613-
):
614+
), mock.patch("ray.get_runtime_context"):
614615
async with executor:
615616
await executor.execute_subtask_graph(
616617
"mock_stage", subtask_graph, chunk_graph, tile_context

0 commit comments

Comments (0)