15
15
"""
16
16
import asyncio
17
17
import logging
18
- import multiprocessing
19
18
import os
19
+ import pickle
20
20
import signal
21
21
import sys
22
- from multiprocessing import Pipe
23
- from multiprocessing . connection import Connection
22
+ from subprocess import PIPE
23
+ from subprocess import Popen
24
24
from threading import Thread
25
25
26
26
import psutil
@@ -51,7 +51,7 @@ def __init__(
51
51
registration_url : str ,
52
52
identity : bytes ,
53
53
config : Config ,
54
- start_pipe : Pipe ,
54
+ pipe ,
55
55
log_level : int = logging .INFO ,
56
56
):
57
57
self .pid = pid
@@ -60,9 +60,10 @@ def __init__(
60
60
self .control_url = control_url
61
61
self .registration_url = registration_url
62
62
self .identity = identity
63
- self .start_pipe = start_pipe
64
63
self .config = config
64
+ self .pipe = pipe
65
65
self .session = Session (config = self .config )
66
+ log_level = 10
66
67
67
68
self .log = local_logger (f"{ self .__class__ .__name__ } .{ engine_id } " , log_level )
68
69
self .log .propagate = False
@@ -88,6 +89,10 @@ def wait_for_parent_thread(self):
88
89
def pipe_handler (self , fd , events ):
89
90
self .log .debug (f"Pipe event { events } " )
90
91
self .loop .remove_handler (fd )
92
+ try :
93
+ fd .close ()
94
+ except BrokenPipeError :
95
+ pass
91
96
try :
92
97
status = self .parent_process .wait (0 )
93
98
except psutil .TimeoutExpired :
@@ -198,15 +203,12 @@ def start(self):
198
203
port = self .parent_socket .bind_to_random_port ("tcp://127.0.0.1" )
199
204
200
205
# now that we've bound, pass port to parent via AsyncResult
201
- self .start_pipe .send (f"tcp://127.0.0.1:{ port } " )
202
- if sys .platform .startswith ("win" ):
203
- # close the pipe on Windows
204
- self .loop .add_timeout (self .loop .time () + 10 , self .start_pipe .close )
205
- else :
206
- # otherwise, watch for the pipe to close
206
+ self .pipe .write (f"tcp://127.0.0.1:{ port } \n " )
207
+ if not sys .platform .startswith ("win" ):
208
+ # watch for the stdout pipe to close
207
209
# as a signal that our parent is shutting down
208
210
self .loop .add_handler (
209
- self .start_pipe , self .pipe_handler , IOLoop .READ | IOLoop .ERROR
211
+ self .pipe , self .pipe_handler , IOLoop .READ | IOLoop .ERROR
210
212
)
211
213
self .parent_stream = ZMQStream (self .parent_socket )
212
214
self .parent_stream .on_recv_stream (self .dispatch_parent )
@@ -215,6 +217,11 @@ def start(self):
215
217
finally :
216
218
self .loop .close (all_fds = True )
217
219
self .context .term ()
220
+ try :
221
+ self .pipe .close ()
222
+ except BrokenPipeError :
223
+ pass
224
+ self .log .debug ("exiting" )
218
225
219
226
@classmethod
220
227
def main (cls , * args , ** kwargs ):
@@ -244,23 +251,39 @@ def start_nanny(**kwargs):
244
251
instead of connecting directly to the control Scheduler.
245
252
"""
246
253
247
- pipe_r , pipe_w = Pipe (duplex = False )
248
-
249
- kwargs ['start_pipe' ] = pipe_w
250
254
kwargs ['pid' ] = os .getpid ()
251
- # make sure to not use fork, which can be an issue for MPI
252
- p = multiprocessing .get_context ("spawn" ).Process (
253
- target = KernelNanny .main ,
254
- kwargs = kwargs ,
255
- name = "KernelNanny" ,
256
- daemon = True ,
255
+
256
+ env = os .environ .copy ()
257
+ env ['PYTHONUNBUFFERED' ] = '1'
258
+ p = Popen (
259
+ [sys .executable , '-m' , __name__ ],
260
+ stdin = PIPE ,
261
+ stdout = PIPE ,
262
+ env = env ,
263
+ start_new_session = True , # don't inherit signals
257
264
)
258
- p .start ()
259
- # close our copy of the write pipe
260
- pipe_w .close ()
261
- nanny_url = pipe_r .recv ()
262
- if sys .platform .startswith ("win" ):
263
- pipe_r .close ()
264
- # return the handle on the read pipe
265
- # need to keep this open for the nanny
266
- return nanny_url , pipe_r
265
+ p .stdin .write (pickle .dumps (kwargs ))
266
+ p .stdin .close ()
267
+ out = p .stdout .readline ()
268
+ nanny_url = out .decode ("utf8" ).strip ()
269
+ if not nanny_url :
270
+ p .terminate ()
271
+ raise RuntimeError ("nanny failed" )
272
+ # return the handle on the process
273
+ # need to keep the pipe open for the nanny
274
+ return nanny_url , p
275
+
276
+
277
+ def main ():
278
+ """Entrypoint from the command-line
279
+
280
+ Loads kwargs from stdin,
281
+ sets pipe to stdout
282
+ """
283
+ kwargs = pickle .load (os .fdopen (sys .stdin .fileno (), mode = 'rb' ))
284
+ kwargs ['pipe' ] = sys .stdout
285
+ KernelNanny .main (** kwargs )
286
+
287
+
288
+ if __name__ == "__main__" :
289
+ main ()
0 commit comments