2
2
import base64
3
3
from textwrap import dedent
4
4
5
- # For python 3.5 compatibility we import asynccontextmanager from async_generator instead of
6
- # contextlib, and we `await yield_()` instead of just `yield`
7
- from async_generator import asynccontextmanager , async_generator , yield_
5
+ from async_generator import asynccontextmanager
6
+ from contextlib import contextmanager
8
7
9
8
from time import monotonic
10
9
from queue import Empty
15
14
16
15
from nbformat .v4 import output_from_msg
17
16
18
- from .exceptions import CellTimeoutError , DeadKernelError , CellExecutionComplete , CellExecutionError
19
- from .util import run_sync
17
+ from .exceptions import (
18
+ CellControlSignal ,
19
+ CellTimeoutError ,
20
+ DeadKernelError ,
21
+ CellExecutionComplete ,
22
+ CellExecutionError
23
+ )
24
+ from .util import run_sync , ensure_async
20
25
21
26
22
27
def timestamp ():
@@ -324,7 +329,28 @@ def start_kernel_manager(self):
324
329
self .km .client_class = 'jupyter_client.asynchronous.AsyncKernelClient'
325
330
return self .km
326
331
327
- async def start_new_kernel_client (self , ** kwargs ):
332
+ async def _async_cleanup_kernel (self ):
333
+ try :
334
+ # Send a polite shutdown request
335
+ await ensure_async (self .kc .shutdown ())
336
+ try :
337
+ # Queue the manager to kill the process, sometimes the built-in and above
338
+ # shutdowns have not been successful or called yet, so give a direct kill
339
+ # call here and recover gracefully if it's already dead.
340
+ await ensure_async (self .km .shutdown_kernel (now = True ))
341
+ except RuntimeError as e :
342
+ # The error isn't specialized, so we have to check the message
343
+ if 'No kernel is running!' not in str (e ):
344
+ raise
345
+ finally :
346
+ # Remove any state left over even if we failed to stop the kernel
347
+ await ensure_async (self .km .cleanup ())
348
+ await ensure_async (self .kc .stop_channels ())
349
+ self .kc = None
350
+
351
+ _cleanup_kernel = run_sync (_async_cleanup_kernel )
352
+
353
+ async def async_start_new_kernel_client (self , ** kwargs ):
328
354
"""Creates a new kernel client.
329
355
330
356
Parameters
@@ -346,22 +372,43 @@ async def start_new_kernel_client(self, **kwargs):
346
372
if self .km .ipykernel and self .ipython_hist_file :
347
373
self .extra_arguments += ['--HistoryManager.hist_file={}' .format (self .ipython_hist_file )]
348
374
349
- await self .km .start_kernel (extra_arguments = self .extra_arguments , ** kwargs )
375
+ await ensure_async ( self .km .start_kernel (extra_arguments = self .extra_arguments , ** kwargs ) )
350
376
351
377
self .kc = self .km .client ()
352
- self .kc .start_channels ()
378
+ await ensure_async ( self .kc .start_channels () )
353
379
try :
354
- await self .kc .wait_for_ready (timeout = self .startup_timeout )
380
+ await ensure_async ( self .kc .wait_for_ready (timeout = self .startup_timeout ) )
355
381
except RuntimeError :
356
- self .kc .stop_channels ()
357
- await self .km .shutdown_kernel ()
382
+ await self ._async_cleanup_kernel ()
358
383
raise
359
384
self .kc .allow_stdin = False
360
385
return self .kc
361
386
387
+ start_new_kernel_client = run_sync (async_start_new_kernel_client )
388
+
389
+ @contextmanager
390
+ def setup_kernel (self , ** kwargs ):
391
+ """
392
+ Context manager for setting up the kernel to execute a notebook.
393
+
394
+ The assigns the Kernel Manager (`self.km`) if missing and Kernel Client(`self.kc`).
395
+
396
+ When control returns from the yield it stops the client's zmq channels, and shuts
397
+ down the kernel.
398
+ """
399
+ # Can't use run_until_complete on an asynccontextmanager function :(
400
+ if self .km is None :
401
+ self .start_kernel_manager ()
402
+
403
+ if not self .km .has_kernel :
404
+ self .start_new_kernel_client (** kwargs )
405
+ try :
406
+ yield
407
+ finally :
408
+ self ._cleanup_kernel ()
409
+
362
410
@asynccontextmanager
363
- @async_generator # needed for python 3.5 compatibility
364
- async def setup_kernel (self , ** kwargs ):
411
+ async def async_setup_kernel (self , ** kwargs ):
365
412
"""
366
413
Context manager for setting up the kernel to execute a notebook.
367
414
@@ -374,12 +421,11 @@ async def setup_kernel(self, **kwargs):
374
421
self .start_kernel_manager ()
375
422
376
423
if not self .km .has_kernel :
377
- await self .start_new_kernel_client (** kwargs )
424
+ await self .async_start_new_kernel_client (** kwargs )
378
425
try :
379
- await yield_ ( None ) # would just yield in python >3.5
426
+ yield
380
427
finally :
381
- self .kc .stop_channels ()
382
- self .kc = None
428
+ await self ._async_cleanup_kernel ()
383
429
384
430
async def async_execute (self , ** kwargs ):
385
431
"""
@@ -392,15 +438,16 @@ async def async_execute(self, **kwargs):
392
438
"""
393
439
self .reset_execution_trackers ()
394
440
395
- async with self .setup_kernel (** kwargs ):
441
+ async with self .async_setup_kernel (** kwargs ):
396
442
self .log .info ("Executing notebook with kernel: %s" % self .kernel_name )
397
443
for index , cell in enumerate (self .nb .cells ):
398
444
# Ignore `'execution_count' in content` as it's always 1
399
445
# when store_history is False
400
446
await self .async_execute_cell (
401
447
cell , index , execution_count = self .code_cells_executed + 1
402
448
)
403
- info_msg = await self ._wait_for_reply (self .kc .kernel_info ())
449
+ msg_id = await ensure_async (self .kc .kernel_info ())
450
+ info_msg = await self .async_wait_for_reply (msg_id )
404
451
self .nb .metadata ['language_info' ] = info_msg ['content' ]['language_info' ]
405
452
self .set_widgets_metadata ()
406
453
@@ -450,12 +497,12 @@ def _update_display_id(self, display_id, msg):
450
497
outputs [output_idx ]['data' ] = out ['data' ]
451
498
outputs [output_idx ]['metadata' ] = out ['metadata' ]
452
499
453
- async def _poll_for_reply (self , msg_id , cell , timeout , task_poll_output_msg ):
500
+ async def _async_poll_for_reply (self , msg_id , cell , timeout , task_poll_output_msg ):
454
501
if timeout is not None :
455
502
deadline = monotonic () + timeout
456
503
while True :
457
504
try :
458
- msg = await self .kc .shell_channel .get_msg (timeout = timeout )
505
+ msg = await ensure_async ( self .kc .shell_channel .get_msg (timeout = timeout ) )
459
506
if msg ['parent_header' ].get ('msg_id' ) == msg_id :
460
507
if self .record_timing :
461
508
cell ['metadata' ]['execution' ]['shell.execute_reply' ] = timestamp ()
@@ -474,12 +521,12 @@ async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg):
474
521
timeout = max (0 , deadline - monotonic ())
475
522
except Empty :
476
523
# received no message, check if kernel is still alive
477
- await self ._check_alive ()
478
- await self ._handle_timeout (timeout , cell )
524
+ await self ._async_check_alive ()
525
+ await self ._async_handle_timeout (timeout , cell )
479
526
480
- async def _poll_output_msg (self , parent_msg_id , cell , cell_index ):
527
+ async def _async_poll_output_msg (self , parent_msg_id , cell , cell_index ):
481
528
while True :
482
- msg = await self .kc .iopub_channel .get_msg (timeout = None )
529
+ msg = await ensure_async ( self .kc .iopub_channel .get_msg (timeout = None ) )
483
530
if msg ['parent_header' ].get ('msg_id' ) == parent_msg_id :
484
531
try :
485
532
# Will raise CellExecutionComplete when completed
@@ -498,39 +545,46 @@ def _get_timeout(self, cell):
498
545
499
546
return timeout
500
547
501
- async def _handle_timeout (self , timeout , cell = None ):
548
+ async def _async_handle_timeout (self , timeout , cell = None ):
502
549
self .log .error ("Timeout waiting for execute reply (%is)." % timeout )
503
550
if self .interrupt_on_timeout :
504
551
self .log .error ("Interrupting kernel" )
505
- await self .km .interrupt_kernel ()
552
+ await ensure_async ( self .km .interrupt_kernel () )
506
553
else :
507
554
raise CellTimeoutError .error_from_timeout_and_cell (
508
555
"Cell execution timed out" , timeout , cell
509
556
)
510
557
511
- async def _check_alive (self ):
512
- if not await self .kc .is_alive ():
558
+ async def _async_check_alive (self ):
559
+ if not await ensure_async ( self .kc .is_alive () ):
513
560
self .log .error ("Kernel died while waiting for execute reply." )
514
561
raise DeadKernelError ("Kernel died" )
515
562
516
- async def _wait_for_reply (self , msg_id , cell = None ):
563
+ async def async_wait_for_reply (self , msg_id , cell = None ):
517
564
# wait for finish, with timeout
518
565
timeout = self ._get_timeout (cell )
519
566
cummulative_time = 0
520
- self .shell_timeout_interval = 5
521
567
while True :
522
568
try :
523
- msg = await self .kc .shell_channel .get_msg (timeout = self .shell_timeout_interval )
569
+ msg = await ensure_async (
570
+ self .kc .shell_channel .get_msg (
571
+ timeout = self .shell_timeout_interval
572
+ )
573
+ )
524
574
except Empty :
525
- await self ._check_alive ()
575
+ await self ._async_check_alive ()
526
576
cummulative_time += self .shell_timeout_interval
527
577
if timeout and cummulative_time > timeout :
528
- await self ._handle_timeout (timeout , cell )
578
+ await self ._async_async_handle_timeout (timeout , cell )
529
579
break
530
580
else :
531
581
if msg ['parent_header' ].get ('msg_id' ) == msg_id :
532
582
return msg
533
583
584
+ wait_for_reply = run_sync (async_wait_for_reply )
585
+ # Backwards compatability naming for papermill
586
+ _wait_for_reply = wait_for_reply
587
+
534
588
def _timeout_with_deadline (self , timeout , deadline ):
535
589
if deadline is not None and deadline - monotonic () < timeout :
536
590
timeout = deadline - monotonic ()
@@ -596,8 +650,12 @@ async def async_execute_cell(self, cell, cell_index, execution_count=None, store
596
650
cell ['metadata' ]['execution' ] = {}
597
651
598
652
self .log .debug ("Executing cell:\n %s" , cell .source )
599
- parent_msg_id = self .kc .execute (
600
- cell .source , store_history = store_history , stop_on_error = not self .allow_errors
653
+ parent_msg_id = await ensure_async (
654
+ self .kc .execute (
655
+ cell .source ,
656
+ store_history = store_history ,
657
+ stop_on_error = not self .allow_errors
658
+ )
601
659
)
602
660
# We launched a code cell to execute
603
661
self .code_cells_executed += 1
@@ -607,11 +665,20 @@ async def async_execute_cell(self, cell, cell_index, execution_count=None, store
607
665
self .clear_before_next_output = False
608
666
609
667
task_poll_output_msg = asyncio .ensure_future (
610
- self ._poll_output_msg (parent_msg_id , cell , cell_index )
611
- )
612
- exec_reply = await self ._poll_for_reply (
613
- parent_msg_id , cell , exec_timeout , task_poll_output_msg
668
+ self ._async_poll_output_msg (parent_msg_id , cell , cell_index )
614
669
)
670
+ try :
671
+ exec_reply = await self ._async_poll_for_reply (
672
+ parent_msg_id , cell , exec_timeout , task_poll_output_msg
673
+ )
674
+ except Exception as e :
675
+ # Best effort to cancel request if it hasn't been resolved
676
+ try :
677
+ # Check if the task_poll_output is doing the raising for us
678
+ if not isinstance (e , CellControlSignal ):
679
+ task_poll_output_msg .cancel ()
680
+ finally :
681
+ raise
615
682
616
683
if execution_count :
617
684
cell ['execution_count' ] = execution_count
0 commit comments