Skip to content

Commit 011f2f6

Browse files
authored
Merge pull request #493 from minrk/atexit-cleanup
shutdown clusters at exit by default
2 parents 8d48481 + d33f243 commit 011f2f6

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed

ipyparallel/cluster/cluster.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
starts/stops/polls controllers, engines, etc.
66
"""
77
import asyncio
8+
import atexit
89
import inspect
910
import logging
1011
import os
@@ -15,12 +16,14 @@
1516
import time
1617
from functools import partial
1718
from multiprocessing import cpu_count
19+
from weakref import WeakSet
1820

1921
import IPython
2022
import traitlets.log
2123
from IPython.core.profiledir import ProfileDir
2224
from IPython.core.profiledir import ProfileDirError
2325
from traitlets import Any
26+
from traitlets import Bool
2427
from traitlets import default
2528
from traitlets import Dict
2629
from traitlets import Float
@@ -35,6 +38,23 @@
3538

3639
_suffix_chars = string.ascii_lowercase + string.digits
3740

41+
# weak set of clusters to be cleaned up at exit
42+
_atexit_clusters = WeakSet()
43+
44+
45+
def _atexit_cleanup_clusters(*args):
46+
"""Cleanup clusters during process shutdown"""
47+
for cluster in _atexit_clusters:
48+
if not cluster.shutdown_atexit:
49+
# overridden after register
50+
continue
51+
if cluster._controller or cluster._engine_sets:
52+
print(f"Stopping cluster {cluster}", file=sys.stderr)
53+
cluster.stop_cluster_sync()
54+
55+
56+
_atexit_cleanup_clusters.registered = False
57+
3858

3959
class Cluster(AsyncFirst, LoggingConfigurable):
4060
"""Class representing an IPP cluster
@@ -48,6 +68,17 @@ class Cluster(AsyncFirst, LoggingConfigurable):
4868
"""
4969

5070
# general configuration
71+
72+
shutdown_atexit = Bool(
73+
True,
74+
help="""
75+
Shutdown the cluster at process exit.
76+
77+
Set to False if you want to launch a cluster and leave it running
78+
after the launching process exits.
79+
""",
80+
)
81+
5182
cluster_id = Unicode(help="The id of the cluster (default: random string)")
5283

5384
@default("cluster_id")
@@ -224,6 +255,12 @@ def _default_log(self):
224255
_controller = Any()
225256
_engine_sets = Dict()
226257

258+
def __del__(self):
259+
if not self.shutdown_atexit:
260+
return
261+
if self._controller or self._engine_sets:
262+
self.stop_cluster_sync()
263+
227264
def __repr__(self):
228265

229266
fields = {
@@ -272,6 +309,11 @@ async def start_controller(self, **kwargs):
272309
"controller is already running. Call stop_controller() first."
273310
)
274311

312+
if self.shutdown_atexit:
313+
_atexit_clusters.add(self)
314+
if not _atexit_cleanup_clusters.registered:
315+
atexit.register(_atexit_cleanup_clusters)
316+
275317
self._controller = controller = self.controller_launcher_class(
276318
work_dir=u'.',
277319
parent=self,

0 commit comments

Comments
 (0)