Skip to content

Commit 081c773

Browse files
committed
add Launcher.NotRunning for handling Launcher.from_dict for a defunct process
e.g. process shutdown since last serialized avoids persistent errors for defunct clusters
1 parent b475f14 commit 081c773

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

ipyparallel/cluster/cluster.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -359,22 +359,37 @@ def from_dict(cls, d, **kwargs):
359359
if attr in d:
360360
setattr(self, attr, d[attr])
361361

362+
cluster_key = ClusterManager._cluster_key(self)
363+
362364
if d.get("controller"):
363365
controller_info = d["controller"]
364366
cls = self.controller_launcher_class = import_item(controller_info["class"])
365367
if controller_info["state"]:
366-
self.controller = cls.from_dict(controller_info["state"], parent=self)
368+
try:
369+
self.controller = cls.from_dict(
370+
controller_info["state"], parent=self
371+
)
372+
except launcher.NotRunning as e:
373+
self.log.error(f"Controller for {cluster_key} not running: {e}")
367374

368375
engine_info = d.get("engines")
369376
if engine_info:
370377
cls = self.engine_launcher_class = import_item(engine_info["class"])
371378
for engine_set_id, engine_state in engine_info.get("sets", {}).items():
372-
self.engines[engine_set_id] = cls.from_dict(
373-
engine_state,
374-
engine_set_id=engine_set_id,
375-
parent=self,
376-
)
377-
379+
try:
380+
self.engines[engine_set_id] = cls.from_dict(
381+
engine_state,
382+
engine_set_id=engine_set_id,
383+
parent=self,
384+
)
385+
except launcher.NotRunning as e:
386+
self.log.error(
387+
f"Engine set {cluster_key}{engine_set_id} not running: {e}"
388+
)
389+
# check if state changed
390+
if self.to_dict() != d:
391+
# if so, update our cluster file
392+
self.update_cluster_file()
378393
return self
379394

380395
@classmethod
@@ -703,7 +718,8 @@ class ClusterManager(LoggingConfigurable):
703718

704719
_clusters = Dict(help="My cluster objects")
705720

706-
def _cluster_key(self, cluster):
721+
@staticmethod
722+
def _cluster_key(cluster):
707723
"""Return a unique cluster key for a cluster
708724
709725
Default is {profile}:{cluster_id}

ipyparallel/cluster/launcher.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ class UnknownStatus(LauncherError):
8282
pass
8383

8484

85+
class NotRunning(LauncherError):
86+
"""Raised when a launcher is no longer running"""
87+
88+
pass
89+
90+
8591
class BaseLauncher(LoggingConfigurable):
8692
"""An abstraction for starting, stopping and signaling a process."""
8793

@@ -408,7 +414,10 @@ def from_dict(cls, d, **kwargs):
408414
def _reconstruct_process(self, d):
409415
"""Reconstruct our process"""
410416
if 'pid' in d and d['pid'] > 0:
411-
self.process = psutil.Process(d['pid'])
417+
try:
418+
self.process = psutil.Process(d['pid'])
419+
except psutil.NoSuchProcess as e:
420+
raise NotRunning(f"Process {d['pid']}")
412421
self._start_waiting()
413422

414423
def _wait(self):
@@ -646,8 +655,20 @@ def to_dict(self):
646655
@classmethod
647656
def from_dict(cls, d, **kwargs):
648657
self = super().from_dict(d, **kwargs)
658+
n = 0
649659
for i, engine_dict in d['engines'].items():
650-
self.launchers[i] = self.launcher_class.from_dict(engine_dict, parent=self)
660+
try:
661+
self.launchers[i] = self.launcher_class.from_dict(
662+
engine_dict, parent=self
663+
)
664+
except NotRunning as e:
665+
self.log.error(f"Engine {i} not running: {e}")
666+
else:
667+
n += 1
668+
if n == 0:
669+
raise NotRunning("No engines left")
670+
else:
671+
self.n = n
651672
return self
652673

653674
def start(self, n):

0 commit comments

Comments
 (0)