1
1
"""IPython kernel for parallel computing"""
2
+ import inspect
2
3
import sys
3
4
4
5
from ipykernel .ipkernel import IPythonKernel
@@ -15,6 +16,11 @@ class IPythonParallelKernel(IPythonKernel):
15
16
"""Extend IPython kernel for parallel computing"""
16
17
17
18
engine_id = Integer (- 1 )
19
+
20
+ @property
21
+ def int_id (self ):
22
+ return self .engine_id
23
+
18
24
msg_types = getattr (IPythonKernel , 'msg_types' , []) + ['apply_request' ]
19
25
control_msg_types = getattr (IPythonKernel , 'control_msg_types' , []) + [
20
26
'abort_request' ,
@@ -78,13 +84,20 @@ def finish_metadata(self, parent, metadata, reply_content):
78
84
if reply_content ['status' ] == 'error' :
79
85
if reply_content ['ename' ] == 'UnmetDependency' :
80
86
metadata ['dependencies_met' ] = False
81
- metadata ['engine_info' ] = dict (
82
- engine_uuid = self .ident ,
83
- engine_id = self .engine_id ,
84
- )
87
+ metadata ['engine_info' ] = self .get_engine_info ()
85
88
86
89
return metadata
87
90
91
+ def get_engine_info (self , method = None ):
92
+ """Return engine_info dict"""
93
+ engine_info = dict (
94
+ engine_uuid = self .ident ,
95
+ engine_id = self .engine_id ,
96
+ )
97
+ if method :
98
+ engine_info ["method" ] = method
99
+ return engine_info
100
+
88
101
def apply_request (self , stream , ident , parent ):
89
102
try :
90
103
content = parent ['content' ]
@@ -168,8 +181,7 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
168
181
else :
169
182
self .log .warning ("Didn't find a traceback where I expected to" )
170
183
shell ._last_traceback = None
171
- e_info = dict (engine_uuid = self .ident , engine_id = self .int_id , method = 'apply' )
172
- reply_content ['engine_info' ] = e_info
184
+ reply_content ["engine_info" ] = self .get_engine_info (method = "apply" )
173
185
174
186
self .log .info (
175
187
"Exception in apply request:\n %s" , '\n ' .join (reply_content ['traceback' ])
@@ -181,6 +193,14 @@ def do_apply(self, content, bufs, msg_id, reply_metadata):
181
193
182
194
return reply_content , result_buf
183
195
196
+ async def do_execute (self , * args , ** kwargs ):
197
+ reply_content = super ().do_execute (* args , ** kwargs )
198
+ if inspect .isawaitable (reply_content ):
199
+ reply_content = await reply_content
200
+ if reply_content ['status' ] == 'error' :
201
+ reply_content ["engine_info" ] = self .get_engine_info (method = "execute" )
202
+ return reply_content
203
+
184
204
# Control messages for msgspec extensions:
185
205
186
206
def abort_request (self , stream , ident , parent ):
0 commit comments