Skip to content

Commit 211c7eb

Browse files
authored
Merge pull request #521 from minrk/public-controller-engines
make launchers public attributes
2 parents e3401a3 + 7924878 commit 211c7eb

File tree

3 files changed

+57
-57
lines changed

3 files changed

+57
-57
lines changed

ipyparallel/cluster/app.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,12 @@ def start(self):
187187
):
188188
profile = abbreviate_profile_dir(cluster.profile_dir)
189189
cluster_id = cluster.cluster_id
190-
running = bool(cluster._controller)
190+
running = bool(cluster.controller)
191191
# TODO: URL?
192192
engines = 0
193-
if cluster._engine_sets:
193+
if cluster.engines:
194194
engines = sum(
195-
engine_set.n for engine_set in cluster._engine_sets.values()
195+
engine_set.n for engine_set in cluster.engines.values()
196196
)
197197

198198
launcher = cluster.engine_launcher_class.__name__
@@ -351,7 +351,7 @@ async def start_engines(self):
351351
def watch_engines(self):
352352
"""Watch for early engine shutdown"""
353353
# FIXME: public API to get launcher instances?
354-
self.engine_launcher = next(iter(self.cluster._engine_sets.values()))
354+
self.engine_launcher = next(iter(self.cluster.engines.values()))
355355

356356
if not self.early_shutdown:
357357
self.engine_launcher.on_stop(self.engines_stopped)
@@ -477,7 +477,7 @@ async def start_cluster(self):
477477
f"Leaving cluster running: {self.cluster.cluster_file}", file=sys.stderr
478478
)
479479
self.loop.add_callback(self.loop.stop)
480-
self.cluster._controller.on_stop(self.controller_stopped)
480+
self.cluster.controller.on_stop(self.controller_stopped)
481481
self.watch_engines()
482482

483483
def controller_stopped(self, stop_data):

ipyparallel/cluster/cluster.py

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def _atexit_cleanup_clusters(*args):
5454
if not cluster.shutdown_atexit:
5555
# overridden after register
5656
continue
57-
if cluster._controller or cluster._engine_sets:
57+
if cluster.controller or cluster.engines:
5858
print(f"Stopping cluster {cluster}", file=sys.stderr)
5959
cluster.stop_cluster_sync()
6060

@@ -263,8 +263,8 @@ def _default_log(self):
263263
return traitlets.log.get_logger()
264264

265265
# private state
266-
_controller = Any()
267-
_engine_sets = Dict()
266+
controller = Any()
267+
engines = Dict()
268268

269269
def __init__(self, **kwargs):
270270
"""Construct a Cluster"""
@@ -275,7 +275,7 @@ def __init__(self, **kwargs):
275275
def __del__(self):
276276
if not self.shutdown_atexit:
277277
return
278-
if self._controller or self._engine_sets:
278+
if self.controller or self.engines:
279279
self.stop_cluster_sync()
280280

281281
def __repr__(self):
@@ -295,10 +295,10 @@ def __repr__(self):
295295
profile_dir = "~" + profile_dir[len(home_dir) :]
296296
fields["profile_dir"] = repr(profile_dir)
297297

298-
if self._controller:
298+
if self.controller:
299299
fields["controller"] = "<running>"
300-
if self._engine_sets:
301-
fields["engine_sets"] = list(self._engine_sets)
300+
if self.engines:
301+
fields["engine_sets"] = list(self.engines)
302302

303303
fields_str = ', '.join(f"{key}={value}" for key, value in fields.items())
304304

@@ -316,20 +316,20 @@ def _cls_str(cls):
316316

317317
cluster_info["class"] = _cls_str(self.__class__)
318318

319-
if self._controller:
319+
if self.controller:
320320
d["controller"] = {
321321
"class": _cls_str(self.controller_launcher_class),
322322
"state": None,
323323
}
324-
if self._controller:
325-
d["controller"]["state"] = self._controller.to_dict()
324+
if self.controller:
325+
d["controller"]["state"] = self.controller.to_dict()
326326

327327
d["engines"] = {
328328
"class": _cls_str(self.engine_launcher_class),
329329
"sets": {},
330330
}
331331
sets = d["engines"]["sets"]
332-
for engine_set_id, engine_launcher in self._engine_sets.items():
332+
for engine_set_id, engine_launcher in self.engines.items():
333333
sets[engine_set_id] = engine_launcher.to_dict()
334334
return d
335335

@@ -358,13 +358,13 @@ def from_dict(cls, d, **kwargs):
358358
controller_info = d["controller"]
359359
cls = self.controller_launcher_class = import_item(controller_info["class"])
360360
if controller_info["state"]:
361-
self._controller = cls.from_dict(controller_info["state"], parent=self)
361+
self.controller = cls.from_dict(controller_info["state"], parent=self)
362362

