5
5
6
6
import asyncio
7
7
import concurrent .futures
8
- from datetime import datetime
9
- from functools import partial
8
+ import inspect
10
9
import itertools
11
10
import logging
12
- import inspect
13
11
import os
14
- from signal import signal , default_int_handler , SIGINT
15
- import sys
16
12
import socket
13
+ import sys
17
14
import time
18
15
import uuid
19
16
import warnings
17
+ from datetime import datetime
18
+ from functools import partial
19
+ from signal import (SIGINT , SIGKILL , SIGTERM , Signals , default_int_handler ,
20
+ signal )
21
+
20
22
try :
21
23
import psutil
22
24
except ImportError :
23
25
psutil = None
24
26
27
+
25
28
try :
26
29
# jupyter_client >= 5, use tz-aware now
27
30
from jupyter_client .session import utcnow as now
28
31
except ImportError :
29
32
# jupyter_client < 5, use local now()
30
33
now = datetime .now
31
34
35
+ import zmq
36
+ from IPython .core .error import StdinNotImplementedError
37
+ from jupyter_client .session import Session
32
38
from tornado import ioloop
33
39
from tornado .queues import Queue , QueueEmpty
34
- import zmq
40
+ from traitlets import (Any , Bool , Dict , Float , Instance , Integer , List , Set ,
41
+ Unicode , default , observe )
42
+ from traitlets .config .configurable import SingletonConfigurable
35
43
from zmq .eventloop .zmqstream import ZMQStream
36
44
37
- from traitlets .config .configurable import SingletonConfigurable
38
- from IPython .core .error import StdinNotImplementedError
39
45
from ipykernel .jsonutil import json_clean
40
- from traitlets import (
41
- Any , Instance , Float , Dict , List , Set , Integer , Unicode , Bool ,
42
- observe , default
43
- )
44
-
45
- from jupyter_client .session import Session
46
46
47
47
from ._version import kernel_protocol_version
48
48
@@ -796,13 +796,13 @@ async def comm_info_request(self, stream, ident, parent):
796
796
reply_content , parent , ident )
797
797
self .log .debug ("%s" , msg )
798
798
799
- async def interrupt_request (self , stream , ident , parent ):
799
+ def _send_interupt_children (self ):
800
+
800
801
pid = os .getpid ()
801
802
pgid = os .getpgid (pid )
802
803
803
804
if os .name == "nt" :
804
805
self .log .error ("Interrupt message not supported on Windows" )
805
-
806
806
else :
807
807
# Prefer process-group over process
808
808
if pgid and hasattr (os , "killpg" ):
@@ -816,6 +816,8 @@ async def interrupt_request(self, stream, ident, parent):
816
816
except OSError :
817
817
pass
818
818
819
+ async def interrupt_request (self , stream , ident , parent ):
820
+ self ._send_interupt_children ()
819
821
content = parent ['content' ]
820
822
self .session .send (stream , 'interrupt_reply' , content , parent , ident = ident )
821
823
return
@@ -830,7 +832,7 @@ async def shutdown_request(self, stream, ident, parent):
830
832
content , parent
831
833
)
832
834
833
- self ._at_shutdown ()
835
+ await self ._at_shutdown ()
834
836
835
837
self .log .debug ('Stopping control ioloop' )
836
838
control_io_loop = self .control_stream .io_loop
@@ -1131,9 +1133,60 @@ def _input_request(self, prompt, ident, parent, password=False):
1131
1133
raise EOFError
1132
1134
return value
1133
1135
1134
- def _at_shutdown (self ):
1136
+ async def _progressively_terminate_all_children (self ):
1137
+
1138
+ pgid = os .getpgid (os .getpid ())
1139
+ if not pgid :
1140
+ self .log .warning (f"No Pgid ({ pgid = } ), not trying to stop subprocesses." )
1141
+ return
1142
+ if psutil is None :
1143
+ # blindly send quickly sigterm/sigkill to processes if psutil not there.
1144
+ self .log .warning (
1145
+ f"Please install psutil for a cleaner subprocess shutdown."
1146
+ )
1147
+ self ._send_interupt_children ()
1148
+ try :
1149
+ await asyncio .sleep (0.05 )
1150
+ self .log .debug ("Sending SIGTERM to {pgid=}" )
1151
+ os .killpg (pgid , SIGTERM )
1152
+ await asyncio .sleep (0.05 )
1153
+ self .log .debug ("Sending SIGKILL to {pgid=}" )
1154
+ os .killpg (pgid , SIGKILL )
1155
+ except Exception :
1156
+ self .log .exception ("Exception during subprocesses termination" )
1157
+ return
1158
+
1159
+ sleeps = (0.01 , 0.03 , 0.1 , 0.3 , 1 )
1160
+ children = psutil .Process ().children (recursive = True )
1161
+ if not children :
1162
+ self .log .debug ("Kernel has no children." )
1163
+ return
1164
+ self .log .debug (f"Trying to interrupt then kill subprocesses : { children = } " )
1165
+ self ._send_interupt_children ()
1166
+ for signum in (SIGTERM , SIGKILL ):
1167
+ self .log .debug (
1168
+ f"Will try to send { signum } ({ Signals (signum )} ) to subprocesses :{ children } "
1169
+ )
1170
+ for delay in sleeps :
1171
+ children = psutil .Process ().children (recursive = True )
1172
+ if not children :
1173
+ self .log .debug ("No more children, continuing shutdown routine." )
1174
+ return
1175
+ if pgid and hasattr (os , "killpg" ):
1176
+ try :
1177
+ os .killpg (pgid , signum )
1178
+ except OSError :
1179
+ self .log .warning ("OSError running killpg, not killing children" )
1180
+ return
1181
+ self .log .debug (
1182
+ f"Will sleep { delay } s before checking for children and retrying."
1183
+ )
1184
+ await ascynio .sleep (delay )
1185
+
1186
+ async def _at_shutdown (self ):
1135
1187
"""Actions taken at shutdown by the kernel, called by python's atexit.
1136
1188
"""
1189
+ await self ._progressively_terminate_all_children ()
1137
1190
if self ._shutdown_message is not None :
1138
1191
self .session .send (self .iopub_socket , self ._shutdown_message , ident = self ._topic ('shutdown' ))
1139
1192
self .log .debug ("%s" , self ._shutdown_message )
0 commit comments