Skip to content

Commit e8739e9

Browse files
committed
cluster: default engine set methods to all engines if unspecified
most folks are going to use just one engine set anyway, so no need to keep track of the engine set id
1 parent 5affde1 commit e8739e9

File tree

1 file changed

+42
-10
lines changed

1 file changed

+42
-10
lines changed

ipyparallel/cluster/cluster.py

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
starts/stops/polls controllers, engines, etc.
66
"""
77
import asyncio
8+
import atexit
89
import inspect
910
import logging
1011
import os
@@ -304,7 +305,7 @@ async def start_engines(self, n=None, engine_set_id=None):
304305
n = cpu_count()
305306
self.log.info(f"Starting {n or ''} engines with {self.engine_launcher_class}")
306307
r = engine_set.start(n)
307-
engine_set.on_stop(self._engines_stopped)
308+
engine_set.on_stop(partial(self._engines_stopped, engine_set_id))
308309
if inspect.isawaitable(r):
309310
await r
310311
return engine_set_id
@@ -347,8 +348,12 @@ async def stop_engine(self, engine_id):
347348
"""
348349
raise NotImplementedError("How do we find an engine by id?")
349350

350-
async def restart_engine_set(self, engine_set_id):
351+
async def restart_engines(self, engine_set_id=None):
351352
"""Restart an engine set"""
353+
if engine_set_id is None:
354+
for engine_set_id in list(self._engine_sets):
355+
await self.restart_engines(engine_set_id)
356+
return
352357
engine_set = self._engine_sets[engine_set_id]
353358
n = engine_set.n
354359
await self.stop_engines(engine_set_id)
@@ -362,19 +367,28 @@ async def restart_engine(self, engine_id):
362367
"""
363368
raise NotImplementedError("How do we find an engine by id?")
364369

365-
async def signal_engine(self, engine_id, signum):
370+
async def signal_engine(self, signum, engine_id):
366371
"""Signal one engine
367372
368373
*May* signal all engines in a set,
369374
depending on EngineSet features (e.g. mpiexec)
370375
"""
371376
raise NotImplementedError("How do we find an engine by id?")
372377

373-
async def signal_engines(self, engine_set_id, signum):
374-
"""Signal all engines in a set"""
378+
async def signal_engines(self, signum, engine_set_id=None):
379+
"""Signal all engines in a set
380+
381+
If no engine set is specified, signal all engine sets.
382+
"""
383+
if engine_set_id is None:
384+
for engine_set_id in list(self._engine_sets):
385+
await self.signal_engines(signum, engine_set_id)
386+
return
375387
self.log.info(f"Sending signal {signum} to engine(s) {engine_set_id}")
376388
engine_set = self._engine_sets[engine_set_id]
377-
engine_set.signal(signum)
389+
r = engine_set.signal(signum)
390+
if inspect.isawaitable(r):
391+
await r
378392

379393
async def stop_controller(self):
380394
"""Stop the controller"""
@@ -443,19 +457,37 @@ def __exit__(self, *args):
443457
class ClusterManager(LoggingConfigurable):
444458
"""A manager of clusters
445459
446-
Wraps Cluster, adding"""
460+
Wraps Cluster, adding lookup/list by cluster id
461+
"""
447462

448463
_clusters = Dict(help="My cluster objects")
449464

450-
def load_clusters(self):
465+
def load_clusters(self, serialized_state):
451466
"""Load serialized cluster state"""
452-
raise NotImplementedError()
467+
raise NotImplementedError("Serializing clusters not implemented")
453468

454469
def list_clusters(self):
455470
"""List current clusters"""
471+
# TODO: what should we return?
472+
# just cluster ids or the full dict?
473+
# just cluster ids for now
474+
return sorted(self._clusters)
456475

457-
def new_cluster(self, cluster_cls):
476+
def new_cluster(self, cluster_cls, **kwargs):
458477
"""Create a new cluster"""
478+
cluster = Cluster(parent=self)
479+
if cluster.cluster_id in self._clusters:
480+
raise KeyError(f"Cluster {cluster.cluster_id} already exists!")
481+
self._clusters[cluster]
482+
483+
def get_cluster(self, cluster_id):
484+
"""Get a Cluster object by id"""
485+
return self._clusters[cluster_id]
486+
487+
def remove_cluster(self, cluster_id):
488+
"""Delete a cluster by id"""
489+
# TODO: check running?
490+
del self._clusters[cluster_id]
459491

460492
def _cluster_method(self, method_name, cluster_id, *args, **kwargs):
461493
"""Wrapper around single-cluster methods

0 commit comments

Comments
 (0)