Skip to content

Commit aaf86d6

Browse files
authored
Merge pull request #663 from minrk/fix-engine-id
Fix engine id, show %px output on error
2 parents 42be42d + a6f8e05 commit aaf86d6

File tree

5 files changed

+172
-34
lines changed

5 files changed

+172
-34
lines changed

ipyparallel/client/asyncresult.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,7 +990,7 @@ def display_outputs(self, groupby="type", result_only=False):
990990
stderrs = self.stderr
991991
execute_results = self.execute_result
992992
output_lists = self.outputs
993-
results = self.get()
993+
results = self.get(return_exceptions=True)
994994

995995
targets = self.engine_id
996996

ipyparallel/client/magics.py

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,10 @@ def result(self, line=''):
330330
if self.last_result is None:
331331
raise UsageError(NO_LAST_RESULT)
332332

333+
if args.save_name:
334+
self.shell.user_ns[args.save_name] = self.last_result
335+
return
336+
333337
self.last_result.get()
334338
self.last_result.display_outputs(groupby=args.groupby)
335339

@@ -402,37 +406,26 @@ def parallel_execute(
402406
# wait for 'quick' results before showing progress
403407
tic = time.perf_counter()
404408
deadline = tic + progress_after
405-
try:
406-
result.get(timeout=progress_after)
407-
remaining = max(deadline - time.perf_counter(), 0)
408-
result.wait_for_output(timeout=remaining)
409-
except TimeoutError:
410-
pass
411-
except error.CompositeError as e:
412-
if stream_output:
413-
# already streamed, show an abbreviated result
414-
raise error.AlreadyDisplayedError(e) from None
415-
else:
416-
raise
417-
else:
418-
finished_waiting = True
409+
result.wait(timeout=progress_after)
410+
remaining = max(deadline - time.perf_counter(), 0)
411+
result.wait_for_output(timeout=remaining)
412+
finished_waiting = result.done()
419413

420414
if not finished_waiting:
421415
if progress_after >= 0:
422416
# not an immediate result, start interactive progress
423417
result.wait_interactive()
424-
result.wait_for_output()
425-
try:
426-
result.get()
427-
except error.CompositeError as e:
428-
if stream_output:
429-
# already streamed, show an abbreviated result
430-
raise error.AlreadyDisplayedError(e) from None
431-
else:
432-
raise
418+
result.wait_for_output(1)
419+
420+
try:
421+
result.get()
422+
except error.CompositeError as e:
423+
if stream_output:
424+
# already streamed, show an abbreviated result
425+
raise error.AlreadyDisplayedError(e) from None
426+
else:
427+
raise
433428
# Skip redisplay if streaming output
434-
if not stream_output:
435-
result.display_outputs(groupby)
436429
except KeyboardInterrupt:
437430
if signal_on_interrupt is not None:
438431
print(
@@ -444,6 +437,14 @@ def parallel_execute(
444437
)
445438
else:
446439
raise
440+
finally:
441+
# always redisplay outputs if not streaming,
442+
# on both success and error
443+
444+
if not stream_output:
445+
# wait for at most 1 second for output to be complete
446+
result.wait_for_output(1)
447+
result.display_outputs(groupby)
447448
else:
448449
# return AsyncResult only on non-blocking submission
449450
return result

ipyparallel/engine/kernel.py

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
"""IPython kernel for parallel computing"""
2+
import asyncio
3+
import inspect
24
import sys
5+
from functools import partial
36

7+
import ipykernel
48
from ipykernel.ipkernel import IPythonKernel
59
from traitlets import Integer
610
from traitlets import Type
@@ -15,6 +19,11 @@ class IPythonParallelKernel(IPythonKernel):
1519
"""Extend IPython kernel for parallel computing"""
1620

1721
engine_id = Integer(-1)
22+
23+
@property
24+
def int_id(self):
25+
return self.engine_id
26+
1827
msg_types = getattr(IPythonKernel, 'msg_types', []) + ['apply_request']
1928
control_msg_types = getattr(IPythonKernel, 'control_msg_types', []) + [
2029
'abort_request',
@@ -40,6 +49,47 @@ def __init__(self, **kwargs):
4049
data_pub.pub_socket = self.iopub_socket
4150
self.aborted = set()
4251

52+
def _abort_queues(self):
53+
# forward-port ipython/ipykernel#853
54+
# may remove after requiring ipykernel 6.9
55+
56+
# while this flag is true,
57+
# execute requests will be aborted
58+
self._aborting = True
59+
self.log.info("Aborting queue")
60+
61+
# Callback to signal that we are done aborting
62+
def stop_aborting():
63+
self.log.info("Finishing abort")
64+
self._aborting = False
65+
66+
# put stop_aborting on the message queue
67+
# so that it's handled after processing of already-pending messages
68+
if ipykernel.version_info < (6,):
69+
# 10 is SHELL priority in ipykernel 5.x
70+
streams = self.shell_streams
71+
schedule_stop_aborting = partial(self.schedule_dispatch, 10, stop_aborting)
72+
else:
73+
streams = [self.shell_stream]
74+
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)
75+
76+
# flush streams, so all currently waiting messages
77+
# are added to the queue
78+
for stream in streams:
79+
stream.flush()
80+
81+
# if we have a delay, give messages this long to arrive on the queue
82+
# before we start accepting requests
83+
asyncio.get_event_loop().call_later(
84+
self.stop_on_error_timeout, schedule_stop_aborting
85+
)
86+
87+
# for compatibility, return a completed Future
88+
# so this is still awaitable
89+
f = asyncio.Future()
90+
f.set_result(None)
91+
return f
92+
4393
def should_handle(self, stream, msg, idents):
4494
"""Check whether a shell-channel message should be handled
4595
@@ -78,13 +128,20 @@ def finish_metadata(self, parent, metadata, reply_content):
78128
if reply_content['status'] == 'error':
79129
if reply_content['ename'] == 'UnmetDependency':
80130
metadata['dependencies_met'] = False
81-
metadata['engine_info'] = dict(
82-
engine_uuid=self.ident,
83-
engine_id=self.engine_id,
84-
)
131+
metadata['engine_info'] = self.get_engine_info()
85132

86133
return metadata
87134

135+
def get_engine_info(self, method=None):
136+
"""Return engine_info dict"""
137+
engine_info = dict(
138+
engine_uuid=self.ident,
139+
engine_id=self.engine_id,
140+
)
141+
if method:
142+
engine_info["method"] = method
143+
return engine_info
144+
88145
def apply_request(self, stream, ident, parent):
89146
try:
90147
content = parent['content']
@@ -168,8 +225,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
168225
else:
169226
self.log.warning("Didn't find a traceback where I expected to")
170227
shell._last_traceback = None
171-
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
172-
reply_content['engine_info'] = e_info
228+
reply_content["engine_info"] = self.get_engine_info(method="apply")
173229

174230
self.log.info(
175231
"Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])
@@ -181,6 +237,26 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
181237

182238
return reply_content, result_buf
183239

240+
async def _do_execute_async(self, *args, **kwargs):
241+
super_execute = super().do_execute(*args, **kwargs)
242+
if inspect.isawaitable(super_execute):
243+
reply_content = await super_execute
244+
else:
245+
reply_content = super_execute
246+
# add engine info
247+
if reply_content['status'] == 'error':
248+
reply_content["engine_info"] = self.get_engine_info(method="execute")
249+
return reply_content
250+
251+
def do_execute(self, *args, **kwargs):
252+
coro = self._do_execute_async(*args, **kwargs)
253+
if ipykernel.version_info < (6,):
254+
# ipykernel 5 uses gen.maybe_future which doesn't accept async def coroutines,
255+
# but it does accept asyncio.Futures
256+
return asyncio.ensure_future(coro)
257+
else:
258+
return coro
259+
184260
# Control messages for msgspec extensions:
185261

186262
def abort_request(self, stream, ident, parent):
@@ -189,7 +265,9 @@ def abort_request(self, stream, ident, parent):
189265
if isinstance(msg_ids, str):
190266
msg_ids = [msg_ids]
191267
if not msg_ids:
192-
self._abort_queues()
268+
f = self._abort_queues()
269+
if inspect.isawaitable(f):
270+
asyncio.ensure_future(f)
193271
for mid in msg_ids:
194272
self.aborted.add(str(mid))
195273

ipyparallel/tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def teardown():
124124
try:
125125
f = p.stop()
126126
if f:
127-
asyncio.run(f)
127+
asyncio.get_event_loop().run_until_complete(f)
128128
except Exception as e:
129129
print(e)
130130
pass

ipyparallel/tests/test_asyncresult.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66
from datetime import datetime
77

8+
import pytest
89
from IPython.utils.io import capture_output
910

1011
import ipyparallel as ipp
@@ -287,6 +288,20 @@ def test_display_empty_streams_engine(self):
287288
self.assertEqual(io.stderr, '')
288289
self.assertEqual(io.stdout, '')
289290

291+
def test_display_output_error(self):
292+
"""display_outputs shows output on error"""
293+
self.minimum_engines(1)
294+
295+
v = self.client[-1]
296+
ar = v.execute("print (5555)\n1/0")
297+
ar.get(5, return_exceptions=True)
298+
ar.wait_for_output(5)
299+
with capture_output() as io:
300+
ar.display_outputs()
301+
self.assertEqual(io.stderr, '')
302+
self.assertEqual('5555\n', io.stdout)
303+
assert 'ZeroDivisionError' not in io.stdout
304+
290305
def test_await_data(self):
291306
"""asking for ar.data flushes outputs"""
292307
self.minimum_engines(1)
@@ -488,3 +503,47 @@ def test_progress(self):
488503
assert amr.progress == 0
489504
amr.wait_interactive()
490505
assert amr.progress == len(amr)
506+
507+
def test_error_engine_info_apply(self):
508+
dv = self.client[:]
509+
targets = self.client.ids
510+
ar = dv.apply_async(lambda: 1 / 0)
511+
try:
512+
ar.get()
513+
except Exception as e:
514+
exc = e
515+
else:
516+
pytest.fail("Should have raised remote ZeroDivisionError")
517+
assert isinstance(exc, ipp.error.CompositeError)
518+
expected_engine_info = [
519+
{
520+
"engine_id": engine_id,
521+
"engine_uuid": self.client._engines[engine_id],
522+
"method": "apply",
523+
}
524+
for engine_id in self.client.ids
525+
]
526+
engine_infos = [e[-1] for e in exc.elist]
527+
assert engine_infos == expected_engine_info
528+
529+
def test_error_engine_info_execute(self):
530+
dv = self.client[:]
531+
targets = self.client.ids
532+
ar = dv.execute("1 / 0", block=False)
533+
try:
534+
ar.get()
535+
except Exception as e:
536+
exc = e
537+
else:
538+
pytest.fail("Should have raised remote ZeroDivisionError")
539+
assert isinstance(exc, ipp.error.CompositeError)
540+
expected_engine_info = [
541+
{
542+
"engine_id": engine_id,
543+
"engine_uuid": self.client._engines[engine_id],
544+
"method": "execute",
545+
}
546+
for engine_id in self.client.ids
547+
]
548+
engine_infos = [e[-1] for e in exc.elist]
549+
assert engine_infos == expected_engine_info

0 commit comments

Comments
 (0)