@@ -47,6 +47,8 @@ def _default_kernel_manager_class(self):
47
47
48
48
_kernel_connections = Dict ()
49
49
50
+ _kernel_ports = Dict ()
51
+
50
52
_culler_callback = None
51
53
52
54
_initialized_culler = False
@@ -183,6 +185,7 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
183
185
kwargs ['cwd' ] = self .cwd_for_path (path )
184
186
kernel_id = await ensure_async (self .pinned_superclass .start_kernel (self , ** kwargs ))
185
187
self ._kernel_connections [kernel_id ] = 0
188
+ self ._kernel_ports [kernel_id ] = self ._kernels [kernel_id ].ports
186
189
self .start_watching_activity (kernel_id )
187
190
self .log .info ("Kernel started: %s" % kernel_id )
188
191
self .log .debug ("Kernel args: %r" % kwargs )
@@ -208,6 +211,40 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
208
211
209
212
return kernel_id
210
213
214
+ def ports_changed (self , kernel_id ):
215
+ """Used by ZMQChannelsHandler to determine how to coordinate nudge and replays.
216
+
217
+ Ports are captured when starting a kernel (via MappingKernelManager). Ports
218
+ are considered changed (following restarts) if the referenced KernelManager
219
+ is using a set of ports different from those captured at startup. If changes
220
+ are detected, the captured set is updated and a value of True is returned.
221
+
222
+ NOTE: Use is exclusive to ZMQChannelsHandler because this object is a singleton
223
+ instance while ZMQChannelsHandler instances are per WebSocket connection that
224
+ can vary per kernel lifetime.
225
+ """
226
+ changed_ports = self ._get_changed_ports (kernel_id )
227
+ if changed_ports :
228
+ # If changed, update captured ports and return True, else return False.
229
+ self .log .debug (f"Port change detected for kernel: { kernel_id } " )
230
+ self ._kernel_ports [kernel_id ] = changed_ports
231
+ return True
232
+ return False
233
+
234
+ def _get_changed_ports (self , kernel_id ):
235
+ """Internal method to test if a kernel's ports have changed and, if so, return their values.
236
+
237
+ This method does NOT update the captured ports for the kernel as that can only be done
238
+ by ZMQChannelsHandler, but instead returns the new list of ports if they are different
239
+ than those captured at startup. This enables the ability to conditionally restart
240
+ activity monitoring immediately following a kernel's restart (if ports have changed).
241
+ """
242
+ # Get current ports and return comparison with ports captured at startup.
243
+ km = self .get_kernel (kernel_id )
244
+ if km .ports != self ._kernel_ports [kernel_id ]:
245
+ return km .ports
246
+ return None
247
+
211
248
def start_buffering (self , kernel_id , session_key , channels ):
212
249
"""Start buffering messages for a kernel
213
250
@@ -300,10 +337,7 @@ def stop_buffering(self, kernel_id):
300
337
def shutdown_kernel (self , kernel_id , now = False , restart = False ):
301
338
"""Shutdown a kernel by kernel_id"""
302
339
self ._check_kernel_id (kernel_id )
303
- kernel = self ._kernels [kernel_id ]
304
- if kernel ._activity_stream :
305
- kernel ._activity_stream .close ()
306
- kernel ._activity_stream = None
340
+ self .stop_watching_activity (kernel_id )
307
341
self .stop_buffering (kernel_id )
308
342
self ._kernel_connections .pop (kernel_id , None )
309
343
@@ -319,6 +353,7 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False):
319
353
# method is synchronous. However, we'll keep the relative call orders the same from
320
354
# a maintenance perspective.
321
355
self ._kernel_connections .pop (kernel_id , None )
356
+ self ._kernel_ports .pop (kernel_id , None )
322
357
323
358
async def restart_kernel (self , kernel_id , now = False ):
324
359
"""Restart a kernel by kernel_id"""
@@ -359,6 +394,10 @@ def on_restart_failed():
359
394
channel .on_recv (on_reply )
360
395
loop = IOLoop .current ()
361
396
timeout = loop .add_timeout (loop .time () + self .kernel_info_timeout , on_timeout )
397
+ # Re-establish activity watching if ports have changed...
398
+ if self ._get_changed_ports (kernel_id ) is not None :
399
+ self .stop_watching_activity (kernel_id )
400
+ self .start_watching_activity (kernel_id )
362
401
return future
363
402
364
403
def notify_connect (self , kernel_id ):
@@ -440,6 +479,13 @@ def record_activity(msg_list):
440
479
441
480
kernel ._activity_stream .on_recv (record_activity )
442
481
482
+ def stop_watching_activity (self , kernel_id ):
483
+ """Stop watching IOPub messages on a kernel for activity."""
484
+ kernel = self ._kernels [kernel_id ]
485
+ if kernel ._activity_stream :
486
+ kernel ._activity_stream .close ()
487
+ kernel ._activity_stream = None
488
+
443
489
def initialize_culler (self ):
444
490
"""Start idle culler if 'cull_idle_timeout' is greater than zero.
445
491
@@ -511,10 +557,7 @@ def __init__(self, **kwargs):
511
557
async def shutdown_kernel (self , kernel_id , now = False , restart = False ):
512
558
"""Shutdown a kernel by kernel_id"""
513
559
self ._check_kernel_id (kernel_id )
514
- kernel = self ._kernels [kernel_id ]
515
- if kernel ._activity_stream :
516
- kernel ._activity_stream .close ()
517
- kernel ._activity_stream = None
560
+ self .stop_watching_activity (kernel_id )
518
561
self .stop_buffering (kernel_id )
519
562
520
563
# Decrease the metric of number of kernels
@@ -526,4 +569,5 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False):
526
569
# Finish shutting down the kernel before clearing state to avoid a race condition.
527
570
ret = await self .pinned_superclass .shutdown_kernel (self , kernel_id , now = now , restart = restart )
528
571
self ._kernel_connections .pop (kernel_id , None )
572
+ self ._kernel_ports .pop (kernel_id , None )
529
573
return ret
0 commit comments