Skip to content

Commit 94fed0d

Browse files
committed
add KernelNanny for remote signaling
Nanny proxies control channel - add Client.send_signal for signaling engines - notify Hub immediately on process exit instead of waiting for heart failure - exit Engine on any error during completion of registration - consolidate `_send_control_request` for control-channel requests - fix some mixed str/bytes uuid/identity logic in the Hub
1 parent f3777c6 commit 94fed0d

File tree

9 files changed

+413
-64
lines changed

9 files changed

+413
-64
lines changed

docs/source/tutorial/direct.rst

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,16 +266,16 @@ and blocks until all of the associated results are ready:
266266

267267
.. sourcecode:: ipython
268268

269-
In [72]: dview.block=False
269+
In [72]: dview.block = False
270270

271271
# A trivial list of AsyncResults objects
272-
In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
272+
In [73]: ar_list = [dview.apply_async(wait, 3) for i in range(10)]
273273

274274
# Wait until all of them are done
275-
In [74]: dview.wait(pr_list)
275+
In [74]: dview.wait(ar_list)
276276

277277
# Then, their results are ready using get()
278-
In [75]: pr_list[0].get()
278+
In [75]: ar_list[0].get()
279279
Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
280280

281281

@@ -297,7 +297,7 @@ The following examples demonstrate how to use the instance attributes:
297297

298298
.. sourcecode:: ipython
299299

300-
In [16]: dview.targets = [0,2]
300+
In [16]: dview.targets = [0, 2]
301301

302302
In [17]: dview.block = False
303303

@@ -406,6 +406,56 @@ between engines, MPI, pyzmq, or some other direct interconnect should be used.
406406
Other things to look at
407407
=======================
408408

409+
Signaling engines
410+
-----------------
411+
412+
New in IPython Parallel 7.0 is the :meth:`Client.send_signal` method.
413+
This lets you directly interrupt engines, which might be running a blocking task
414+
that you want to cancel.
415+
416+
This is also available via the Cluster API.
417+
Unlike the Cluster API, though,
418+
which only allows interrupting whole engine 'sets' (usally all engines in the cluster),
419+
the client API allows interrupting individual engines.
420+
421+
.. sourcecode:: ipython
422+
423+
In [9]: ar = rc[:].apply_async(time.sleep, 5)
424+
425+
In [10]: rc.send_signal(signal.SIGINT)
426+
Out[10]: <Future at 0x7f91a9489fd0 state=pending>
427+
428+
In [11]: ar.get()
429+
[12:apply]:
430+
---------------------------------------------------------------------------
431+
KeyboardInterrupt Traceback (most recent call last)
432+
<string> in <module>
433+
434+
KeyboardInterrupt:
435+
436+
[13:apply]:
437+
---------------------------------------------------------------------------
438+
KeyboardInterrupt Traceback (most recent call last)
439+
<string> in <module>
440+
441+
KeyboardInterrupt:
442+
443+
[14:apply]:
444+
---------------------------------------------------------------------------
445+
KeyboardInterrupt Traceback (most recent call last)
446+
<string> in <module>
447+
448+
KeyboardInterrupt:
449+
450+
[15:apply]:
451+
---------------------------------------------------------------------------
452+
KeyboardInterrupt Traceback (most recent call last)
453+
<string> in <module>
454+
455+
KeyboardInterrupt:
456+
457+
458+
409459
Remote function decorators
410460
--------------------------
411461

ipyparallel/client/client.py

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,14 +1494,13 @@ def wait_interactive(self, jobs=None, interval=1.0, timeout=-1.0):
14941494
# Control methods
14951495
# --------------------------------------------------------------------------
14961496

1497-
def clear(self, targets=None, block=None):
1498-
"""Clear the namespace in target(s)."""
1499-
block = self.block if block is None else block
1500-
targets = self._build_targets(targets)[0]
1497+
def _send_control_request(self, targets, msg_type, content, block):
1498+
"""Send a request on the control channel"""
1499+
target_identities = self._build_targets(targets)[0]
15011500
futures = []
1502-
for t in targets:
1501+
for ident in target_identities:
15031502
futures.append(
1504-
self._send(self._control_stream, 'clear_request', content={}, ident=t)
1503+
self._send(self._control_stream, msg_type, content=content, ident=ident)
15051504
)
15061505
if not block:
15071506
return multi_future(futures)
@@ -1511,6 +1510,34 @@ def clear(self, targets=None, block=None):
15111510
if msg['content']['status'] != 'ok':
15121511
raise self._unwrap_exception(msg['content'])
15131512

