Skip to content

Commit bc37acf

Browse files
fyrestone刘宝
andauthored
[Ray] Destroy Ray executor when the task finish (#3049)
* Destroy Ray executor when the task finish * Fix * Fix Co-authored-by: 刘宝 <[email protected]>
1 parent 08d7cfe commit bc37acf

File tree

4 files changed

+73
-2
lines changed

4 files changed

+73
-2
lines changed

mars/services/task/execution/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ async def create(
165165
**kwargs,
166166
)
167167

168+
def destroy(self):
169+
"""Destroy the executor."""
170+
168171
async def __aenter__(self):
169172
"""Called when begin to execute the task."""
170173

mars/services/task/execution/ray/executor.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,27 @@ async def create(
177177
meta_api,
178178
)
179179

180+
# noinspection DuplicatedCode
181+
def destroy(self):
182+
self._config = None
183+
self._task = None
184+
self._tile_context = None
185+
self._task_context = None
186+
self._task_state_actor = None
187+
self._ray_executor = None
188+
189+
# api
190+
self._lifecycle_api = None
191+
self._meta_api = None
192+
193+
self._available_band_resources = None
194+
195+
# For progress
196+
self._pre_all_stages_progress = 1
197+
self._pre_all_stages_tile_progress = 1
198+
self._cur_stage_tile_progress = 1
199+
self._cur_stage_output_object_refs = []
200+
180201
@classmethod
181202
@alru_cache(cache_exceptions=False)
182203
async def _get_apis(cls, session_id: str, address: str):

mars/services/task/execution/ray/tests/test_ray_execution_backend.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,28 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from collections import Counter
16+
1517
import numpy as np
1618
import pandas as pd
1719
import pytest
1820

1921
from ...... import tensor as mt
2022

23+
from ......core import TileContext
2124
from ......core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
2225
from ......serialization import serialize
2326
from ......tests.core import require_ray, mock
2427
from ......utils import lazy_import, get_chunk_params
2528
from .....context import ThreadedServiceContext
26-
from ....core import new_task_id
29+
from ....core import new_task_id, Task
30+
from ..config import RayExecutionConfig
2731
from ..context import (
2832
RayExecutionContext,
2933
RayRemoteObjectManager,
3034
_RayRemoteObjectContext,
3135
)
32-
from ..executor import execute_subtask
36+
from ..executor import execute_subtask, RayTaskExecutor
3337
from ..fetcher import RayFetcher
3438

3539
ray = lazy_import("ray")
@@ -41,6 +45,48 @@ def _gen_subtask_chunk_graph(t):
4145
return next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
4246

4347

48+
class MockRayTaskExecutor(RayTaskExecutor):
49+
def __init__(self, *args, **kwargs):
50+
self._set_attrs = Counter()
51+
super().__init__(*args, **kwargs)
52+
53+
@staticmethod
54+
def _get_ray_executor():
55+
# Export remote function once.
56+
return None
57+
58+
def set_attr_counter(self):
59+
return self._set_attrs
60+
61+
def __setattr__(self, key, value):
62+
super().__setattr__(key, value)
63+
self._set_attrs[key] += 1
64+
65+
66+
def test_ray_executor_destroy():
67+
task = Task("mock_task", "mock_session")
68+
config = RayExecutionConfig.from_execution_config({"backend": "mars"})
69+
executor = MockRayTaskExecutor(
70+
config=config,
71+
task=task,
72+
tile_context=TileContext(),
73+
task_context={},
74+
task_state_actor=None,
75+
lifecycle_api=None,
76+
meta_api=None,
77+
)
78+
counter = executor.set_attr_counter()
79+
assert len(counter) > 0
80+
keys = executor.__dict__.keys()
81+
assert counter.keys() >= keys
82+
counter.clear()
83+
executor.destroy()
84+
keys = set(keys) - {"_set_attrs"}
85+
assert counter.keys() == keys, "Some keys are not reset in destroy()."
86+
for k, v in counter.items():
87+
assert v == 1
88+
89+
4490
def test_ray_execute_subtask_basic():
4591
raw = np.ones((10, 10))
4692
raw_expect = raw + 1

mars/services/task/supervisor/processor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ def dump_subtask_graph(self):
429429
f.write(dot)
430430

431431
def _finish(self):
432+
self._executor.destroy()
432433
self.done.set()
433434
if self._dump_subtask_graph:
434435
self.dump_subtask_graph()

0 commit comments

Comments
 (0)