Skip to content

Commit 04725e1

Browse files
committed
simpler checks for stopped clusters
- join wait thread, if running, to ensure stop callbacks have been called - log updates to the cluster file - async waits in load_clusters to give stop events a chance to fire
1 parent d8db79d commit 04725e1

File tree

3 files changed

+29
-11
lines changed

3 files changed

+29
-11
lines changed

ipyparallel/cluster/cluster.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def from_file(
511511
def write_cluster_file(self):
512512
"""Write cluster info to disk for later loading"""
513513
os.makedirs(os.path.dirname(self.cluster_file), exist_ok=True)
514+
self.log.debug(f"Updating {self.cluster_file}")
514515
with open(self.cluster_file, "w") as f:
515516
json.dump(self.to_dict(), f)
516517

@@ -523,6 +524,14 @@ def remove_cluster_file(self):
523524
else:
524525
self.log.debug(f"Removed cluster file: {self.cluster_file}")
525526

527+
def _is_running(self):
528+
"""Return if we have any running components"""
529+
if self.controller and self.controller.state != 'after':
530+
return True
531+
if any(es.state != 'after' for es in self.engines.values()):
532+
return True
533+
return False
534+
526535
def update_cluster_file(self):
527536
"""Update my cluster file
528537
@@ -533,9 +542,7 @@ def update_cluster_file(self):
533542
# setting cluster_file='' disables saving to disk
534543
return
535544

536-
if (not self.controller or self.controller.state == 'after') and not any(
537-
es.state == 'after' for es in self.engines.values()
538-
):
545+
if not self._is_running():
539546
self.remove_cluster_file()
540547
else:
541548
self.write_cluster_file()
@@ -862,10 +869,10 @@ def load_clusters(
862869
for key, cluster in list(self.clusters.items()):
863870
# remove stopped clusters
864871
# but not *new* clusters that haven't started yet
865-
if (cluster.controller and cluster.controller.state == 'after') and all(
866-
es.state == 'after' for es in cluster.engines.values()
867-
):
868-
self.log.info("Removing stopped cluster {key}")
872+
# if `cluster.controller` is present
873+
# that means it was running at some point
874+
if cluster.controller and not cluster._is_running():
875+
self.log.info(f"Removing stopped cluster {key}")
869876
self.clusters.pop(key)
870877

871878
if profile_dirs is None:

ipyparallel/cluster/launcher.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,11 @@ async def join(self, timeout=None):
490490
raise TimeoutError(
491491
f"Process {self.pid} did not complete in {timeout} seconds."
492492
)
493+
if getattr(self, '_stop_waiting', None) and getattr(self, "_wait_thread", None):
494+
self._stop_waiting.set()
495+
# got here, should be done
496+
# wait for wait_thread to cleanup
497+
self._wait_thread.join()
493498

494499
def _stream_file(self, path):
495500
"""Stream one file"""
@@ -1226,6 +1231,11 @@ async def join(self, timeout=None):
12261231
wait()
12271232
else:
12281233
await asyncio.wrap_future(future)
1234+
if getattr(self, '_stop_waiting', None) and getattr(self, "_wait_thread", None):
1235+
self._stop_waiting.set()
1236+
# got here, should be done
1237+
# wait for wait_thread to cleanup
1238+
self._wait_thread.join()
12291239

12301240
def signal(self, sig):
12311241
if self.state == 'running':

ipyparallel/tests/test_cluster.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import json
3+
import logging
34
import os
45
import signal
56
import sys
@@ -310,9 +311,9 @@ async def test_default_from_file(Cluster):
310311

311312

312313
async def test_cluster_manager_notice_stop(Cluster):
313-
cm = cluster.ClusterManager()
314+
cm = cluster.ClusterManager(log=logging.getLogger())
314315
cm.load_clusters()
315-
c = Cluster(n=1)
316+
c = Cluster(n=1, log=cm.log)
316317
key = cm._cluster_key(c)
317318
assert key not in cm.clusters
318319

@@ -326,8 +327,8 @@ async def test_cluster_manager_notice_stop(Cluster):
326327
# refresh list, cleans out stopped clusters
327328
# can take some time to notice
328329
tic = time.perf_counter()
329-
deadline = time.perf_counter() + 30
330+
deadline = time.perf_counter() + _timeout
330331
while time.perf_counter() < deadline and key in cm.clusters:
331-
time.sleep(0.2)
332+
await asyncio.sleep(0.2)
332333
cm.load_clusters()
333334
assert key not in cm.clusters

0 commit comments

Comments
 (0)