1513+
def send_signal(self, sig, targets=None, block=None):
1514+
"""Send a signal target(s).
1515+
1516+
Parameters
1517+
----------
1518+
1519+
sig: int or str
1520+
The signal number or name to send.
1521+
If a str, will evaluate to getattr(signal, sig) on the engine,
1522+
which is useful for sending signals cross-platform.
1523+
1524+
.. versionadded:: 7.0
1525+
"""
1526+
block = self.block if block is None else block
1527+
return self._send_control_request(
1528+
targets=targets,
1529+
msg_type='signal_request',
1530+
content={'sig': sig},
1531+
block=block,
1532+
)
1533+
1534+
def clear(self, targets=None, block=None):
1535+
"""Clear the namespace in target(s)."""
1536+
block = self.block if block is None else block
1537+
return self._send_control_request(
1538+
targets=targets, msg_type='clear_request', content={}, block=block
1539+
)
1540+
15141541
def abort(self, jobs=None, targets=None, block=None):
15151542
"""Abort specific jobs from the execution queues of target(s).
15161543
@@ -1531,7 +1558,6 @@ def abort(self, jobs=None, targets=None, block=None):
15311558
"""
15321559
block = self.block if block is None else block
15331560
jobs = jobs if jobs is not None else list(self.outstanding)
1534-
targets = self._build_targets(targets)[0]
15351561

15361562
msg_ids = []
15371563
if isinstance(jobs, string_types + (AsyncResult,)):
@@ -1549,22 +1575,13 @@ def abort(self, jobs=None, targets=None, block=None):
15491575
else:
15501576
msg_ids.append(j)
15511577
content = dict(msg_ids=msg_ids)
1552-
futures = []
1553-
for t in targets:
1554-
futures.append(
1555-
self._send(
1556-
self._control_stream, 'abort_request', content=content, ident=t
1557-
)
1558-
)
15591578

1560-
if not block:
1561-
return multi_future(futures)
1562-
else:
1563-
for f in futures:
1564-
f.wait()
1565-
msg = f.result()
1566-
if msg['content']['status'] != 'ok':
1567-
raise self._unwrap_exception(msg['content'])
1579+
return self._send_control_request(
1580+
targets,
1581+
msg_type='abort_request',
1582+
content=content,
1583+
block=block,
1584+
)
15681585

15691586
def shutdown(self, targets='all', restart=False, hub=False, block=None):
15701587
"""Terminates one or more engine processes, optionally including the hub.

ipyparallel/controller/hub.py

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from tornado.gen import coroutine
2424
from tornado.gen import maybe_future
2525
from traitlets import Any
26+
from traitlets import Bytes
2627
from traitlets import Dict
2728
from traitlets import HasTraits
2829
from traitlets import Instance
@@ -122,8 +123,9 @@ class EngineConnector(HasTraits):
122123
stallback: tornado timeout for stalled registration
123124
"""
124125

125-
id = Integer(0)
126+
id = Integer()
126127
uuid = Unicode()
128+
ident = Bytes()
127129
pending = Set()
128130
stallback = Any()
129131

@@ -152,17 +154,15 @@ class Hub(LoggingConfigurable):
152154

