Skip to content

Commit 712d7f1

Browse files
wjsihekaisheng
andauthored
Fix supervisor n_process (#2950)
Co-authored-by: hekaisheng <[email protected]>
1 parent 4ae67fb commit 712d7f1

File tree

12 files changed

+115
-93
lines changed

12 files changed

+115
-93
lines changed

mars/core/graph/tests/test_graph.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import logging
1615
import pytest
1716

1817
from .... import tensor as mt
1918
from ....tests.core import flaky
19+
from ....utils import to_str
2020
from .. import DAG, GraphContainsCycleError
2121

2222

@@ -103,22 +103,5 @@ def test_to_dot():
103103
arr2 = arr + arr_add
104104
graph = arr2.build_graph(fuse_enabled=False, tile=True)
105105

106-
dot = str(graph.to_dot(trunc_key=5))
107-
try:
108-
assert all(str(n.key)[5] in dot for n in graph) is True
109-
except AssertionError:
110-
graph_reprs = []
111-
for n in graph:
112-
graph_reprs.append(
113-
f"{n.op.key} -> {[succ.key for succ in graph.successors(n)]}"
114-
)
115-
logging.error(
116-
"Unexpected error in test_to_dot.\ndot = %r\ngraph_repr = %r",
117-
dot,
118-
"\n".join(graph_reprs),
119-
)
120-
missing_prefix = next(n.key for n in graph if str(n.key)[5] not in dot)
121-
logging.error(
122-
"Missing prefix %r (type: %s)", missing_prefix, type(missing_prefix)
123-
)
124-
raise
106+
dot = to_str(graph.to_dot(trunc_key=5))
107+
assert all(to_str(n.key)[:5] in dot for n in graph) is True

mars/deploy/kubernetes/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import abc
1717
import functools
18+
import math
1819
import re
1920

2021
from ... import __version__ as mars_version
@@ -605,6 +606,8 @@ def build_container_command(self):
605606
cmd += ["-p", str(self._service_port)]
606607
if self._web_port:
607608
cmd += ["-w", str(self._web_port)]
609+
if self._cpu:
610+
cmd += ["--n-process", str(int(math.ceil(self._cpu)))]
608611
return cmd
609612

610613

mars/deploy/oscar/supervisor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def config_args(self, parser):
3838
super().config_args(parser)
3939
parser.add_argument("-w", "--web-port", help="web port of the service")
4040
parser.add_argument(
41-
"--n-process", help="number of supervisor processes", default="0"
41+
"--n-process", help="number of supervisor processes", default="1"
4242
)
4343

4444
def parse_args(self, parser, argv, environ=None):

mars/oscar/backends/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ class ProfilingContext:
5050

5151

5252
class MarsActorContext(BaseActorContext):
53-
__slots__ = "_address", "_caller"
53+
__slots__ = ("_caller",)
5454

5555
support_allocate_strategy = True
5656

5757
def __init__(self, address: str = None):
58-
self._address = address
58+
BaseActorContext.__init__(self, address)
5959
self._caller = ActorCaller()
6060

6161
def __del__(self):

mars/oscar/backends/mars/pool.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import logging.config
1818
import multiprocessing
1919
import os
20+
import random
2021
import signal
2122
import sys
23+
import uuid
2224
from dataclasses import dataclass
2325
from types import TracebackType
2426
from typing import List
@@ -164,6 +166,9 @@ def _start_sub_pool(
164166
):
165167
ensure_coverage()
166168

169+
# make sure enough randomness for every sub pool
170+
random.seed(uuid.uuid1().bytes)
171+
167172
conf = actor_config.get_pool_config(process_index)
168173
suspend_sigint = conf["suspend_sigint"]
169174
if suspend_sigint:

mars/oscar/backends/mars/tests/test_mars_actor_context.py

Lines changed: 66 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ def get_call_log(self):
242242

243243
@pytest.mark.parametrize(indirect=True)
244244
@pytest.fixture(params=[False, True])
245-
async def actor_pool_context(request):
245+
async def actor_pool(request):
246246
start_method = (
247247
os.environ.get("POOL_START_METHOD", "forkserver")
248248
if sys.platform != "win32"
@@ -266,9 +266,10 @@ async def actor_pool_context(request):
266266

267267

268268
@pytest.mark.asyncio
269-
async def test_simple_local_actor_pool(actor_pool_context):
270-
pool = actor_pool_context
271-
actor_ref = await mo.create_actor(DummyActor, 100, address=pool.external_address)
269+
async def test_simple_local_actor_pool(actor_pool):
270+
actor_ref = await mo.create_actor(
271+
DummyActor, 100, address=actor_pool.external_address
272+
)
272273
assert await actor_ref.add(1) == 101
273274
await actor_ref.add(1)
274275

@@ -279,17 +280,18 @@ async def test_simple_local_actor_pool(actor_pool_context):
279280
assert actor_ref.address == ref2.address
280281
assert actor_ref.uid == ref2.uid
281282

282-
ref = await mo.actor_ref(uid=actor_ref.uid, address=pool.external_address)
283+
ref = await mo.actor_ref(uid=actor_ref.uid, address=actor_pool.external_address)
283284
assert await ref.add(2) == 104
284285

285286

286287
@pytest.mark.asyncio
287-
async def test_mars_post_create_pre_destroy(actor_pool_context):
288-
pool = actor_pool_context
288+
async def test_mars_post_create_pre_destroy(actor_pool):
289289
rec_ref = await mo.create_actor(
290-
RecordActor, uid=RecordActor.default_uid(), address=pool.external_address
290+
RecordActor, uid=RecordActor.default_uid(), address=actor_pool.external_address
291+
)
292+
actor_ref = await mo.create_actor(
293+
CreateDestroyActor, address=actor_pool.external_address
291294
)
292-
actor_ref = await mo.create_actor(CreateDestroyActor, address=pool.external_address)
293295
await actor_ref.destroy()
294296

295297
records = await rec_ref.get_records()
@@ -299,72 +301,78 @@ async def test_mars_post_create_pre_destroy(actor_pool_context):
299301

300302

301303
@pytest.mark.asyncio
302-
async def test_mars_create_actor(actor_pool_context):
303-
pool = actor_pool_context
304-
actor_ref = await mo.create_actor(DummyActor, 1, address=pool.external_address)
304+
async def test_mars_create_actor(actor_pool):
305+
actor_ref = await mo.create_actor(
306+
DummyActor, 1, address=actor_pool.external_address
307+
)
305308
# create actor inside on_receive
306-
r = await actor_ref.create(DummyActor, 5, address=pool.external_address)
307-
ref = await mo.actor_ref(r, address=pool.external_address)
309+
r = await actor_ref.create(DummyActor, 5, address=actor_pool.external_address)
310+
ref = await mo.actor_ref(r, address=actor_pool.external_address)
308311
assert await ref.add(10) == 15
309312
# create actor inside on_receive and send message
310313
r = await actor_ref.create_send(
311-
DummyActor, 5, method="add", method_args=(1,), address=pool.external_address
314+
DummyActor,
315+
5,
316+
method="add",
317+
method_args=(1,),
318+
address=actor_pool.external_address,
312319
)
313320
assert r == 6
314321

315322

316323
@pytest.mark.asyncio
317-
async def test_mars_create_actor_error(actor_pool_context):
318-
pool = actor_pool_context
324+
async def test_mars_create_actor_error(actor_pool):
319325
ref1 = await mo.create_actor(
320-
DummyActor, 1, uid="dummy1", address=pool.external_address
326+
DummyActor, 1, uid="dummy1", address=actor_pool.external_address
321327
)
322328
with pytest.raises(mo.ActorAlreadyExist):
323329
await mo.create_actor(
324-
DummyActor, 1, uid="dummy1", address=pool.external_address
330+
DummyActor, 1, uid="dummy1", address=actor_pool.external_address
325331
)
326332
await mo.destroy_actor(ref1)
327333

328334
with pytest.raises(ValueError):
329-
await mo.create_actor(DummyActor, -1, address=pool.external_address)
330-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
335+
await mo.create_actor(DummyActor, -1, address=actor_pool.external_address)
336+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
331337
with pytest.raises(ValueError):
332-
await ref1.create(DummyActor, -2, address=pool.external_address)
338+
await ref1.create(DummyActor, -2, address=actor_pool.external_address)
333339

334340

335341
@pytest.mark.asyncio
336-
async def test_mars_send(actor_pool_context):
337-
pool = actor_pool_context
338-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
342+
async def test_mars_send(actor_pool):
343+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
339344
ref2 = await mo.actor_ref(
340-
await ref1.create(DummyActor, 2, address=pool.external_address)
345+
await ref1.create(DummyActor, 2, address=actor_pool.external_address)
341346
)
342347
assert await ref1.send(ref2, "add", 3) == 5
343348

344-
ref3 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
349+
ref3 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
345350
ref4 = await mo.create_actor(
346-
DummyActor, 2, address=pool.external_address, allocate_strategy=RandomSubPool()
351+
DummyActor,
352+
2,
353+
address=actor_pool.external_address,
354+
allocate_strategy=RandomSubPool(),
347355
)
348356
assert await ref4.send(ref3, "add", 3) == 4
349357

350358

351359
@pytest.mark.asyncio
352-
async def test_mars_send_error(actor_pool_context):
353-
pool = actor_pool_context
354-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
360+
async def test_mars_send_error(actor_pool):
361+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
355362
with pytest.raises(TypeError):
356363
await ref1.add(1.0)
357-
ref2 = await mo.create_actor(DummyActor, 2, address=pool.external_address)
364+
ref2 = await mo.create_actor(DummyActor, 2, address=actor_pool.external_address)
358365
with pytest.raises(TypeError):
359366
await ref1.send(ref2, "add", 1.0)
360367
with pytest.raises(mo.ActorNotExist):
361-
await (await mo.actor_ref("fake_uid", address=pool.external_address)).add(1)
368+
await (await mo.actor_ref("fake_uid", address=actor_pool.external_address)).add(
369+
1
370+
)
362371

363372

364373
@pytest.mark.asyncio
365-
async def test_mars_tell(actor_pool_context):
366-
pool = actor_pool_context
367-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
374+
async def test_mars_tell(actor_pool):
375+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
368376
ref2 = await mo.actor_ref(await ref1.create(DummyActor, 2))
369377
await ref1.tell(ref2, "add", 3)
370378
assert await ref2.get_value() == 5
@@ -382,9 +390,8 @@ async def test_mars_tell(actor_pool_context):
382390

383391

384392
@pytest.mark.asyncio
385-
async def test_mars_batch_method(actor_pool_context):
386-
pool = actor_pool_context
387-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
393+
async def test_mars_batch_method(actor_pool):
394+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
388395
batch_result = await ref1.add_ret.batch(
389396
ref1.add_ret.delay(1), ref1.add_ret.delay(2), ref1.add_ret.delay(3)
390397
)
@@ -401,11 +408,10 @@ async def test_mars_batch_method(actor_pool_context):
401408

402409

403410
@pytest.mark.asyncio
404-
async def test_gather_exception(actor_pool_context):
411+
async def test_gather_exception(actor_pool):
405412
try:
406413
Router.get_instance_or_empty()._cache.clear()
407-
pool = actor_pool_context
408-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
414+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
409415
router = Router.get_instance_or_empty()
410416
client = next(iter(router._cache.values()))
411417

@@ -439,9 +445,8 @@ class MyException(Exception):
439445

440446

441447
@pytest.mark.asyncio
442-
async def test_mars_destroy_has_actor(actor_pool_context):
443-
pool = actor_pool_context
444-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
448+
async def test_mars_destroy_has_actor(actor_pool):
449+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
445450
ref2 = await mo.actor_ref(ref1)
446451
ref2_add_method = ref2.add
447452
assert isinstance(ref1, ActorRef)
@@ -465,7 +470,7 @@ async def test_mars_destroy_has_actor(actor_pool_context):
465470
await ref2_add_method(1)
466471

467472
ref1 = await mo.create_actor(
468-
DummyActor, 1, uid=ref1.uid, address=pool.external_address
473+
DummyActor, 1, uid=ref1.uid, address=actor_pool.external_address
469474
)
470475

471476
# the ref2 should be works after actor is recreated.
@@ -479,8 +484,8 @@ async def test_mars_destroy_has_actor(actor_pool_context):
479484
assert not await mo.has_actor(ref1)
480485
assert not await mo.has_actor(ref2)
481486

482-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
483-
ref2 = await ref1.create(DummyActor, 2, address=pool.external_address)
487+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
488+
ref2 = await ref1.create(DummyActor, 2, address=actor_pool.external_address)
484489

485490
assert await mo.has_actor(ref2)
486491

@@ -489,23 +494,24 @@ async def test_mars_destroy_has_actor(actor_pool_context):
489494

490495
with pytest.raises(mo.ActorNotExist):
491496
await mo.destroy_actor(
492-
await mo.actor_ref("fake_uid", address=pool.external_address)
497+
await mo.actor_ref("fake_uid", address=actor_pool.external_address)
493498
)
494499

495-
ref1 = await mo.create_actor(DummyActor, 1, address=pool.external_address)
500+
ref1 = await mo.create_actor(DummyActor, 1, address=actor_pool.external_address)
496501
with pytest.raises(mo.ActorNotExist):
497-
await ref1.delete(await mo.actor_ref("fake_uid", address=pool.external_address))
502+
await ref1.delete(
503+
await mo.actor_ref("fake_uid", address=actor_pool.external_address)
504+
)
498505

499506
# test self destroy
500-
ref1 = await mo.create_actor(DummyActor, 2, address=pool.external_address)
507+
ref1 = await mo.create_actor(DummyActor, 2, address=actor_pool.external_address)
501508
await ref1.destroy()
502509
assert not await mo.has_actor(ref1)
503510

504511

505512
@pytest.mark.asyncio
506-
async def test_mars_resource_lock(actor_pool_context):
507-
pool = actor_pool_context
508-
ref = await mo.create_actor(ResourceLockActor, address=pool.external_address)
513+
async def test_mars_resource_lock(actor_pool):
514+
ref = await mo.create_actor(ResourceLockActor, address=actor_pool.external_address)
509515
event_list = []
510516

511517
async def test_task(idx):
@@ -525,13 +531,12 @@ async def test_task(idx):
525531

526532

527533
@pytest.mark.asyncio
528-
async def test_promise_chain(actor_pool_context):
529-
pool = actor_pool_context
534+
async def test_promise_chain(actor_pool):
530535
lock_ref = await mo.create_actor(
531-
ResourceLockActor, 2, address=pool.external_address
536+
ResourceLockActor, 2, address=actor_pool.external_address
532537
)
533538
promise_test_ref = await mo.create_actor(
534-
PromiseTestActor, lock_ref, address=pool.external_address
539+
PromiseTestActor, lock_ref, address=actor_pool.external_address
535540
)
536541

537542
delay_val = 1.0
@@ -609,8 +614,8 @@ async def __pre_destroy__(self):
609614

610615
@pytest.mark.asyncio
611616
@pytest.mark.parametrize("in_sub_pool", [True, False])
612-
async def test_error_in_pre_destroy(actor_pool_context, in_sub_pool):
613-
pool = actor_pool_context
617+
async def test_error_in_pre_destroy(actor_pool, in_sub_pool):
618+
pool = actor_pool
614619

615620
strategy = None if not in_sub_pool else RandomSubPool()
616621
a = await mo.create_actor(

mars/oscar/backends/pool.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -724,8 +724,14 @@ async def create_actor(self, message: CreateActorMessage) -> ResultMessageType:
724724
async def actor_ref(self, message: ActorRefMessage) -> ResultMessageType:
725725
result = await super().actor_ref(message)
726726
if isinstance(result, ErrorMessage):
727-
message.actor_ref.address = self._main_address
728-
result = await self.call(self._main_address, message)
727+
# need a new message id to call main actor
728+
main_message = ActorRefMessage(
729+
new_message_id(),
730+
create_actor_ref(self._main_address, message.actor_ref.uid),
731+
)
732+
result = await self.call(self._main_address, main_message)
733+
# rewrite to message_id of the original request
734+
result.message_id = message.message_id
729735
return result
730736

731737
@implements(AbstractActorPool.destroy_actor)

mars/oscar/context.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
cdef class BaseActorContext:
17-
pass
17+
cdef public str _address
1818

1919

2020
cpdef get_context()

0 commit comments

Comments
 (0)