Skip to content

Commit 7924878

Browse files
committed
make launchers public attributes
exposes methods like `get_output()` etc. via public mechanism
1 parent 83e37b4 commit 7924878

File tree

3 files changed

+57
-57
lines changed

3 files changed

+57
-57
lines changed

ipyparallel/cluster/app.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,12 @@ def start(self):
187187
):
188188
profile = abbreviate_profile_dir(cluster.profile_dir)
189189
cluster_id = cluster.cluster_id
190-
running = bool(cluster._controller)
190+
running = bool(cluster.controller)
191191
# TODO: URL?
192192
engines = 0
193-
if cluster._engine_sets:
193+
if cluster.engines:
194194
engines = sum(
195-
engine_set.n for engine_set in cluster._engine_sets.values()
195+
engine_set.n for engine_set in cluster.engines.values()
196196
)
197197

198198
launcher = cluster.engine_launcher_class.__name__
@@ -351,7 +351,7 @@ async def start_engines(self):
351351
def watch_engines(self):
352352
"""Watch for early engine shutdown"""
353353
# FIXME: public API to get launcher instances?
354-
self.engine_launcher = next(iter(self.cluster._engine_sets.values()))
354+
self.engine_launcher = next(iter(self.cluster.engines.values()))
355355

356356
if not self.early_shutdown:
357357
self.engine_launcher.on_stop(self.engines_stopped)
@@ -477,7 +477,7 @@ async def start_cluster(self):
477477
f"Leaving cluster running: {self.cluster.cluster_file}", file=sys.stderr
478478
)
479479
self.loop.add_callback(self.loop.stop)
480-
self.cluster._controller.on_stop(self.controller_stopped)
480+
self.cluster.controller.on_stop(self.controller_stopped)
481481
self.watch_engines()
482482

483483
def controller_stopped(self, stop_data):

ipyparallel/cluster/cluster.py

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def _atexit_cleanup_clusters(*args):
5353
if not cluster.shutdown_atexit:
5454
# overridden after register
5555
continue
56-
if cluster._controller or cluster._engine_sets:
56+
if cluster.controller or cluster.engines:
5757
print(f"Stopping cluster {cluster}", file=sys.stderr)
5858
cluster.stop_cluster_sync()
5959

@@ -261,8 +261,8 @@ def _default_log(self):
261261
return traitlets.log.get_logger()
262262

263263
# private state
264-
_controller = Any()
265-
_engine_sets = Dict()
264+
controller = Any()
265+
engines = Dict()
266266

267267
def __init__(self, **kwargs):
268268
"""Construct a Cluster"""
@@ -273,7 +273,7 @@ def __init__(self, **kwargs):
273273
def __del__(self):
274274
if not self.shutdown_atexit:
275275
return
276-
if self._controller or self._engine_sets:
276+
if self.controller or self.engines:
277277
self.stop_cluster_sync()
278278

279279
def __repr__(self):
@@ -293,10 +293,10 @@ def __repr__(self):
293293
profile_dir = "~" + profile_dir[len(home_dir) :]
294294
fields["profile_dir"] = repr(profile_dir)
295295

296-
if self._controller:
296+
if self.controller:
297297
fields["controller"] = "<running>"
298-
if self._engine_sets:
299-
fields["engine_sets"] = list(self._engine_sets)
298+
if self.engines:
299+
fields["engine_sets"] = list(self.engines)
300300

301301
fields_str = ', '.join(f"{key}={value}" for key, value in fields.items())
302302

@@ -314,20 +314,20 @@ def _cls_str(cls):
314314

315315
cluster_info["class"] = _cls_str(self.__class__)
316316

317-
if self._controller:
317+
if self.controller:
318318
d["controller"] = {
319319
"class": _cls_str(self.controller_launcher_class),
320320
"state": None,
321321
}
322-
if self._controller:
323-
d["controller"]["state"] = self._controller.to_dict()
322+
if self.controller:
323+
d["controller"]["state"] = self.controller.to_dict()
324324

325325
d["engines"] = {
326326
"class": _cls_str(self.engine_launcher_class),
327327
"sets": {},
328328
}
329329
sets = d["engines"]["sets"]
330-
for engine_set_id, engine_launcher in self._engine_sets.items():
330+
for engine_set_id, engine_launcher in self.engines.items():
331331
sets[engine_set_id] = engine_launcher.to_dict()
332332
return d
333333

@@ -356,13 +356,13 @@ def from_dict(cls, d, **kwargs):
356356
controller_info = d["controller"]
357357
cls = self.controller_launcher_class = import_item(controller_info["class"])
358358
if controller_info["state"]:
359-
self._controller = cls.from_dict(controller_info["state"], parent=self)
359+
self.controller = cls.from_dict(controller_info["state"], parent=self)
360360

