Skip to content

Commit 3756981

Browse files
authored
Allow specifying multiple supervisor processes (#2452)
1 parent ff7a804 commit 3756981

File tree

21 files changed

+204
-100
lines changed

21 files changed

+204
-100
lines changed

.github/workflows/install-hadoop.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ sudo npm uninstall -g yarn || true
99

1010
sudo apt-get install -yq ssh rsync
1111

12-
VERSION=3.3.1
12+
VERSION=2.10.1
1313
HADOOP_URL="https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=hadoop/common/hadoop-$VERSION/hadoop-$VERSION.tar.gz"
1414

1515
# download hadoop

mars/deploy/oscar/local.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,19 @@ async def new_cluster_in_isolation(
5757
config: Union[str, Dict] = None,
5858
web: bool = True,
5959
timeout: float = None,
60+
n_supervisor_process: int = 0,
6061
) -> ClientType:
6162
if subprocess_start_method is None:
6263
subprocess_start_method = "spawn" if sys.platform == "win32" else "forkserver"
6364
cluster = LocalCluster(
64-
address, n_worker, n_cpu, cuda_devices, subprocess_start_method, config, web
65+
address,
66+
n_worker,
67+
n_cpu,
68+
cuda_devices,
69+
subprocess_start_method,
70+
config,
71+
web,
72+
n_supervisor_process,
6573
)
6674
await cluster.start()
6775
return await LocalClient.create(cluster, backend, timeout)
@@ -77,6 +85,7 @@ async def new_cluster(
7785
web: bool = True,
7886
loop: asyncio.AbstractEventLoop = None,
7987
use_uvloop: Union[bool, str] = "auto",
88+
n_supervisor_process: int = 0,
8089
) -> ClientType:
8190
coro = new_cluster_in_isolation(
8291
address,
@@ -86,6 +95,7 @@ async def new_cluster(
8695
subprocess_start_method=subprocess_start_method,
8796
config=config,
8897
web=web,
98+
n_supervisor_process=n_supervisor_process,
8999
)
90100
isolation = ensure_isolation_created(dict(loop=loop, use_uvloop=use_uvloop))
91101
fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop)
@@ -111,6 +121,7 @@ def __init__(
111121
config: Union[str, Dict] = None,
112122
web: Union[bool, str] = "auto",
113123
timeout: float = None,
124+
n_supervisor_process: int = 0,
114125
):
115126
# load third party extensions.
116127
init_extension_entrypoints()
@@ -121,6 +132,7 @@ def __init__(
121132
self._subprocess_start_method = subprocess_start_method
122133
self._config = config
123134
self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu
135+
self._n_supervisor_process = n_supervisor_process
124136
if cuda_devices == "auto":
125137
total = cuda_count()
126138
all_devices = np.arange(total)
@@ -188,7 +200,7 @@ async def _start_supervisor_pool(self):
188200
)
189201
self._supervisor_pool = await create_supervisor_actor_pool(
190202
self._address,
191-
n_process=0,
203+
n_process=self._n_supervisor_process,
192204
modules=supervisor_modules,
193205
subprocess_start_method=self._subprocess_start_method,
194206
)

mars/deploy/oscar/ray.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,17 @@ async def new_cluster(
304304
ensure_isolation_created(kwargs)
305305
if kwargs: # pragma: no cover
306306
raise TypeError(f"new_cluster got unexpected " f"arguments: {list(kwargs)}")
307+
n_supervisor_process = kwargs.get(
308+
"n_supervisor_process", DEFAULT_SUPERVISOR_SUB_POOL_NUM
309+
)
307310
cluster = RayCluster(
308-
cluster_name, supervisor_mem, worker_num, worker_cpu, worker_mem, config
311+
cluster_name,
312+
supervisor_mem,
313+
worker_num,
314+
worker_cpu,
315+
worker_mem,
316+
config,
317+
n_supervisor_process=n_supervisor_process,
309318
)
310319
try:
311320
await cluster.start()
@@ -371,11 +380,13 @@ def __init__(
371380
worker_cpu: int = 16,
372381
worker_mem: int = 32 * 1024 ** 3,
373382
config: Union[str, Dict] = None,
383+
n_supervisor_process: int = DEFAULT_SUPERVISOR_SUB_POOL_NUM,
374384
):
375385
# load third party extensions.
376386
init_extension_entrypoints()
377387
self._cluster_name = cluster_name
378388
self._supervisor_mem = supervisor_mem
389+
self._n_supervisor_process = n_supervisor_process
379390
self._worker_num = worker_num
380391
self._worker_cpu = worker_cpu
381392
self._worker_mem = worker_mem
@@ -402,7 +413,7 @@ async def start(self):
402413
self._config.get("cluster", {})
403414
.get("ray", {})
404415
.get("supervisor", {})
405-
.get("sub_pool_num", DEFAULT_SUPERVISOR_SUB_POOL_NUM)
416+
.get("sub_pool_num", self._n_supervisor_process)
406417
)
407418
from ...storage.ray import support_specify_owner
408419

mars/deploy/oscar/session.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1715,7 +1715,10 @@ def _attach_session(future: asyncio.Future):
17151715
break
17161716
except asyncio.TimeoutError:
17171717
# timeout
1718-
if not cancelled.is_set():
1718+
if (
1719+
not cancelled.is_set()
1720+
and execution_info.progress() is not None
1721+
):
17191722
progress_bar.update(execution_info.progress() * 100)
17201723
if cancelled.is_set():
17211724
# cancel execution

mars/deploy/oscar/supervisor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def __init__(self):
3232
def config_args(self, parser):
3333
super().config_args(parser)
3434
parser.add_argument("-w", "--web-port", help="web port of the service")
35+
parser.add_argument(
36+
"--n-process", help="number of supervisor processes", default="0"
37+
)
3538

3639
def parse_args(self, parser, argv, environ=None):
3740
args = super().parse_args(parser, argv, environ=environ)
@@ -52,7 +55,7 @@ def parse_args(self, parser, argv, environ=None):
5255
async def create_actor_pool(self):
5356
return await create_supervisor_actor_pool(
5457
self.args.endpoint,
55-
n_process=0,
58+
n_process=int(self.args.n_process),
5659
ports=self.ports,
5760
modules=self.args.load_modules,
5861
logging_conf=self.logging_conf,

mars/deploy/oscar/tests/test_cmdline.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def _get_labelled_port(label=None, create=True):
103103
test_name = os.environ["PYTEST_CURRENT_TEST"]
104104
if (test_name, label) not in _test_port_cache:
105105
if create:
106-
_test_port_cache[(test_name, label)] = get_next_port()
106+
_test_port_cache[(test_name, label)] = get_next_port(occupy=True)
107107
else:
108108
return None
109109
return _test_port_cache[(test_name, label)]
@@ -128,6 +128,8 @@ def _get_labelled_port(label=None, create=True):
128128
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
129129
"-w",
130130
lambda: str(_get_labelled_port("web")),
131+
"--n-process",
132+
"2",
131133
],
132134
worker_cmd_start
133135
+ [
@@ -147,19 +149,20 @@ def _reload_args(args):
147149
return [arg if not callable(arg) else arg() for arg in args]
148150

149151

150-
_rerun_errors = (_ProcessExitedException,) + (
152+
_rerun_errors = (
153+
_ProcessExitedException,
151154
asyncio.TimeoutError,
152155
futures.TimeoutError,
153156
TimeoutError,
154157
)
155158

156159

157-
@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
158160
@pytest.mark.parametrize(
159161
"supervisor_args,worker_args,use_web_addr",
160162
list(start_params.values()),
161163
ids=list(start_params.keys()),
162164
)
165+
@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
163166
def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
164167
new_isolation()
165168
sv_proc = w_procs = None

mars/oscar/backends/core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ async def _listen(self, client: Client):
6666
self._client_to_message_futures[client] = dict()
6767
for future in message_futures.values():
6868
future.set_exception(e)
69+
finally:
70+
await asyncio.sleep(0)
6971

7072
message_futures = self._client_to_message_futures.get(client)
7173
self._client_to_message_futures[client] = dict()

mars/oscar/backends/mars/tests/test_pool.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,11 @@ def clear_routers():
116116

117117

118118
@pytest.mark.asyncio
119+
@mock.patch("mars.oscar.backends.mars.pool.SubActorPool.notify_main_pool_to_create")
119120
@mock.patch("mars.oscar.backends.mars.pool.SubActorPool.notify_main_pool_to_destroy")
120-
async def test_sub_actor_pool(notify_main_pool):
121-
notify_main_pool.return_value = None
121+
async def test_sub_actor_pool(notify_main_pool_to_create, notify_main_pool_to_destroy):
122+
notify_main_pool_to_create.return_value = None
123+
notify_main_pool_to_destroy.return_value = None
122124
config = ActorPoolConfig()
123125

124126
ext_address0 = f"127.0.0.1:{get_next_port()}"
@@ -850,3 +852,34 @@ def get_logger_level(self):
850852
_Actor, allocate_strategy=strategy, address=pool.external_address
851853
)
852854
assert await ref.get_logger_level() == logging.DEBUG
855+
856+
857+
@pytest.mark.asyncio
858+
async def test_ref_sub_pool_actor():
859+
start_method = (
860+
os.environ.get("POOL_START_METHOD", "forkserver")
861+
if sys.platform != "win32"
862+
else None
863+
)
864+
pool = await create_actor_pool(
865+
"127.0.0.1",
866+
pool_cls=MainActorPool,
867+
n_process=1,
868+
subprocess_start_method=start_method,
869+
)
870+
871+
async with pool:
872+
ctx = get_context()
873+
ref1 = await ctx.create_actor(
874+
TestActor, address=pool.external_address, allocate_strategy=RandomSubPool()
875+
)
876+
sub_address = ref1.address
877+
ref2 = await ctx.create_actor(TestActor, address=sub_address)
878+
ref2_main = await ctx.actor_ref(ref2.uid, address=pool.external_address)
879+
assert ref2_main.address == sub_address
880+
881+
await ctx.destroy_actor(create_actor_ref(pool.external_address, ref2.uid))
882+
assert not await ctx.has_actor(
883+
create_actor_ref(pool.external_address, ref2.uid)
884+
)
885+
assert not await ctx.has_actor(create_actor_ref(sub_address, ref2.uid))

mars/oscar/backends/message.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class ControlMessageType(Enum):
5959
sync_config = 2
6060
get_config = 3
6161
wait_pool_recovered = 4
62+
add_sub_pool_actor = 5
6263

6364

6465
@dataslots
@@ -192,7 +193,14 @@ def message_type(self) -> MessageType:
192193

193194

194195
class CreateActorMessage(_MessageBase):
195-
__slots__ = "actor_cls", "actor_id", "args", "kwargs", "allocate_strategy"
196+
__slots__ = (
197+
"actor_cls",
198+
"actor_id",
199+
"args",
200+
"kwargs",
201+
"allocate_strategy",
202+
"from_main",
203+
)
196204

197205
def __init__(
198206
self,
@@ -202,6 +210,7 @@ def __init__(
202210
args: Tuple,
203211
kwargs: Dict,
204212
allocate_strategy,
213+
from_main: bool = False,
205214
protocol: int = None,
206215
message_trace: List[MessageTraceItem] = None,
207216
):
@@ -211,6 +220,7 @@ def __init__(
211220
self.args = args
212221
self.kwargs = kwargs
213222
self.allocate_strategy = allocate_strategy
223+
self.from_main = from_main
214224

215225
@classproperty
216226
@implements(_MessageBase.message_type)

mars/oscar/backends/pool.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import concurrent.futures as futures
1717
import itertools
1818
import logging
19+
import multiprocessing
1920
import os
2021
import threading
21-
import multiprocessing
2222
from abc import ABC, ABCMeta, abstractmethod
2323
from typing import Dict, List, Type, TypeVar, Coroutine, Callable, Union, Optional
2424

@@ -473,7 +473,7 @@ async def create_actor(self, message: CreateActorMessage) -> result_message_type
473473
async def has_actor(self, message: HasActorMessage) -> ResultMessage:
474474
result = ResultMessage(
475475
message.message_id,
476-
to_binary(message.actor_ref.uid) in self._actors,
476+
message.actor_ref.uid in self._actors,
477477
protocol=message.protocol,
478478
)
479479
return result
@@ -497,7 +497,7 @@ async def destroy_actor(self, message: DestroyActorMessage) -> result_message_ty
497497
@implements(AbstractActorPool.actor_ref)
498498
async def actor_ref(self, message: ActorRefMessage) -> result_message_type:
499499
with _ErrorProcessor(message.message_id, message.protocol) as processor:
500-
actor_id = to_binary(message.actor_ref.uid)
500+
actor_id = message.actor_ref.uid
501501
if actor_id not in self._actors:
502502
raise ActorNotExist(f"Actor {actor_id} does not exist")
503503
result = ResultMessage(
@@ -649,6 +649,22 @@ async def notify_main_pool_to_destroy(
649649
): # pragma: no cover
650650
await self.call(self._main_address, message)
651651

652+
async def notify_main_pool_to_create(self, message: CreateActorMessage):
653+
reg_message = ControlMessage(
654+
new_message_id(),
655+
self.external_address,
656+
ControlMessageType.add_sub_pool_actor,
657+
(self.external_address, message.allocate_strategy, message),
658+
)
659+
await self.call(self._main_address, reg_message)
660+
661+
@implements(AbstractActorPool.create_actor)
662+
async def create_actor(self, message: CreateActorMessage) -> result_message_type:
663+
result = await super().create_actor(message)
664+
if not message.from_main:
665+
await self.notify_main_pool_to_create(message)
666+
return result
667+
652668
@implements(AbstractActorPool.actor_ref)
653669
async def actor_ref(self, message: ActorRefMessage) -> result_message_type:
654670
result = await super().actor_ref(message)
@@ -775,6 +791,7 @@ async def create_actor(self, message: CreateActorMessage) -> result_message_type
775791
message.args,
776792
message.kwargs,
777793
allocate_strategy=new_allocate_strategy,
794+
from_main=True,
778795
protocol=message.protocol,
779796
message_trace=message.message_trace,
780797
)
@@ -952,6 +969,17 @@ async def handle_control_command(
952969
processor.result = ResultMessage(
953970
message.message_id, True, protocol=message.protocol
954971
)
972+
elif message.control_message_type == ControlMessageType.add_sub_pool_actor:
973+
address, allocate_strategy, create_message = message.content
974+
create_message.from_main = True
975+
ref = create_actor_ref(address, to_binary(create_message.actor_id))
976+
self._allocated_actors[address][ref] = (
977+
allocate_strategy,
978+
create_message,
979+
)
980+
processor.result = ResultMessage(
981+
message.message_id, True, protocol=message.protocol
982+
)
955983
else:
956984
processor.result = await self.call(message.address, message)
957985
return processor.result
@@ -1114,9 +1142,8 @@ def process_sub_pool_lost(self, address: str):
11141142
async def monitor_sub_pools(self):
11151143
try:
11161144
while not self._stopped.is_set():
1117-
for address in self.sub_processes:
1145+
for address, process in self.sub_processes.items():
11181146
try:
1119-
process = self.sub_processes[address]
11201147
recover_events_discovered = address in self._recover_events
11211148
if not await self.is_sub_pool_alive(
11221149
process

0 commit comments

Comments
 (0)