Skip to content

Commit acb3081

Browse files
committed
add activate arg to start_and_connect
creates a blocking DirectView on all engines, registering `%px` magics and friends `wait_for_engines` can also derive `n` from parent cluster, if available
1 parent 1e884de commit acb3081

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

ipyparallel/client/client.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1455,7 +1455,7 @@ def _futures_for_msgs(self, msg_ids):
14551455
return futures
14561456

14571457
def wait_for_engines(
1458-
self, n, *, timeout=-1, block=True, interactive=None, widget=None
1458+
self, n=None, *, timeout=-1, block=True, interactive=None, widget=None
14591459
):
14601460
"""Wait for `n` engines to become available.
14611461
@@ -1490,6 +1490,18 @@ def wait_for_engines(
14901490
------
14911491
TimeoutError : if timeout is reached.
14921492
"""
1493+
if n is None:
1494+
# get n from cluster, if not specified
1495+
if self.cluster is None:
1496+
raise TypeError("n engines to wait for must be specified")
1497+
1498+
if self.cluster.n:
1499+
n = self.cluster.n
1500+
else:
1501+
# compute n from engine sets,
1502+
# e.g. the default where n is calculated at runtime from `cpu_count()`
1503+
n = sum(engine_set.n for engine_set in self.cluster.engines.values())
1504+
14931505
if len(self.ids) >= n:
14941506
if block:
14951507
return

ipyparallel/cluster/cluster.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -711,25 +711,49 @@ def _engines_stopped(self, engine_set_id, stop_data=None):
711711
log(f"engine set stopped {engine_set_id}: {stop_data}")
712712
self.update_cluster_file()
713713

714-
async def start_and_connect(self, n=None):
714+
async def start_and_connect(self, n=None, activate=False):
715715
"""Single call to start a cluster and connect a client
716716
717+
If `activate` is given, a blocking DirectView on all engines will be created
718+
and activated, registering `%px` magics for use in IPython
719+
720+
Example::
721+
722+
rc = await Cluster(engines="mpi").start_and_connect(n=8, activate=True)
723+
724+
%px print("hello, world!")
725+
717726
Equivalent to::
718727
719728
await self.start_cluster(n)
720729
client = await self.connect_client()
721730
await client.wait_for_engines(n, block=False)
722731
723-
.. versionadded: 7.1
732+
.. versionadded:: 7.1
733+
734+
.. versionadded:: 8.1
735+
736+
activate argument.
724737
"""
725738
if n is None:
726739
n = self.n
727740
await self.start_cluster(n=n)
728741
client = await self.connect_client()
742+
743+
if n is None:
744+
# number of engines to wait for
745+
# if not specified, derive current value from EngineSets
746+
n = sum(engine_set.n for engine_set in self.engines.values())
747+
729748
if n:
730749
await asyncio.wrap_future(
731750
client.wait_for_engines(n, block=False, timeout=self.engine_timeout)
732751
)
752+
753+
if activate:
754+
view = client[:]
755+
view.block = True
756+
view.activate()
733757
return client
734758

735759
async def start_cluster(self, n=None):

ipyparallel/tests/test_cluster.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,17 @@ async def test_wait_for_engines_crash(Cluster):
359359
rc = c.connect_client_sync()
360360
with pytest.raises(ipp.error.EngineError):
361361
rc.wait_for_engines(3, timeout=20)
362+
363+
364+
@pytest.mark.parametrize("activate", (True, False))
365+
def test_start_and_connect_activate(ipython, Cluster, activate):
366+
rc = Cluster(n=2, log_level=10).start_and_connect_sync(activate=activate)
367+
with rc:
368+
if activate:
369+
assert "px" in ipython.magics_manager.magics["cell"]
370+
px = ipython.magics_manager.magics["cell"]["px"]
371+
assert px.__self__.view.client is rc
372+
else:
373+
if "px" in ipython.magics_manager.magics["cell"]:
374+
px = ipython.magics_manager.magics["cell"]["px"]
375+
assert px.__self__.view.client is not rc

0 commit comments

Comments
 (0)