Skip to content

Commit b475f14

Browse files
committed
pool.submit cannot be called during process teardown
improve teardown of clusters during process exit log errors and advance in case of failure to stop one cluster so it doesn't prevent stopping others
1 parent d4ed8c9 commit b475f14

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

ipyparallel/cluster/cluster.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import string
1717
import sys
1818
import time
19+
import traceback
1920
from functools import partial
2021
from multiprocessing import cpu_count
2122
from weakref import WeakSet
@@ -56,7 +57,11 @@ def _atexit_cleanup_clusters(*args):
5657
continue
5758
if cluster.controller or cluster.engines:
5859
print(f"Stopping cluster {cluster}", file=sys.stderr)
59-
cluster.stop_cluster_sync()
60+
try:
61+
cluster.stop_cluster_sync()
62+
except Exception:
63+
print(f"Error stopping cluster {cluster}", file=sys.stderr)
64+
traceback.print_exception(*sys.exc_info())
6065

6166

6267
_atexit_cleanup_clusters.registered = False

ipyparallel/cluster/launcher.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -465,10 +465,18 @@ def start(self):
465465
async def join(self, timeout=None):
466466
"""Wait for the process to exit"""
467467
with ThreadPoolExecutor(1) as pool:
468+
wait = partial(self.process.wait, timeout)
468469
try:
469-
await asyncio.wrap_future(
470-
pool.submit(partial(self.process.wait, timeout))
471-
)
470+
try:
471+
future = pool.submit(wait)
472+
except RuntimeError:
473+
# e.g. called during process shutdown,
474+
# which raises
475+
# RuntimeError: cannot schedule new futures after interpreter shutdown
476+
# Instead, do the blocking call
477+
wait()
478+
else:
479+
await asyncio.wrap_future(future)
472480
except psutil.TimeoutExpired:
473481
raise TimeoutError(
474482
f"Process {self.pid} did not complete in {timeout} seconds."
@@ -1184,9 +1192,17 @@ def wait_one(self, timeout):
11841192

11851193
async def join(self, timeout=None):
11861194
with ThreadPoolExecutor(1) as pool:
1187-
await asyncio.wrap_future(
1188-
pool.submit(partial(self.wait_one, timeout=timeout))
1189-
)
1195+
wait = partial(self.wait_one, timeout=timeout)
1196+
try:
1197+
future = pool.submit(wait)
1198+
except RuntimeError:
1199+
# e.g. called during process shutdown,
1200+
# which raises
1201+
# RuntimeError: cannot schedule new futures after interpreter shutdown
1202+
# Instead, do the blocking call
1203+
wait()
1204+
else:
1205+
await asyncio.wrap_future(future)
11901206

11911207
def signal(self, sig):
11921208
if self.state == 'running':

0 commit comments

Comments
 (0)