153155
# internal data structures:
154156
ids = Set() # engine IDs
155-
keytable = Dict()
156-
by_ident = Dict()
157-
engines = Dict()
158-
clients = Dict()
159-
hearts = Dict()
157+
by_ident = Dict() # map bytes identities : int engine id
158+
engines = Dict() # map int engine id : EngineConnector
159+
hearts = Dict() # map bytes identities : int engine id, only for active heartbeats
160160
pending = Set()
161161
queues = Dict() # pending msg_ids keyed by engine_id
162162
tasks = Dict() # pending msg_ids submitted as tasks, keyed by client_id
163163
completed = Dict() # completed msg_ids keyed by engine_id
164164
all_completed = Set() # completed msg_ids keyed by engine_id
165-
unassigned = Set() # set of task msg_ds not yet assigned a destination
165+
unassigned = Set() # set of task msg_ids not yet assigned a destination
166166
incoming_registrations = Dict()
167167
registration_timeout = Integer()
168168
_idcounter = Integer(0)
@@ -830,8 +830,8 @@ def connection_request(self, client_id, msg):
830830
self.log.info("client::client %r connected", client_id)
831831
content = dict(status='ok')
832832
jsonable = {}
833-
for k, v in iteritems(self.keytable):
834-
jsonable[str(k)] = v
833+
for eid, ec in self.engines.items():
834+
jsonable[str(eid)] = ec.uuid
835835
content['engines'] = jsonable
836836
self.session.send(
837837
self.query, 'connection_reply', content, parent=msg, ident=client_id
@@ -889,7 +889,9 @@ def register_engine(self, reg, msg):
889889
if content['status'] == 'ok':
890890
if heart in self.heartmonitor.hearts:
891891
# already beating
892-
self.incoming_registrations[heart] = EngineConnector(id=eid, uuid=uuid)
892+
self.incoming_registrations[heart] = EngineConnector(
893+
id=eid, uuid=uuid, ident=heart
894+
)
893895
self.finish_registration(heart)
894896
else:
895897
purge = lambda: self._purge_stalled_registration(heart)
@@ -898,7 +900,7 @@ def register_engine(self, reg, msg):
898900
purge,
899901
)
900902
self.incoming_registrations[heart] = EngineConnector(
901-
id=eid, uuid=uuid, stallback=t
903+
id=eid, uuid=uuid, ident=heart, stallback=t
902904
)
903905
else:
904906
self.log.error(
@@ -920,23 +922,22 @@ def unregister_engine(self, ident, msg):
920922
return
921923
self.log.info("registration::unregister_engine(%r)", eid)
922924

923-
uuid = self.keytable[eid]
924-
content = dict(id=eid, uuid=uuid)
925+
ec = self.engines[eid]
926+
content = dict(id=eid, uuid=ec.uuid)
925927

926928
# stop the heartbeats
927-
self.hearts.pop(uuid, None)
928-
self.heartmonitor.responses.discard(uuid)
929-
self.heartmonitor.hearts.discard(uuid)
929+
self.hearts.pop(ec.ident, None)
930+
self.heartmonitor.responses.discard(ec.ident)
931+
self.heartmonitor.hearts.discard(ec.ident)
930932

931933
self.loop.add_timeout(
932934
self.loop.time() + self.registration_timeout,
933-
lambda: self._handle_stranded_msgs(eid, uuid),
935+
lambda: self._handle_stranded_msgs(eid, ec.uuid),
934936
)
935937

936938
# cleanup mappings
937-
self.by_ident.pop(uuid, None)
939+
self.by_ident.pop(ec.ident, None)
938940
self.engines.pop(eid, None)
939-
self.keytable.pop(eid, None)
940941

941942
self._save_engine_state()
942943

@@ -993,14 +994,13 @@ def finish_registration(self, heart):
993994
self.loop.remove_timeout(ec.stallback)
994995
eid = ec.id
995996
self.ids.add(eid)
996-
self.keytable[eid] = ec.uuid
997997
self.engines[eid] = ec
998-
self.by_ident[cast_bytes(ec.uuid)] = ec.id
998+
self.by_ident[ec.ident] = ec.id
999999
self.queues[eid] = list()
10001000
self.tasks[eid] = list()
10011001
self.completed[eid] = list()
10021002
self.hearts[heart] = eid
1003-
content = dict(id=eid, uuid=self.engines[eid].uuid)
1003+
content = dict(id=eid, uuid=ec.uuid)
10041004
if self.notifier:
10051005
self.session.send(
10061006
self.notifier, "registration_notification", content=content
@@ -1067,7 +1067,9 @@ def _load_engine_state(self):
10671067
self.heartmonitor.responses.add(heart)
10681068
self.heartmonitor.hearts.add(heart)
10691069

1070-
self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1070+
self.incoming_registrations[heart] = EngineConnector(
1071+
id=int(eid), uuid=uuid, ident=heart
1072+
)
10711073
self.finish_registration(heart)
10721074

10731075
self.notifier = save_notifier

0 commit comments

Comments
 (0)