Skip to content

Commit 7d9f3d1

Browse files
authored
Merge pull request #514 from minrk/signal-engines
add KernelNanny for remote signaling
2 parents e719136 + 5762231 commit 7d9f3d1

File tree

14 files changed

+582
-189
lines changed

14 files changed

+582
-189
lines changed

docs/source/tutorial/direct.rst

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,16 +266,16 @@ and blocks until all of the associated results are ready:
266266

267267
.. sourcecode:: ipython
268268

269-
In [72]: dview.block=False
269+
In [72]: dview.block = False
270270

271271
# A trivial list of AsyncResults objects
272-
In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
272+
In [73]: ar_list = [dview.apply_async(wait, 3) for i in range(10)]
273273

274274
# Wait until all of them are done
275-
In [74]: dview.wait(pr_list)
275+
In [74]: dview.wait(ar_list)
276276

277277
# Then, their results are ready using get()
278-
In [75]: pr_list[0].get()
278+
In [75]: ar_list[0].get()
279279
Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
280280

281281

@@ -297,7 +297,7 @@ The following examples demonstrate how to use the instance attributes:
297297

298298
.. sourcecode:: ipython
299299

300-
In [16]: dview.targets = [0,2]
300+
In [16]: dview.targets = [0, 2]
301301

302302
In [17]: dview.block = False
303303

@@ -406,6 +406,56 @@ between engines, MPI, pyzmq, or some other direct interconnect should be used.
406406
Other things to look at
407407
=======================
408408

409+
Signaling engines
410+
-----------------
411+
412+
New in IPython Parallel 7.0 is the :meth:`Client.send_signal` method.
413+
This lets you directly interrupt engines, which might be running a blocking task
414+
that you want to cancel.
415+
416+
This is also available via the Cluster API.
417+
Unlike the Cluster API, though,
418+
which only allows interrupting whole engine 'sets' (usally all engines in the cluster),
419+
the client API allows interrupting individual engines.
420+
421+
.. sourcecode:: ipython
422+
423+
In [9]: ar = rc[:].apply_async(time.sleep, 5)
424+
425+
In [10]: rc.send_signal(signal.SIGINT)
426+
Out[10]: <Future at 0x7f91a9489fd0 state=pending>
427+
428+
In [11]: ar.get()
429+
[12:apply]:
430+
---------------------------------------------------------------------------
431+
KeyboardInterrupt Traceback (most recent call last)
432+
<string> in <module>
433+
434+
KeyboardInterrupt:
435+
436+
[13:apply]:
437+
---------------------------------------------------------------------------
438+
KeyboardInterrupt Traceback (most recent call last)
439+
<string> in <module>
440+
441+
KeyboardInterrupt:
442+
443+
[14:apply]:
444+
---------------------------------------------------------------------------
445+
KeyboardInterrupt Traceback (most recent call last)
446+
<string> in <module>
447+
448+
KeyboardInterrupt:
449+
450+
[15:apply]:
451+
---------------------------------------------------------------------------
452+
KeyboardInterrupt Traceback (most recent call last)
453+
<string> in <module>
454+
455+
KeyboardInterrupt:
456+
457+
458+
409459
Remote function decorators
410460
--------------------------
411461

ipyparallel/apps/baseapp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ def reinit_logging(self):
202202
# Start logging to the new log file
203203
log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
204204
logfile = os.path.join(log_dir, log_filename)
205+
if sys.__stderr__:
206+
print(f"Sending logs to {logfile}", file=sys.__stderr__)
205207
open_log_file = open(logfile, 'w')
206208
else:
207209
open_log_file = None

ipyparallel/client/client.py

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,14 +1494,13 @@ def wait_interactive(self, jobs=None, interval=1.0, timeout=-1.0):
14941494
# Control methods
14951495
# --------------------------------------------------------------------------
14961496

1497-
def clear(self, targets=None, block=None):
1498-
"""Clear the namespace in target(s)."""
1499-
block = self.block if block is None else block
1500-
targets = self._build_targets(targets)[0]
1497+
def _send_control_request(self, targets, msg_type, content, block):
1498+
"""Send a request on the control channel"""
1499+
target_identities = self._build_targets(targets)[0]
15011500
futures = []
1502-
for t in targets:
1501+
for ident in target_identities:
15031502
futures.append(
1504-
self._send(self._control_stream, 'clear_request', content={}, ident=t)
1503+
self._send(self._control_stream, msg_type, content=content, ident=ident)
15051504
)
15061505
if not block:
15071506
return multi_future(futures)
@@ -1511,6 +1510,34 @@ def clear(self, targets=None, block=None):
15111510
if msg['content']['status'] != 'ok':
15121511
raise self._unwrap_exception(msg['content'])
15131512

