7
7
from queue import Queue
8
8
import http
9
9
import json
10
+ import multiprocessing .connection
10
11
import os
11
12
import shutil
12
13
import socket
@@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None:
48
49
49
50
class AbstractProcessor (Generic [T ]):
50
51
51
- def write_data (self , writer : IO [bytes ], data : T ) -> None :
52
+ def write_data (self , writer : IO [bytes ], data : T , is_node_ipc : bool ) -> None :
52
53
raise NotImplementedError ()
53
54
54
- def read_data (self , reader : IO [bytes ]) -> Optional [T ]:
55
+ def read_data (self , reader : IO [bytes ], is_node_ipc : bool ) -> Optional [T ]:
55
56
raise NotImplementedError ()
56
57
57
58
58
59
class JsonRpcProcessor (AbstractProcessor [Dict [str , Any ]]):
59
60
60
- def write_data (self , writer : IO [bytes ], data : Dict [str , Any ]) -> None :
61
+ def write_data (self , writer : IO [bytes ], data : Dict [str , Any ], is_node_ipc : bool ) -> None :
61
62
body = self ._encode (data )
62
- writer .writelines (("Content-Length: {}\r \n \r \n " .format (len (body )).encode ('ascii' ), body ))
63
+ if not is_node_ipc :
64
+ writer .writelines (("Content-Length: {}\r \n \r \n " .format (len (body )).encode ('ascii' ), body ))
65
+ else :
66
+ writer .write (body + b"\n " )
67
+
68
+ def read_data (self , reader : IO [bytes ], is_node_ipc : bool ) -> Optional [Dict [str , Any ]]:
69
+ if not is_node_ipc :
70
+ headers = http .client .parse_headers (reader ) # type: ignore
71
+ try :
72
+ body = reader .read (int (headers .get ("Content-Length" )))
73
+ except TypeError :
74
+ # Expected error on process stopping. Stop the read loop.
75
+ raise StopLoopError ()
76
+ else :
77
+ body = reader .readline ()
63
78
64
- def read_data (self , reader : IO [bytes ]) -> Optional [Dict [str , Any ]]:
65
- headers = http .client .parse_headers (reader ) # type: ignore
66
- try :
67
- body = reader .read (int (headers .get ("Content-Length" )))
68
- except TypeError :
69
- # Expected error on process stopping. Stop the read loop.
70
- raise StopLoopError ()
71
79
try :
72
80
return self ._decode (body )
73
81
except Exception as ex :
@@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes:
79
87
return json .dumps (
80
88
data ,
81
89
ensure_ascii = False ,
82
- sort_keys = False ,
83
90
check_circular = False ,
84
91
separators = (',' , ':' )
85
92
).encode ('utf-8' )
@@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]):
93
100
94
101
def __init__ (self , name : str , process : subprocess .Popen , socket : Optional [socket .socket ], reader : IO [bytes ],
95
102
writer : IO [bytes ], stderr : Optional [IO [bytes ]], processor : AbstractProcessor [T ],
96
- callback_object : TransportCallbacks [T ]) -> None :
103
+ callback_object : TransportCallbacks [T ], is_node_ipc : bool ) -> None :
97
104
self ._closed = False
98
105
self ._process = process
99
106
self ._socket = socket
@@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket
105
112
self ._writer_thread = threading .Thread (target = self ._write_loop , name = '{}-writer' .format (name ))
106
113
self ._stderr_thread = threading .Thread (target = self ._stderr_loop , name = '{}-stderr' .format (name ))
107
114
self ._callback_object = weakref .ref (callback_object )
115
+ self ._is_node_ipc = is_node_ipc
108
116
self ._send_queue = Queue (0 ) # type: Queue[Union[T, None]]
109
117
self ._reader_thread .start ()
110
118
self ._writer_thread .start ()
@@ -137,7 +145,7 @@ def __del__(self) -> None:
137
145
def _read_loop (self ) -> None :
138
146
try :
139
147
while self ._reader :
140
- payload = self ._processor .read_data (self ._reader )
148
+ payload = self ._processor .read_data (self ._reader , self . _is_node_ipc )
141
149
if payload is None :
142
150
continue
143
151
@@ -190,8 +198,9 @@ def _write_loop(self) -> None:
190
198
d = self ._send_queue .get ()
191
199
if d is None :
192
200
break
193
- self ._processor .write_data (self ._writer , d )
194
- self ._writer .flush ()
201
+ self ._processor .write_data (self ._writer , d , self ._is_node_ipc )
202
+ if not self ._is_node_ipc :
203
+ self ._writer .flush ()
195
204
except (BrokenPipeError , AttributeError ):
196
205
pass
197
206
except Exception as ex :
@@ -223,24 +232,59 @@ def _stderr_loop(self) -> None:
223
232
json_rpc_processor = JsonRpcProcessor ()
224
233
225
234
235
+ class NodeIpcIO ():
236
+ _buf = bytearray ()
237
+ _lines = 0
238
+
239
+ def __init__ (self , conn : multiprocessing .connection ._ConnectionBase ):
240
+ self ._fd = conn .fileno ()
241
+ self ._read = conn ._read # type: ignore
242
+ self ._write = conn ._write # type: ignore
243
+
244
+ # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392
245
+ def readline (self ) -> bytearray :
246
+ while self ._lines == 0 :
247
+ chunk = self ._read (self ._fd , 65536 ) # type: bytes
248
+ self ._buf += chunk
249
+ self ._lines += chunk .count (b'\n ' )
250
+
251
+ self ._lines -= 1
252
+ line , _ , self ._buf = self ._buf .partition (b'\n ' )
253
+ return line
254
+
255
+ # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376
256
+ def write (self , data : bytes ) -> None :
257
+ while len (data ):
258
+ n = self ._write (self ._fd , data ) # type: int
259
+ data = data [n :]
260
+
261
+
226
262
def create_transport (config : TransportConfig , cwd : Optional [str ],
227
263
callback_object : TransportCallbacks ) -> Transport [Dict [str , Any ]]:
264
+ stderr = subprocess .PIPE
265
+ pass_fds = () # type: Union[Tuple[()], Tuple[int]]
228
266
if config .tcp_port is not None :
229
267
assert config .tcp_port is not None
230
268
if config .tcp_port < 0 :
231
269
stdout = subprocess .PIPE
232
270
else :
233
271
stdout = subprocess .DEVNULL
234
272
stdin = subprocess .DEVNULL
235
- else :
273
+ elif not config . node_ipc :
236
274
stdout = subprocess .PIPE
237
275
stdin = subprocess .PIPE
276
+ else :
277
+ stdout = subprocess .PIPE
278
+ stdin = subprocess .DEVNULL
279
+ stderr = subprocess .STDOUT
280
+ pass_fds = (config .node_ipc .child_conn .fileno (),)
281
+
238
282
startupinfo = _fixup_startup_args (config .command )
239
283
sock = None # type: Optional[socket.socket]
240
284
process = None # type: Optional[subprocess.Popen]
241
285
242
286
def start_subprocess () -> subprocess .Popen :
243
- return _start_subprocess (config .command , stdin , stdout , subprocess . PIPE , startupinfo , config .env , cwd )
287
+ return _start_subprocess (config .command , stdin , stdout , stderr , startupinfo , config .env , cwd , pass_fds )
244
288
245
289
if config .listener_socket :
246
290
assert isinstance (config .tcp_port , int ) and config .tcp_port > 0
@@ -258,13 +302,16 @@ def start_subprocess() -> subprocess.Popen:
258
302
raise RuntimeError ("Failed to connect on port {}" .format (config .tcp_port ))
259
303
reader = sock .makefile ('rwb' ) # type: ignore
260
304
writer = reader
261
- else :
305
+ elif not config . node_ipc :
262
306
reader = process .stdout # type: ignore
263
307
writer = process .stdin # type: ignore
308
+ else :
309
+ reader = writer = NodeIpcIO (config .node_ipc .parent_conn ) # type: ignore
264
310
if not reader or not writer :
265
311
raise RuntimeError ('Failed initializing transport: reader: {}, writer: {}' .format (reader , writer ))
266
- return ProcessTransport (config .name , process , sock , reader , writer , process .stderr , json_rpc_processor ,
267
- callback_object )
312
+ stderr_reader = process .stdout if config .node_ipc else process .stderr
313
+ return ProcessTransport (config .name , process , sock , reader , writer , stderr_reader , json_rpc_processor ,
314
+ callback_object , bool (config .node_ipc ))
268
315
269
316
270
317
_subprocesses = weakref .WeakSet () # type: weakref.WeakSet[subprocess.Popen]
@@ -312,7 +359,8 @@ def _start_subprocess(
312
359
stderr : int ,
313
360
startupinfo : Any ,
314
361
env : Dict [str , str ],
315
- cwd : Optional [str ]
362
+ cwd : Optional [str ],
363
+ pass_fds : Union [Tuple [()], Tuple [int ]]
316
364
) -> subprocess .Popen :
317
365
debug ("starting {} in {}" .format (args , cwd if cwd else os .getcwd ()))
318
366
process = subprocess .Popen (
@@ -322,7 +370,8 @@ def _start_subprocess(
322
370
stderr = stderr ,
323
371
startupinfo = startupinfo ,
324
372
env = env ,
325
- cwd = cwd )
373
+ cwd = cwd ,
374
+ pass_fds = pass_fds )
326
375
_subprocesses .add (process )
327
376
return process
328
377
0 commit comments