48
48
Instance ,
49
49
Integer ,
50
50
List ,
51
- Set ,
52
51
Unicode ,
53
52
default ,
54
53
)
@@ -199,9 +198,6 @@ def _parent_header(self):
199
198
# by record_ports and used by connect_request.
200
199
_recorded_ports = Dict ()
201
200
202
- # set of aborted msg_ids
203
- aborted = Set ()
204
-
205
201
# Track execution count here. For IPython, we override this to use the
206
202
# execution count we store in the shell.
207
203
execution_count = 0
@@ -217,14 +213,10 @@ def _parent_header(self):
217
213
"shutdown_request" ,
218
214
"is_complete_request" ,
219
215
"interrupt_request" ,
220
- # deprecated:
221
- "apply_request" ,
222
216
]
223
217
# add deprecated ipyparallel control messages
224
218
control_msg_types = [
225
219
* msg_types ,
226
- "clear_request" ,
227
- "abort_request" ,
228
220
"debug_request" ,
229
221
"usage_request" ,
230
222
"create_subshell_request" ,
@@ -308,17 +300,15 @@ async def process_control_message(self, msg=None):
308
300
sys .stderr .flush ()
309
301
self ._publish_status ("idle" , "control" )
310
302
311
- async def should_handle (self , stream , msg , idents ):
303
+ def should_handle (self , stream , msg , idents ):
312
304
"""Check whether a shell-channel message should be handled
313
305
314
306
Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
307
+
308
+ .. versionchanged:: 7
309
+ Subclass should_handle _may_ be async.
310
+ Base class implementation is not async.
315
311
"""
316
- msg_id = msg ["header" ]["msg_id" ]
317
- if msg_id in self .aborted :
318
- # is it safe to assume a msg_id will not be resubmitted?
319
- self .aborted .remove (msg_id )
320
- await self ._send_abort_reply (stream , msg , idents )
321
- return False
322
312
return True
323
313
324
314
async def enter_eventloop (self ):
@@ -483,7 +473,11 @@ async def process_shell_message(self, msg=None, socket=None):
483
473
self .log .debug ("\n *** MESSAGE TYPE:%s***" , msg_type )
484
474
self .log .debug (" Content: %s\n --->\n " , msg ["content" ])
485
475
486
- if not await self .should_handle (socket , msg , idents ):
476
+ should_handle : bool | t .Awaitable [bool ] = self .should_handle (socket , msg , idents )
477
+ if inspect .isawaitable (should_handle ):
478
+ should_handle = await should_handle
479
+ if not should_handle :
480
+ self .log .debug ("Not handling %s:%s" , msg_type , msg ["header" ].get ("msg_id" ))
487
481
return
488
482
489
483
handler = self .shell_handlers .get (msg_type )
@@ -1126,84 +1120,6 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
1126
1120
1127
1121
self .session .send (socket , "list_subshell_reply" , reply , parent , ident )
1128
1122
1129
- # ---------------------------------------------------------------------------
1130
- # Engine methods (DEPRECATED)
1131
- # ---------------------------------------------------------------------------
1132
-
1133
- async def apply_request (self , socket , ident , parent ): # pragma: no cover
1134
- """Handle an apply request."""
1135
- self .log .warning ("apply_request is deprecated in kernel_base, moving to ipyparallel." )
1136
- try :
1137
- content = parent ["content" ]
1138
- bufs = parent ["buffers" ]
1139
- msg_id = parent ["header" ]["msg_id" ]
1140
- except Exception :
1141
- self .log .error ("Got bad msg: %s" , parent , exc_info = True ) # noqa: G201
1142
- return
1143
-
1144
- md = self .init_metadata (parent )
1145
-
1146
- reply_content , result_buf = self .do_apply (content , bufs , msg_id , md )
1147
-
1148
- # flush i/o
1149
- if sys .stdout is not None :
1150
- sys .stdout .flush ()
1151
- if sys .stderr is not None :
1152
- sys .stderr .flush ()
1153
-
1154
- md = self .finish_metadata (parent , md , reply_content )
1155
- if not self .session :
1156
- return
1157
- self .session .send (
1158
- socket ,
1159
- "apply_reply" ,
1160
- reply_content ,
1161
- parent = parent ,
1162
- ident = ident ,
1163
- buffers = result_buf ,
1164
- metadata = md ,
1165
- )
1166
-
1167
- def do_apply (self , content , bufs , msg_id , reply_metadata ):
1168
- """DEPRECATED"""
1169
- raise NotImplementedError
1170
-
1171
- # ---------------------------------------------------------------------------
1172
- # Control messages (DEPRECATED)
1173
- # ---------------------------------------------------------------------------
1174
-
1175
- async def abort_request (self , socket , ident , parent ): # pragma: no cover
1176
- """abort a specific msg by id"""
1177
- self .log .warning (
1178
- "abort_request is deprecated in kernel_base. It is only part of IPython parallel"
1179
- )
1180
- msg_ids = parent ["content" ].get ("msg_ids" , None )
1181
- if isinstance (msg_ids , str ):
1182
- msg_ids = [msg_ids ]
1183
- for mid in msg_ids :
1184
- self .aborted .add (str (mid ))
1185
-
1186
- content = dict (status = "ok" )
1187
- if not self .session :
1188
- return
1189
- reply_msg = self .session .send (
1190
- socket , "abort_reply" , content = content , parent = parent , ident = ident
1191
- )
1192
- self .log .debug ("%s" , reply_msg )
1193
-
1194
- async def clear_request (self , socket , idents , parent ): # pragma: no cover
1195
- """Clear our namespace."""
1196
- self .log .warning (
1197
- "clear_request is deprecated in kernel_base. It is only part of IPython parallel"
1198
- )
1199
- content = self .do_clear ()
1200
- if self .session :
1201
- self .session .send (socket , "clear_reply" , ident = idents , parent = parent , content = content )
1202
-
1203
- def do_clear (self ):
1204
- """DEPRECATED since 4.0.3"""
1205
- raise NotImplementedError
1206
-
1207
1123
# ---------------------------------------------------------------------------
1208
1124
# Protected interface
1209
1125
# ---------------------------------------------------------------------------
0 commit comments