1513+
def send_signal(self, sig, targets=None, block=None):
1514+
"""Send a signal target(s).
1515+
1516+
Parameters
1517+
----------
1518+
1519+
sig: int or str
1520+
The signal number or name to send.
1521+
If a str, will evaluate to getattr(signal, sig) on the engine,
1522+
which is useful for sending signals cross-platform.
1523+
1524+
.. versionadded:: 7.0
1525+
"""
1526+
block = self.block if block is None else block
1527+
return self._send_control_request(
1528+
targets=targets,
1529+
msg_type='signal_request',
1530+
content={'sig': sig},
1531+
block=block,
1532+
)
1533+
1534+
def clear(self, targets=None, block=None):
1535+
"""Clear the namespace in target(s)."""
1536+
block = self.block if block is None else block
1537+
return self._send_control_request(
1538+
targets=targets, msg_type='clear_request', content={}, block=block
1539+
)
1540+
15141541
def abort(self, jobs=None, targets=None, block=None):
15151542
"""Abort specific jobs from the execution queues of target(s).
15161543
@@ -1531,7 +1558,6 @@ def abort(self, jobs=None, targets=None, block=None):
15311558
"""
15321559
block = self.block if block is None else block
15331560
jobs = jobs if jobs is not None else list(self.outstanding)
1534-
targets = self._build_targets(targets)[0]
15351561

15361562
msg_ids = []
15371563
if isinstance(jobs, string_types + (AsyncResult,)):
@@ -1549,22 +1575,13 @@ def abort(self, jobs=None, targets=None, block=None):
15491575
else:
15501576
msg_ids.append(j)
15511577
content = dict(msg_ids=msg_ids)
1552-
futures = []
1553-
for t in targets:
1554-
futures.append(
1555-
self._send(
1556-
self._control_stream, 'abort_request', content=content, ident=t
1557-
)
1558-
)
15591578

1560-
if not block:
1561-
return multi_future(futures)
1562-
else:
1563-
for f in futures:
1564-
f.wait()
1565-
msg = f.result()
1566-
if msg['content']['status'] != 'ok':
1567-
raise self._unwrap_exception(msg['content'])
1579+
return self._send_control_request(
1580+
targets,
1581+
msg_type='abort_request',
1582+
content=content,
1583+
block=block,
1584+
)
15681585

15691586
def shutdown(self, targets='all', restart=False, hub=False, block=None):
15701587
"""Terminates one or more engine processes, optionally including the hub.

ipyparallel/cluster/_win32support.py

Lines changed: 0 additions & 72 deletions
This file was deleted.

ipyparallel/cluster/app.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -250,15 +250,14 @@ def _classes_default(self):
250250
daemonize = Bool(
251251
False,
252252
config=True,
253-
help="""Daemonize the ipcluster program. This implies --log-to-file.
253+
help="""Launch the cluster and immediately exit.
254+
255+
.. versionchanged:: 7.0
256+
No longer leaves the ipcluster process itself running.
257+
Prior to 7.0, --daemonize did not work on Windows.
254258
""",
255259
)
256260

257-
@observe('daemonize')
258-
def _daemonize_changed(self, change):
259-
if change['new']:
260-
self.log_to_file = True
261-
262261
early_shutdown = Integer(
263262
30,
264263
config=True,
@@ -382,6 +381,9 @@ def engines_stopped_early(self, stop_data):
382381
a public interface.
383382
"""
384383
)
384+
engine_output = self.engine_launcher.get_output(remove=True)
385+
if engine_output:
386+
self.log.error(f"Engine output:\n{engine_output}")
385387
self.loop.add_callback(self.stop_cluster)
386388

387389
return self.engines_stopped(stop_data)
@@ -471,7 +473,9 @@ def engines_stopped(self, r):
471473
async def start_cluster(self):
472474
await self.cluster.start_cluster()
473475
if self.daemonize:
474-
self.log.info(f"Leaving cluster running: {self.cluster.cluster_file}")
476+
print(
477+
f"Leaving cluster running: {self.cluster.cluster_file}", file=sys.stderr
478+
)
475479
self.loop.add_callback(self.loop.stop)
476480
self.cluster._controller.on_stop(self.controller_stopped)
477481
self.watch_engines()

ipyparallel/cluster/cluster.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,9 @@ def from_dict(cls, d, **kwargs):
357357
cls = self.engine_launcher_class = import_item(engine_info["class"])
358358
for engine_set_id, engine_state in engine_info.get("sets", {}).items():
359359
self._engine_sets[engine_set_id] = cls.from_dict(
360-
engine_state, parent=self
360+
engine_state,
361+
engine_set_id=engine_set_id,
362+
parent=self,
361363
)
362364

363365
return self
@@ -503,6 +505,7 @@ async def start_engines(self, n=None, engine_set_id=None, **kwargs):
503505
log=self.log,
504506
profile_dir=self.profile_dir,
505507
cluster_id=self.cluster_id,
508+
engine_set_id=engine_set_id,
506509
**kwargs,
507510
)
508511
if n is None:
@@ -547,6 +550,8 @@ async def stop_engines(self, engine_set_id=None):
547550
r = engine_set.stop()
548551
if inspect.isawaitable(r):
549552
await r
553+
# retrieve and cleanup output files
554+
engine_set.get_output(remove=True)
550555
self._engine_sets.pop(engine_set_id)
551556
self.update_cluster_file()
552557

@@ -608,6 +613,9 @@ async def stop_controller(self):
608613
if inspect.isawaitable(r):
609614
await r
610615

616+
if self._controller:
617+
self._controller.get_output(remove=True)
618+
611619
self._controller = None
612620
self.update_cluster_file()
613621

0 commit comments

Comments
 (0)