77from queue import Queue
88import http
99import json
10+ import multiprocessing .connection
1011import os
1112import shutil
1213import socket
@@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None:
4849
4950class AbstractProcessor (Generic [T ]):
5051
51- def write_data (self , writer : IO [bytes ], data : T ) -> None :
52+ def write_data (self , writer : IO [bytes ], data : T , is_node_ipc = False ) -> None :
5253 raise NotImplementedError ()
5354
54- def read_data (self , reader : IO [bytes ]) -> Optional [T ]:
55+ def read_data (self , reader : IO [bytes ], is_node_ipc = False ) -> Optional [T ]:
5556 raise NotImplementedError ()
5657
5758
5859class JsonRpcProcessor (AbstractProcessor [Dict [str , Any ]]):
5960
60- def write_data (self , writer : IO [bytes ], data : Dict [str , Any ]) -> None :
61+ def write_data (self , writer : IO [bytes ], data : Dict [str , Any ], is_node_ipc = False ) -> None :
6162 body = self ._encode (data )
62- writer .writelines (("Content-Length: {}\r \n \r \n " .format (len (body )).encode ('ascii' ), body ))
63+ if not is_node_ipc :
64+ writer .writelines (("Content-Length: {}\r \n \r \n " .format (len (body )).encode ('ascii' ), body ))
65+ else :
66+ writer .write (body + b"\n " )
67+
68+ def read_data (self , reader : IO [bytes ], is_node_ipc = False ) -> Optional [Dict [str , Any ]]:
69+ if not is_node_ipc :
70+ headers = http .client .parse_headers (reader ) # type: ignore
71+ try :
72+ body = reader .read (int (headers .get ("Content-Length" )))
73+ except TypeError :
74+ # Expected error on process stopping. Stop the read loop.
75+ raise StopLoopError ()
76+ else :
77+ body = reader .readline ()
6378
64- def read_data (self , reader : IO [bytes ]) -> Optional [Dict [str , Any ]]:
65- headers = http .client .parse_headers (reader ) # type: ignore
66- try :
67- body = reader .read (int (headers .get ("Content-Length" )))
68- except TypeError :
69- # Expected error on process stopping. Stop the read loop.
70- raise StopLoopError ()
7179 try :
7280 return self ._decode (body )
7381 except Exception as ex :
@@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes:
7987 return json .dumps (
8088 data ,
8189 ensure_ascii = False ,
82- sort_keys = False ,
8390 check_circular = False ,
8491 separators = (',' , ':' )
8592 ).encode ('utf-8' )
@@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]):
93100
94101 def __init__ (self , name : str , process : subprocess .Popen , socket : Optional [socket .socket ], reader : IO [bytes ],
95102 writer : IO [bytes ], stderr : Optional [IO [bytes ]], processor : AbstractProcessor [T ],
96- callback_object : TransportCallbacks [T ]) -> None :
103+ callback_object : TransportCallbacks [T ], is_node_ipc : bool ) -> None :
97104 self ._closed = False
98105 self ._process = process
99106 self ._socket = socket
@@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket
105112 self ._writer_thread = threading .Thread (target = self ._write_loop , name = '{}-writer' .format (name ))
106113 self ._stderr_thread = threading .Thread (target = self ._stderr_loop , name = '{}-stderr' .format (name ))
107114 self ._callback_object = weakref .ref (callback_object )
115+ self ._is_node_ipc = is_node_ipc
108116 self ._send_queue = Queue (0 ) # type: Queue[Union[T, None]]
109117 self ._reader_thread .start ()
110118 self ._writer_thread .start ()
@@ -137,7 +145,7 @@ def __del__(self) -> None:
137145 def _read_loop (self ) -> None :
138146 try :
139147 while self ._reader :
140- payload = self ._processor .read_data (self ._reader )
148+ payload = self ._processor .read_data (self ._reader , self . _is_node_ipc )
141149 if payload is None :
142150 continue
143151
@@ -190,8 +198,9 @@ def _write_loop(self) -> None:
190198 d = self ._send_queue .get ()
191199 if d is None :
192200 break
193- self ._processor .write_data (self ._writer , d )
194- self ._writer .flush ()
201+ self ._processor .write_data (self ._writer , d , self ._is_node_ipc )
202+ if not self ._is_node_ipc :
203+ self ._writer .flush ()
195204 except (BrokenPipeError , AttributeError ):
196205 pass
197206 except Exception as ex :
@@ -223,24 +232,58 @@ def _stderr_loop(self) -> None:
223232json_rpc_processor = JsonRpcProcessor ()
224233
225234
235+ class NodeIpcIO ():
236+ _buf = bytearray ()
237+ _lines = 0
238+
239+ def __init__ (self , conn : multiprocessing .connection ._ConnectionBase ):
240+ self ._conn = conn
241+
242+ # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392
243+ def readline (self ):
244+ while self ._lines == 0 :
245+ chunk = self ._conn ._read (self ._conn .fileno (), 65536 ) # type: bytes
246+ self ._buf += chunk
247+ self ._lines += chunk .count (b'\n ' )
248+
249+ self ._lines -= 1
250+ foo , _ , self ._buf = self ._buf .partition (b'\n ' )
251+ print ('READLINE: ' + str (foo ))
252+ return foo
253+
254+ # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376
255+ def write (self , data : bytes ):
256+ while len (data ):
257+ n = self ._conn ._write (self ._conn .fileno (), data )
258+ data = data [n :]
259+
260+
226261def create_transport (config : TransportConfig , cwd : Optional [str ],
227262 callback_object : TransportCallbacks ) -> Transport [Dict [str , Any ]]:
263+ stderr = subprocess .PIPE
264+ pass_fds = ()
228265 if config .tcp_port is not None :
229266 assert config .tcp_port is not None
230267 if config .tcp_port < 0 :
231268 stdout = subprocess .PIPE
232269 else :
233270 stdout = subprocess .DEVNULL
234271 stdin = subprocess .DEVNULL
235- else :
272+ elif not config . node_ipc :
236273 stdout = subprocess .PIPE
237274 stdin = subprocess .PIPE
275+ else :
276+ stdout = subprocess .PIPE
277+ stdin = subprocess .DEVNULL
278+ stderr = subprocess .STDOUT
279+ pass_fds = (config .node_ipc .child_conn .fileno (),)
280+
238281 startupinfo = _fixup_startup_args (config .command )
239282 sock = None # type: Optional[socket.socket]
240283 process = None # type: Optional[subprocess.Popen]
241284
242285 def start_subprocess () -> subprocess .Popen :
243- return _start_subprocess (config .command , stdin , stdout , subprocess . PIPE , startupinfo , config .env , cwd )
286+ return _start_subprocess (config .command , stdin , stdout , stderr , startupinfo , config .env , cwd , pass_fds )
244287
245288 if config .listener_socket :
246289 assert isinstance (config .tcp_port , int ) and config .tcp_port > 0
@@ -258,13 +301,16 @@ def start_subprocess() -> subprocess.Popen:
258301 raise RuntimeError ("Failed to connect on port {}" .format (config .tcp_port ))
259302 reader = sock .makefile ('rwb' ) # type: ignore
260303 writer = reader
261- else :
304+ elif not config . node_ipc :
262305 reader = process .stdout # type: ignore
263306 writer = process .stdin # type: ignore
307+ else :
308+ reader = writer = NodeIpcIO (config .node_ipc .parent_conn )
264309 if not reader or not writer :
265310 raise RuntimeError ('Failed initializing transport: reader: {}, writer: {}' .format (reader , writer ))
266- return ProcessTransport (config .name , process , sock , reader , writer , process .stderr , json_rpc_processor ,
267- callback_object )
311+ stderr_reader = process .stdout if config .node_ipc else process .stderr
312+ return ProcessTransport (config .name , process , sock , reader , writer , stderr_reader , json_rpc_processor ,
313+ callback_object , bool (config .node_ipc ))
268314
269315
270316_subprocesses = weakref .WeakSet () # type: weakref.WeakSet[subprocess.Popen]
@@ -312,7 +358,8 @@ def _start_subprocess(
312358 stderr : int ,
313359 startupinfo : Any ,
314360 env : Dict [str , str ],
315- cwd : Optional [str ]
361+ cwd : Optional [str ],
362+ pass_fds : Union [Tuple [()], Tuple [int ]]
316363) -> subprocess .Popen :
317364 debug ("starting {} in {}" .format (args , cwd if cwd else os .getcwd ()))
318365 process = subprocess .Popen (
@@ -322,7 +369,8 @@ def _start_subprocess(
322369 stderr = stderr ,
323370 startupinfo = startupinfo ,
324371 env = env ,
325- cwd = cwd )
372+ cwd = cwd ,
373+ pass_fds = pass_fds )
326374 _subprocesses .add (process )
327375 return process
328376
0 commit comments