Skip to content

Commit 4e351dd

Browse files
authored
New ExecutionConfig API (#2968)
1 parent 4a0ab76 commit 4e351dd

File tree

17 files changed

+357
-85
lines changed

17 files changed

+357
-85
lines changed

mars/deploy/oscar/base_config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ task:
2929
fuse_enabled: yes
3030
initial_same_color_num: null
3131
as_broadcaster_successor_num: null
32-
task_executor_config:
32+
execution_config:
3333
backend: mars
3434
scheduling:
3535
autoscale:

mars/deploy/oscar/local.py

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@
2525
from ... import oscar as mo
2626
from ...core.entrypoints import init_extension_entrypoints
2727
from ...lib.aio import get_isolation, stop_isolation
28-
from ...resource import cpu_count, cuda_count, mem_total, Resource
28+
from ...resource import cpu_count, cuda_count, mem_total
2929
from ...services import NodeRole
30+
from ...services.task.execution.api import ExecutionConfig
3031
from ...typing import ClusterType, ClientType
3132
from ..utils import get_third_party_modules_from_config, load_config
3233
from .pool import create_supervisor_actor_pool, create_worker_actor_pool
@@ -82,7 +83,7 @@ async def new_cluster_in_isolation(
8283
n_supervisor_process,
8384
)
8485
await cluster.start()
85-
return await LocalClient.create(cluster, backend, timeout)
86+
return await LocalClient.create(cluster, timeout)
8687

8788

8889
async def new_cluster(
@@ -145,53 +146,54 @@ def __init__(
145146
subprocess_start_method = (
146147
"spawn" if sys.platform == "win32" else "forkserver"
147148
)
148-
# load config file to dict.
149149
self._address = address
150-
self._subprocess_start_method = subprocess_start_method
151-
self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE)
152-
if backend is not None:
153-
self._config["task"]["task_executor_config"]["backend"] = backend
150+
self._n_worker = n_worker
154151
self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu
155152
self._mem_bytes = mem_total() if mem_bytes == "auto" else mem_bytes
153+
self._cuda_devices = self._get_cuda_devices(cuda_devices, n_worker)
154+
self._subprocess_start_method = subprocess_start_method
155+
self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE)
156+
execution_config = ExecutionConfig.from_config(self._config, backend=backend)
157+
self._backend = execution_config.backend
158+
self._web = web
156159
self._n_supervisor_process = n_supervisor_process
160+
161+
execution_config.merge_from(
162+
ExecutionConfig.from_params(
163+
backend=self._backend,
164+
n_worker=self._n_worker,
165+
n_cpu=self._n_cpu,
166+
mem_bytes=self._mem_bytes,
167+
cuda_devices=self._cuda_devices,
168+
)
169+
)
170+
171+
self._bands_to_resource = execution_config.get_deploy_band_resources()
172+
self._supervisor_pool = None
173+
self._worker_pools = []
174+
self._exiting_check_task = None
175+
176+
self.supervisor_address = None
177+
self.web_address = None
178+
179+
@staticmethod
180+
def _get_cuda_devices(cuda_devices, n_worker):
157181
if cuda_devices == "auto":
158182
total = cuda_count()
159183
all_devices = np.arange(total)
160-
devices_list = [list(arr) for arr in np.array_split(all_devices, n_worker)]
184+
return [list(arr) for arr in np.array_split(all_devices, n_worker)]
161185

162186
else: # pragma: no cover
163187
if isinstance(cuda_devices[0], int):
164188
assert n_worker == 1
165-
devices_list = [cuda_devices]
189+
return [cuda_devices]
166190
else:
167191
assert len(cuda_devices) == n_worker
168-
devices_list = cuda_devices
169-
170-
self._n_worker = n_worker
171-
self._web = web
172-
self._bands_to_resource = bands_to_resource = []
173-
worker_cpus = self._n_cpu // n_worker
174-
if sum(len(devices) for devices in devices_list) == 0:
175-
assert worker_cpus > 0, (
176-
f"{self._n_cpu} cpus are not enough "
177-
f"for {n_worker}, try to decrease workers."
178-
)
179-
mem_bytes = self._mem_bytes // n_worker
180-
for _, devices in zip(range(n_worker), devices_list):
181-
worker_band_to_resource = dict()
182-
worker_band_to_resource["numa-0"] = Resource(
183-
num_cpus=worker_cpus, mem_bytes=mem_bytes
184-
)
185-
for i in devices: # pragma: no cover
186-
worker_band_to_resource[f"gpu-{i}"] = Resource(num_gpus=1)
187-
bands_to_resource.append(worker_band_to_resource)
188-
self._supervisor_pool = None
189-
self._worker_pools = []
192+
return cuda_devices
190193

191-
self.supervisor_address = None
192-
self.web_address = None
193-
194-
self._exiting_check_task = None
194+
@property
195+
def backend(self):
196+
return self._backend
195197

196198
@property
197199
def external_address(self):
@@ -285,18 +287,11 @@ def __init__(self: ClientType, cluster: ClusterType, session: AbstractSession):
285287
async def create(
286288
cls,
287289
cluster: LocalCluster,
288-
backend: str = None,
289290
timeout: float = None,
290291
) -> ClientType:
291-
if backend is None:
292-
backend = (
293-
cluster._config.get("task", {})
294-
.get("task_executor_config", {})
295-
.get("backend", "mars")
296-
)
297292
session = await _new_session(
298293
cluster.external_address,
299-
backend=backend,
294+
backend=cluster.backend,
300295
default=True,
301296
timeout=timeout,
302297
)

mars/deploy/oscar/tests/session.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,9 @@ async def _new_test_cluster_in_isolation(**new_cluster_kwargs):
119119
if k in kwargs:
120120
new_cluster_params[k] = kwargs.pop(k)
121121
return (
122-
await _new_test_cluster_in_isolation(address=address, **new_cluster_params)
122+
await _new_test_cluster_in_isolation(
123+
address=address, backend=backend, **new_cluster_params
124+
)
123125
).session
124126
return await _get_checked_session(address)
125127

mars/deploy/oscar/tests/test_local.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
_IsolatedWebSession,
5656
_execute_with_progress,
5757
)
58+
from ..tests.session import new_test_session
5859
from .modules.utils import ( # noqa: F401; pylint: disable=unused-variable
5960
cleanup_third_party_modules_output,
6061
get_output_filenames,
@@ -108,6 +109,8 @@ async def create_cluster(request):
108109
config = CONFIG_TEST_FILE
109110
elif request.param == "vineyard":
110111
config = CONFIG_VINEYARD_TEST_FILE
112+
else:
113+
config = None
111114
start_method = os.environ.get("POOL_START_METHOD", None)
112115
client = await new_cluster(
113116
subprocess_start_method=start_method,
@@ -135,6 +138,48 @@ async def _assert(session_id: str, addr: str, level: StorageLevel):
135138
).result()
136139

137140

141+
@pytest.mark.parametrize("backend", ["mars"])
@pytest.mark.parametrize("_new_session", [new_session, new_test_session])
def test_new_session_backend(_new_session, backend):
    """
    Check that creating a session goes through the backend's config class.

    The config class registered for ``backend`` has its ``__init__`` and
    ``get_deploy_band_resources`` patched with pass-through mocks, so the
    test can assert both were called and compare the number of worker
    pools with the band resources the config actually returned.
    """
    from ....services.task.execution.api import _name_to_config_cls

    config_cls = _name_to_config_cls[backend]
    # Grab the real implementations before they are patched out.
    real_init = config_cls.__init__
    real_get_band_resources = config_cls.get_deploy_band_resources

    # Holds the most recent value returned by the real implementation.
    recorded = {"band_resources": []}

    def _record_band_resources(*args, **kwargs):
        recorded["band_resources"] = real_get_band_resources(*args, **kwargs)
        return recorded["band_resources"]

    patch_init = mock.patch.object(config_cls, "__init__", autospec=True)
    patch_band_resources = mock.patch.object(
        config_cls, "get_deploy_band_resources", autospec=True
    )
    with patch_init as config_init, patch_band_resources as deploy_band_resources:
        # Delegate to the real code so the cluster still starts normally.
        config_init.side_effect = real_init
        deploy_band_resources.side_effect = _record_band_resources

        sess = _new_session(
            backend=backend, n_cpu=2, web=False, use_uvloop=False, default=True
        )
        try:
            assert config_init.call_count > 0
            assert deploy_band_resources.call_count > 0

            worker_pools = sess.default.client._cluster._worker_pools
            assert len(worker_pools) == len(recorded["band_resources"])

            # A tiny end-to-end computation on the new session.
            expected = np.ones((10, 10)) + 1
            result = (mt.ones((10, 10)) + 1).to_numpy()
            np.testing.assert_array_equal(result, expected)
        finally:
            sess.stop_server()

    assert get_default_async_session() is None
181+
182+
138183
@pytest.mark.asyncio
139184
async def test_vineyard_operators(create_cluster):
140185
param = create_cluster[1]

mars/deploy/oscar/tests/test_ray_dag.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
from ....tests.core import DICT_NOT_EMPTY, require_ray
2121
from ....utils import lazy_import
2222
from ..local import new_cluster
23+
from ..session import new_session
2324
from ..tests import test_local
25+
from ..tests.session import new_test_session
2426
from .modules.utils import ( # noqa: F401; pylint: disable=unused-variable
2527
cleanup_third_party_modules_output,
2628
get_output_filenames,
@@ -67,6 +69,13 @@ async def create_cluster(request):
6769
yield client, {}
6870

6971

72+
@require_ray
@pytest.mark.parametrize("backend", ["ray"])
@pytest.mark.parametrize("_new_session", [new_session, new_test_session])
def test_new_session_backend(ray_start_regular_shared2, _new_session, backend):
    """Run the shared new-session backend check from ``test_local`` against ray."""
    # `ray_start_regular_shared2` is requested only as a fixture; the
    # checks themselves live in the local test module.
    test_local.test_new_session_backend(_new_session=_new_session, backend=backend)
77+
78+
7079
@require_ray
7180
@pytest.mark.parametrize(
7281
"config",

mars/services/task/execution/api.py

Lines changed: 112 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,121 @@
1414

1515
from abc import ABC, abstractmethod
1616
from dataclasses import dataclass
17-
from typing import List, Dict, Any, Type
17+
from typing import List, Dict, Any, Type, Union
1818

1919
from ....core import ChunkGraph, Chunk, TileContext
2020
from ....resource import Resource
2121
from ....typing import BandType
22+
from ....utils import merge_dict
2223
from ...subtask import SubtaskGraph, SubtaskResult
2324

2425

26+
class ExecutionConfig:
    """
    The config for execution backends.

    This class should ONLY provide the APIs needed by the parts of the
    system other than the execution itself. Each backend may have a
    different implementation of these APIs.

    If some configuration is specific to one backend, it should live in
    that backend's config class, e.g. `get_mars_special_config()` should
    be in the `MarsExecutionConfig`.
    """

    # Key under which `register_config_cls` stores a concrete subclass;
    # the base class leaves it unset.
    name = None

    def __init__(self, execution_config: Dict):
        """
        Wrap an execution config dict. The dict is stored by reference,
        not copied.

        An example of execution_config:
        {
            "backend": "mars",
            "mars": {
                "n_worker": 1,
                "n_cpu": 2,
                ...
            },
        }
        """
        self._execution_config = execution_config

    def merge_from(self, execution_config: "ExecutionConfig") -> "ExecutionConfig":
        """
        Merge the dict of another config for the same backend into this one.

        `merge_dict` is called for its effect on this instance's dict; its
        return value is not used.

        Returns
        -------
        ExecutionConfig
            This instance, so calls can be chained.
        """
        assert isinstance(execution_config, ExecutionConfig)
        assert self.backend == execution_config.backend
        merge_dict(
            self._execution_config,
            execution_config.get_execution_config(),
        )
        return self

    @property
    def backend(self) -> str:
        """The backend from config."""
        return self._execution_config["backend"]

    def get_execution_config(self) -> Dict:
        """Get the execution config dict (the stored object, not a copy)."""
        return self._execution_config

    # NOTE(review): this class does not derive from `ABC`, so the
    # `abstractmethod` marker is not enforced at instantiation time.
    @abstractmethod
    def get_deploy_band_resources(self) -> "List[Dict[str, Resource]]":
        """Get the band resources for deployment."""

    @classmethod
    def from_config(cls, config: Dict, backend: str = None) -> "ExecutionConfig":
        """
        Construct an execution config instance from config.

        Reads `config["task"]["execution_config"]` and delegates to
        `from_execution_config`.
        """
        execution_config = config["task"]["execution_config"]
        return cls.from_execution_config(execution_config, backend)

    @classmethod
    def from_execution_config(
        cls, execution_config: Union[Dict, "ExecutionConfig"], backend: str = None
    ) -> "ExecutionConfig":
        """
        Construct an execution config instance from execution config.

        Parameters
        ----------
        execution_config
            An existing `ExecutionConfig`, which is returned unchanged, or
            a config dict. A dict is updated in place: its "backend" key is
            overwritten by `backend` when given, otherwise defaulted to
            "mars" if absent.
        backend
            Optional backend name overriding the one in the dict. Must be
            None when `execution_config` is already an instance.

        Raises
        ------
        KeyError
            If no config class is registered for the resolved backend.
        """
        if isinstance(execution_config, ExecutionConfig):
            assert backend is None
            return execution_config
        if backend is not None:
            name = execution_config["backend"] = backend
        else:
            name = execution_config.setdefault("backend", "mars")
        try:
            config_cls = _name_to_config_cls[name]
        except KeyError:
            # Same exception type as the plain lookup, but say what was
            # asked for and what is actually available.
            raise KeyError(
                f"No execution config registered for backend {name!r}; "
                f"registered backends: {list(_name_to_config_cls)}"
            ) from None
        return config_cls(execution_config)

    @classmethod
    def from_params(
        cls,
        backend: str,
        n_worker: int,
        n_cpu: int,
        mem_bytes: int = 0,
        cuda_devices: List[List[int]] = None,
        **kwargs,
    ) -> "ExecutionConfig":
        """
        Construct an execution config instance from params.

        The params, together with any extra `kwargs`, become the
        backend-specific section of the config dict, stored under the
        backend's name.
        """
        backend_section = dict(
            {
                "n_worker": n_worker,
                "n_cpu": n_cpu,
                "mem_bytes": mem_bytes,
                "cuda_devices": cuda_devices,
            },
            **kwargs,
        )
        execution_config = {
            "backend": backend,
            backend: backend_section,
        }
        return cls.from_execution_config(execution_config)


# Registry of config classes, keyed by each class's `name`.
_name_to_config_cls: Dict[str, Type[ExecutionConfig]] = {}


def register_config_cls(config_cls: Type[ExecutionConfig]):
    """
    Register a config class under its `name`.

    Returns the class itself, so this can be used as a class decorator.
    """
    _name_to_config_cls[config_cls.name] = config_cls
    return config_cls
130+
131+
25132
@dataclass
26133
class ExecutionChunkResult:
27134
meta: Dict # The chunk meta for iterative tiling.
@@ -35,17 +142,16 @@ class TaskExecutor(ABC):
35142
@abstractmethod
36143
async def create(
37144
cls,
38-
config: Dict,
145+
config: Union[Dict, ExecutionConfig],
39146
*,
40147
session_id: str,
41148
address: str,
42149
task,
43150
tile_context: TileContext,
44151
**kwargs,
45152
) -> "TaskExecutor":
46-
name = config.get("backend", "mars")
47-
backend_config = config.get(name, {})
48-
executor_cls = _name_to_task_executor_cls[name]
153+
backend_config = ExecutionConfig.from_execution_config(config)
154+
executor_cls = _name_to_task_executor_cls[backend_config.backend]
49155
if executor_cls.create.__func__ is TaskExecutor.create.__func__:
50156
raise NotImplementedError(
51157
f"The {executor_cls} should implement the abstract classmethod `create`."
@@ -102,6 +208,7 @@ def get_stage_processors(self):
102208

103209
def register_executor_cls(executor_cls: Type[TaskExecutor]):
    """
    Register a task executor class under its `name`.

    Returns the class itself, so this can be used as a class decorator.
    """
    backend_name = executor_cls.name
    _name_to_task_executor_cls[backend_name] = executor_cls
    return executor_cls
105212

106213

107214
class Fetcher:

mars/services/task/execution/mars/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from .config import MarsExecutionConfig
1516
from .executor import MarsTaskExecutor
1617
from .fetcher import MarsFetcher

0 commit comments

Comments
 (0)