361361
engine_info = d.get("engines")
362362
if engine_info:
363363
cls = self.engine_launcher_class = import_item(engine_info["class"])
364364
for engine_set_id, engine_state in engine_info.get("sets", {}).items():
365-
self._engine_sets[engine_set_id] = cls.from_dict(
365+
self.engines[engine_set_id] = cls.from_dict(
366366
engine_state,
367367
engine_set_id=engine_set_id,
368368
parent=self,
@@ -429,7 +429,7 @@ def update_cluster_file(self):
429429
# setting cluster_file='' disables saving to disk
430430
return
431431

432-
if not self._controller and not self._engine_sets:
432+
if not self.controller and not self.engines:
433433
self.remove_cluster_file()
434434
else:
435435
self.write_cluster_file()
@@ -442,17 +442,17 @@ async def start_controller(self, **kwargs):
442442
# start controller
443443
# retrieve connection info
444444
# webhook?
445-
if self._controller is not None:
445+
if self.controller is not None:
446446
raise RuntimeError(
447-
"controller is already running. Call stop_controller() first."
447+
"controller is already running. Call stopcontroller() first."
448448
)
449449

450450
if self.shutdown_atexit:
451451
_atexit_clusters.add(self)
452452
if not _atexit_cleanup_clusters.registered:
453453
atexit.register(_atexit_cleanup_clusters)
454454

455-
self._controller = controller = self.controller_launcher_class(
455+
self.controller = controller = self.controller_launcher_class(
456456
work_dir=u'.',
457457
parent=self,
458458
log=self.log,
@@ -484,10 +484,10 @@ def add_args(args):
484484

485485
if controller_args is not None:
486486
# ensure we trigger trait observers after we are done
487-
self._controller.controller_args = list(controller_args)
487+
self.controller.controller_args = list(controller_args)
488488

489-
self._controller.on_stop(self._controller_stopped)
490-
r = self._controller.start()
489+
self.controller.on_stop(self._controller_stopped)
490+
r = self.controller.start()
491491
if inspect.isawaitable(r):
492492
await r
493493

@@ -505,7 +505,7 @@ async def start_engines(self, n=None, engine_set_id=None, **kwargs):
505505
# TODO: send engines connection info
506506
if engine_set_id is None:
507507
engine_set_id = f"{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}"
508-
engine_set = self._engine_sets[engine_set_id] = self.engine_launcher_class(
508+
engine_set = self.engines[engine_set_id] = self.engine_launcher_class(
509509
work_dir=u'.',
510510
parent=self,
511511
log=self.log,
@@ -548,17 +548,17 @@ async def stop_engines(self, engine_set_id=None):
548548
all engines are stopped.
549549
"""
550550
if engine_set_id is None:
551-
for engine_set_id in list(self._engine_sets):
551+
for engine_set_id in list(self.engines):
552552
await self.stop_engines(engine_set_id)
553553
return
554554
self.log.info(f"Stopping engine(s): {engine_set_id}")
555-
engine_set = self._engine_sets[engine_set_id]
555+
engine_set = self.engines[engine_set_id]
556556
r = engine_set.stop()
557557
if inspect.isawaitable(r):
558558
await r
559559
# retrieve and cleanup output files
560560
engine_set.get_output(remove=True)
561-
self._engine_sets.pop(engine_set_id)
561+
self.engines.pop(engine_set_id)
562562
self.update_cluster_file()
563563

564564
async def stop_engine(self, engine_id):
@@ -572,10 +572,10 @@ async def stop_engine(self, engine_id):
572572
async def restart_engines(self, engine_set_id=None):
573573
"""Restart an engine set"""
574574
if engine_set_id is None:
575-
for engine_set_id in list(self._engine_sets):
575+
for engine_set_id in list(self.engines):
576576
await self.restart_engines(engine_set_id)
577577
return
578-
engine_set = self._engine_sets[engine_set_id]
578+
engine_set = self.engines[engine_set_id]
579579
n = engine_set.n
580580
await self.stop_engines(engine_set_id)
581581
await self.start_engines(n, engine_set_id)
@@ -602,27 +602,27 @@ async def signal_engines(self, signum, engine_set_id=None):
602602
If no engine set is specified, signal all engine sets.
603603
"""
604604
if engine_set_id is None:
605-
for engine_set_id in list(self._engine_sets):
605+
for engine_set_id in list(self.engines):
606606
await self.signal_engines(signum, engine_set_id)
607607
return
608608
self.log.info(f"Sending signal {signum} to engine(s) {engine_set_id}")
609-
engine_set = self._engine_sets[engine_set_id]
609+
engine_set = self.engines[engine_set_id]
610610
r = engine_set.signal(signum)
611611
if inspect.isawaitable(r):
612612
await r
613613

614614
async def stop_controller(self):
615615
"""Stop the controller"""
616-
if self._controller and self._controller.running:
616+
if self.controller and self.controller.running:
617617
self.log.info("Stopping controller")
618-
r = self._controller.stop()
618+
r = self.controller.stop()
619619
if inspect.isawaitable(r):
620620
await r
621621

622-
if self._controller:
623-
self._controller.get_output(remove=True)
622+
if self.controller:
623+
self.controller.get_output(remove=True)
624624

625-
self._controller = None
625+
self.controller = None
626626
self.update_cluster_file()
627627

628628
async def stop_cluster(self):
@@ -636,7 +636,7 @@ async def connect_client(self, **client_kwargs):
636636
# this assumes local files exist
637637
from ipyparallel import Client
638638

639-
connection_info = self._controller.get_connection_info()
639+
connection_info = self.controller.get_connection_info()
640640
if inspect.isawaitable(connection_info):
641641
connection_info = await connection_info
642642

ipyparallel/tests/test_cluster.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ async def test_start_stop_controller(Cluster):
6464
with pytest.raises(RuntimeError):
6565
await cluster.start_controller()
6666
assert cluster.config is not None
67-
assert cluster._controller.config is cluster.config
68-
assert cluster._controller is not None
69-
proc = cluster._controller.process
67+
assert cluster.controller.config is cluster.config
68+
assert cluster.controller is not None
69+
proc = cluster.controller.process
7070
assert proc.is_running()
7171
with await cluster.connect_client() as rc:
7272
assert rc.queue_status() == {'unassigned': 0}
7373

7474
await cluster.stop_controller()
7575
proc.wait(timeout=3)
76-
assert cluster._controller is None
76+
assert cluster.controller is None
7777
# stop is idempotent
7878
await cluster.stop_controller()
7979
# TODO: test file cleanup
@@ -85,16 +85,16 @@ async def test_start_stop_engines(Cluster, engine_launcher_class):
8585

8686
n = 2
8787
engine_set_id = await cluster.start_engines(n)
88-
assert engine_set_id in cluster._engine_sets
89-
engine_set = cluster._engine_sets[engine_set_id]
88+
assert engine_set_id in cluster.engines
89+
engine_set = cluster.engines[engine_set_id]
9090
launcher_class = find_launcher_class(engine_launcher_class, "EngineSet")
9191
assert isinstance(engine_set, launcher_class)
9292

9393
with await cluster.connect_client() as rc:
9494
rc.wait_for_engines(n, timeout=_timeout)
9595

9696
await cluster.stop_engines(engine_set_id)
97-
assert cluster._engine_sets == {}
97+
assert cluster.engines == {}
9898
with pytest.raises(KeyError):
9999
await cluster.stop_engines(engine_set_id)
100100

@@ -105,15 +105,15 @@ async def test_start_stop_cluster(Cluster, engine_launcher_class):
105105
n = 2
106106
cluster = Cluster(engine_launcher_class=engine_launcher_class, n=n)
107107
await cluster.start_cluster()
108-
controller = cluster._controller
108+
controller = cluster.controller
109109
assert controller is not None
110-
assert len(cluster._engine_sets) == 1
110+
assert len(cluster.engines) == 1
111111

112112
with await cluster.connect_client() as rc:
113113
rc.wait_for_engines(n, timeout=_timeout)
114114
await cluster.stop_cluster()
115-
assert cluster._controller is None
116-
assert cluster._engine_sets == {}
115+
assert cluster.controller is None
116+
assert cluster.engines == {}
117117

118118

119119
@pytest.mark.skipif(
@@ -155,8 +155,8 @@ async def test_restart_engines(Cluster, engine_launcher_class):
155155
n = 2
156156
async with Cluster(engine_launcher_class=engine_launcher_class, n=n) as rc:
157157
cluster = rc.cluster
158-
engine_set_id = next(iter(cluster._engine_sets))
159-
engine_set = cluster._engine_sets[engine_set_id]
158+
engine_set_id = next(iter(cluster.engines))
159+
engine_set = cluster.engines[engine_set_id]
160160
assert rc.ids[:n] == list(range(n))
161161
before_pids = rc[:].apply_sync(os.getpid)
162162
await cluster.restart_engines()
@@ -240,12 +240,12 @@ async def test_to_from_dict(Cluster, engine_launcher_class):
240240
d = cluster.to_dict()
241241
cluster2 = ipp.Cluster.from_dict(d)
242242
assert not cluster2.shutdown_atexit
243-
assert cluster2._controller is not None
244-
assert cluster2._controller.process.pid == cluster._controller.process.pid
245-
assert list(cluster2._engine_sets) == list(cluster._engine_sets)
243+
assert cluster2.controller is not None
244+
assert cluster2.controller.process.pid == cluster.controller.process.pid
245+
assert list(cluster2.engines) == list(cluster.engines)
246246

247-
es1 = next(iter(cluster._engine_sets.values()))
248-
es2 = next(iter(cluster2._engine_sets.values()))
247+
es1 = next(iter(cluster.engines.values()))
248+
es2 = next(iter(cluster2.engines.values()))
249249
# ensure responsive
250250
rc[:].apply_async(lambda: None).get(timeout=_timeout)
251251
if not sys.platform.startswith("win"):

0 commit comments

Comments
 (0)