Skip to content

Commit 5afec84

Browse files
authored
Merge pull request #540 from minrk/launcher-cleanup
Improve launcher cleanup
2 parents d4ed8c9 + 081c773 commit 5afec84

File tree

2 files changed: 75 additions and 17 deletions.

ipyparallel/cluster/cluster.py

Lines changed: 30 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
import string
1717
import sys
1818
import time
19+
import traceback
1920
from functools import partial
2021
from multiprocessing import cpu_count
2122
from weakref import WeakSet
@@ -56,7 +57,11 @@ def _atexit_cleanup_clusters(*args):
5657
continue
5758
if cluster.controller or cluster.engines:
5859
print(f"Stopping cluster {cluster}", file=sys.stderr)
59-
cluster.stop_cluster_sync()
60+
try:
61+
cluster.stop_cluster_sync()
62+
except Exception:
63+
print(f"Error stopping cluster {cluster}", file=sys.stderr)
64+
traceback.print_exception(*sys.exc_info())
6065

6166

6267
_atexit_cleanup_clusters.registered = False
@@ -354,22 +359,37 @@ def from_dict(cls, d, **kwargs):
354359
if attr in d:
355360
setattr(self, attr, d[attr])
356361

362+
cluster_key = ClusterManager._cluster_key(self)
363+
357364
if d.get("controller"):
358365
controller_info = d["controller"]
359366
cls = self.controller_launcher_class = import_item(controller_info["class"])
360367
if controller_info["state"]:
361-
self.controller = cls.from_dict(controller_info["state"], parent=self)
368+
try:
369+
self.controller = cls.from_dict(
370+
controller_info["state"], parent=self
371+
)
372+
except launcher.NotRunning as e:
373+
self.log.error(f"Controller for {cluster_key} not running: {e}")
362374

363375
engine_info = d.get("engines")
364376
if engine_info:
365377
cls = self.engine_launcher_class = import_item(engine_info["class"])
366378
for engine_set_id, engine_state in engine_info.get("sets", {}).items():
367-
self.engines[engine_set_id] = cls.from_dict(
368-
engine_state,
369-
engine_set_id=engine_set_id,
370-
parent=self,
371-
)
372-
379+
try:
380+
self.engines[engine_set_id] = cls.from_dict(
381+
engine_state,
382+
engine_set_id=engine_set_id,
383+
parent=self,
384+
)
385+
except launcher.NotRunning as e:
386+
self.log.error(
387+
f"Engine set {cluster_key}{engine_set_id} not running: {e}"
388+
)
389+
# check if state changed
390+
if self.to_dict() != d:
391+
# if so, update our cluster file
392+
self.update_cluster_file()
373393
return self
374394

375395
@classmethod
@@ -698,7 +718,8 @@ class ClusterManager(LoggingConfigurable):
698718

699719
_clusters = Dict(help="My cluster objects")
700720

701-
def _cluster_key(self, cluster):
721+
@staticmethod
722+
def _cluster_key(cluster):
702723
"""Return a unique cluster key for a cluster
703724
704725
Default is {profile}:{cluster_id}

ipyparallel/cluster/launcher.py

Lines changed: 45 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,12 @@ class UnknownStatus(LauncherError):
8282
pass
8383

8484

85+
class NotRunning(LauncherError):
86+
"""Raised when a launcher is no longer running"""
87+
88+
pass
89+
90+
8591
class BaseLauncher(LoggingConfigurable):
8692
"""An abstraction for starting, stopping and signaling a process."""
8793

@@ -408,7 +414,10 @@ def from_dict(cls, d, **kwargs):
408414
def _reconstruct_process(self, d):
409415
"""Reconstruct our process"""
410416
if 'pid' in d and d['pid'] > 0:
411-
self.process = psutil.Process(d['pid'])
417+
try:
418+
self.process = psutil.Process(d['pid'])
419+
except psutil.NoSuchProcess as e:
420+
raise NotRunning(f"Process {d['pid']}")
412421
self._start_waiting()
413422

414423
def _wait(self):
@@ -465,10 +474,18 @@ def start(self):
465474
async def join(self, timeout=None):
466475
"""Wait for the process to exit"""
467476
with ThreadPoolExecutor(1) as pool:
477+
wait = partial(self.process.wait, timeout)
468478
try:
469-
await asyncio.wrap_future(
470-
pool.submit(partial(self.process.wait, timeout))
471-
)
479+
try:
480+
future = pool.submit(wait)
481+
except RuntimeError:
482+
# e.g. called during process shutdown,
483+
# which raises
484+
# RuntimeError: cannot schedule new futures after interpreter shutdown
485+
# Instead, do the blocking call
486+
wait()
487+
else:
488+
await asyncio.wrap_future(future)
472489
except psutil.TimeoutExpired:
473490
raise TimeoutError(
474491
f"Process {self.pid} did not complete in {timeout} seconds."
@@ -638,8 +655,20 @@ def to_dict(self):
638655
@classmethod
639656
def from_dict(cls, d, **kwargs):
640657
self = super().from_dict(d, **kwargs)
658+
n = 0
641659
for i, engine_dict in d['engines'].items():
642-
self.launchers[i] = self.launcher_class.from_dict(engine_dict, parent=self)
660+
try:
661+
self.launchers[i] = self.launcher_class.from_dict(
662+
engine_dict, parent=self
663+
)
664+
except NotRunning as e:
665+
self.log.error(f"Engine {i} not running: {e}")
666+
else:
667+
n += 1
668+
if n == 0:
669+
raise NotRunning("No engines left")
670+
else:
671+
self.n = n
643672
return self
644673

645674
def start(self, n):
@@ -1184,9 +1213,17 @@ def wait_one(self, timeout):
11841213

11851214
async def join(self, timeout=None):
11861215
with ThreadPoolExecutor(1) as pool:
1187-
await asyncio.wrap_future(
1188-
pool.submit(partial(self.wait_one, timeout=timeout))
1189-
)
1216+
wait = partial(self.wait_one, timeout=timeout)
1217+
try:
1218+
future = pool.submit(wait)
1219+
except RuntimeError:
1220+
# e.g. called during process shutdown,
1221+
# which raises
1222+
# RuntimeError: cannot schedule new futures after interpreter shutdown
1223+
# Instead, do the blocking call
1224+
wait()
1225+
else:
1226+
await asyncio.wrap_future(future)
11901227

11911228
def signal(self, sig):
11921229
if self.state == 'running':

0 commit comments

Comments
 (0)