363363
engine_info = d.get("engines")
364364
if engine_info:
365365
cls = self.engine_launcher_class = import_item(engine_info["class"])
366366
for engine_set_id, engine_state in engine_info.get("sets", {}).items():
367-
self._engine_sets[engine_set_id] = cls.from_dict(
367+
self.engines[engine_set_id] = cls.from_dict(
368368
engine_state,
369369
engine_set_id=engine_set_id,
370370
parent=self,
@@ -431,7 +431,7 @@ def update_cluster_file(self):
431431
# setting cluster_file='' disables saving to disk
432432
return
433433

434-
if not self._controller and not self._engine_sets:
434+
if not self.controller and not self.engines:
435435
self.remove_cluster_file()
436436
else:
437437
self.write_cluster_file()
@@ -444,17 +444,17 @@ async def start_controller(self, **kwargs):
444444
# start controller
445445
# retrieve connection info
446446
# webhook?
447-
if self._controller is not None:
447+
if self.controller is not None:
448448
raise RuntimeError(
449-
"controller is already running. Call stop_controller() first."
449+
"controller is already running. Call stopcontroller() first."
450450
)
451451

452452
if self.shutdown_atexit:
453453
_atexit_clusters.add(self)
454454
if not _atexit_cleanup_clusters.registered:
455455
atexit.register(_atexit_cleanup_clusters)
456456

457-
self._controller = controller = self.controller_launcher_class(
457+
self.controller = controller = self.controller_launcher_class(
458458
work_dir=u'.',
459459
parent=self,
460460
log=self.log,
@@ -486,10 +486,10 @@ def add_args(args):
486486

487487
if controller_args is not None:
488488
# ensure we trigger trait observers after we are done
489-
self._controller.controller_args = list(controller_args)
489+
self.controller.controller_args = list(controller_args)
490490

491-
self._controller.on_stop(self._controller_stopped)
492-
r = self._controller.start()
491+
self.controller.on_stop(self._controller_stopped)
492+
r = self.controller.start()
493493
if inspect.isawaitable(r):
494494
await r
495495

@@ -507,7 +507,7 @@ async def start_engines(self, n=None, engine_set_id=None, **kwargs):
507507
# TODO: send engines connection info
508508
if engine_set_id is None:
509509
engine_set_id = f"{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}"
510-
engine_set = self._engine_sets[engine_set_id] = self.engine_launcher_class(
510+
engine_set = self.engines[engine_set_id] = self.engine_launcher_class(
511511
work_dir=u'.',
512512
parent=self,
513513
log=self.log,
@@ -550,17 +550,17 @@ async def stop_engines(self, engine_set_id=None):
550550
all engines are stopped.
551551
"""
552552
if engine_set_id is None:
553-
for engine_set_id in list(self._engine_sets):
553+
for engine_set_id in list(self.engines):
554554
await self.stop_engines(engine_set_id)
555555
return
556556
self.log.info(f"Stopping engine(s): {engine_set_id}")
557-
engine_set = self._engine_sets[engine_set_id]
557+
engine_set = self.engines[engine_set_id]
558558
r = engine_set.stop()
559559
if inspect.isawaitable(r):
560560
await r
561561
# retrieve and cleanup output files
562562
engine_set.get_output(remove=True)
563-
self._engine_sets.pop(engine_set_id)
563+
self.engines.pop(engine_set_id)
564564
self.update_cluster_file()
565565

566566
async def stop_engine(self, engine_id):
@@ -574,10 +574,10 @@ async def stop_engine(self, engine_id):
574574
async def restart_engines(self, engine_set_id=None):
575575
"""Restart an engine set"""
576576
if engine_set_id is None:
577-
for engine_set_id in list(self._engine_sets):
577+
for engine_set_id in list(self.engines):
578578
await self.restart_engines(engine_set_id)
579579
return
580-
engine_set = self._engine_sets[engine_set_id]
580+
engine_set = self.engines[engine_set_id]
581581
n = engine_set.n
582582
await self.stop_engines(engine_set_id)
583583
await self.start_engines(n, engine_set_id)
@@ -604,27 +604,27 @@ async def signal_engines(self, signum, engine_set_id=None):
604604
If no engine set is specified, signal all engine sets.
605605
"""
606606
if engine_set_id is None:
607-
for engine_set_id in list(self._engine_sets):
607+
for engine_set_id in list(self.engines):
608608
await self.signal_engines(signum, engine_set_id)
609609
return
610610
self.log.info(f"Sending signal {signum} to engine(s) {engine_set_id}")
611-
engine_set = self._engine_sets[engine_set_id]
611+
engine_set = self.engines[engine_set_id]
612612
r = engine_set.signal(signum)
613613
if inspect.isawaitable(r):
614614
await r
615615

616616
async def stop_controller(self):
617617
"""Stop the controller"""
618-
if self._controller and self._controller.running:
618+
if self.controller and self.controller.running:
619619
self.log.info("Stopping controller")
620-
r = self._controller.stop()
620+
r = self.controller.stop()
621621
if inspect.isawaitable(r):
622622
await r
623623

624-
if self._controller:
625-
self._controller.get_output(remove=True)
624+
if self.controller:
625+
self.controller.get_output(remove=True)
626626

627-
self._controller = None
627+
self.controller = None
628628
self.update_cluster_file()
629629

630630
async def stop_cluster(self):
@@ -638,7 +638,7 @@ async def connect_client(self, **client_kwargs):
638638
# this assumes local files exist
639639
from ipyparallel import Client
640640

641-
connection_info = self._controller.get_connection_info()
641+
connection_info = self.controller.get_connection_info()
642642
if inspect.isawaitable(connection_info):
643643
connection_info = await connection_info
644644

ipyparallel/tests/test_cluster.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ async def test_start_stop_controller(Cluster):
6464
with pytest.raises(RuntimeError):
6565
await cluster.start_controller()
6666
assert cluster.config is not None
67-
assert cluster._controller.config is cluster.config
68-
assert cluster._controller is not None
69-
proc = cluster._controller.process
67+
assert cluster.controller.config is cluster.config
68+
assert cluster.controller is not None
69+
proc = cluster.controller.process
7070
assert proc.is_running()
7171
with await cluster.connect_client() as rc:
7272
assert rc.queue_status() == {'unassigned': 0}
7373

7474
await cluster.stop_controller()
7575
proc.wait(timeout=3)
76-
assert cluster._controller is None
76+
assert cluster.controller is None
7777
# stop is idempotent
7878
await cluster.stop_controller()
7979
# TODO: test file cleanup
@@ -85,16 +85,16 @@ async def test_start_stop_engines(Cluster, engine_launcher_class):
8585

8686
n = 2
8787
engine_set_id = await cluster.start_engines(n)
88-
assert engine_set_id in cluster._engine_sets
89-
engine_set = cluster._engine_sets[engine_set_id]
88+
assert engine_set_id in cluster.engines
89+
engine_set = cluster.engines[engine_set_id]
9090
launcher_class = find_launcher_class(engine_launcher_class, "EngineSet")
9191
assert isinstance(engine_set, launcher_class)
9292

9393
with await cluster.connect_client() as rc:
9494
rc.wait_for_engines(n, timeout=_timeout)
9595

9696
await cluster.stop_engines(engine_set_id)
97-
assert cluster._engine_sets == {}
97+
assert cluster.engines == {}
9898
with pytest.raises(KeyError):
9999
await cluster.stop_engines(engine_set_id)
100100

@@ -105,15 +105,15 @@ async def test_start_stop_cluster(Cluster, engine_launcher_class):
105105
n = 2
106106
cluster = Cluster(engine_launcher_class=engine_launcher_class, n=n)
107107
await cluster.start_cluster()
108-
controller = cluster._controller
108+
controller = cluster.controller
109109
assert controller is not None
110-
assert len(cluster._engine_sets) == 1
110+
assert len(cluster.engines) == 1
111111

112112
with await cluster.connect_client() as rc:
113113
rc.wait_for_engines(n, timeout=_timeout)
114114
await cluster.stop_cluster()
115-
assert cluster._controller is None
116-
assert cluster._engine_sets == {}
115+
assert cluster.controller is None
116+
assert cluster.engines == {}
117117

118118

119119
@pytest.mark.skipif(
@@ -155,8 +155,8 @@ async def test_restart_engines(Cluster, engine_launcher_class):
155155
n = 2
156156
async with Cluster(engine_launcher_class=engine_launcher_class, n=n) as rc:
157157
cluster = rc.cluster
158-
engine_set_id = next(iter(cluster._engine_sets))
159-
engine_set = cluster._engine_sets[engine_set_id]
158+
engine_set_id = next(iter(cluster.engines))
159+
engine_set = cluster.engines[engine_set_id]
160160
assert rc.ids[:n] == list(range(n))
161161
before_pids = rc[:].apply_sync(os.getpid)
162162
await cluster.restart_engines()
@@ -240,12 +240,12 @@ async def test_to_from_dict(Cluster, engine_launcher_class):
240240
d = cluster.to_dict()
241241
cluster2 = ipp.Cluster.from_dict(d)
242242
assert not cluster2.shutdown_atexit
243-
assert cluster2._controller is not None
244-
assert cluster2._controller.process.pid == cluster._controller.process.pid
245-
assert list(cluster2._engine_sets) == list(cluster._engine_sets)
243+
assert cluster2.controller is not None
244+
assert cluster2.controller.process.pid == cluster.controller.process.pid
245+
assert list(cluster2.engines) == list(cluster.engines)
246246

247-
es1 = next(iter(cluster._engine_sets.values()))
248-
es2 = next(iter(cluster2._engine_sets.values()))
247+
es1 = next(iter(cluster.engines.values()))
248+
es2 = next(iter(cluster2.engines.values()))
249249
# ensure responsive
250250
rc[:].apply_async(lambda: None).get(timeout=_timeout)
251251
if not sys.platform.startswith("win"):

0 commit comments

Comments
 (0)