@@ -49,10 +49,10 @@ def on_stderr_message(self, message: str) -> None:
49
49
50
50
class AbstractProcessor (Generic [T ]):
51
51
52
- def write_data (self , data : T ) -> None :
52
+ def write_data (self , writer : IO [ bytes ], data : T ) -> None :
53
53
raise NotImplementedError ()
54
54
55
- def read_data (self ) -> Optional [T ]:
55
+ def read_data (self , reader : IO [ bytes ] ) -> Optional [T ]:
56
56
raise NotImplementedError ()
57
57
58
58
@@ -74,20 +74,15 @@ def decode_payload(message: bytes) -> Optional[Dict[str, Any]]:
74
74
75
75
76
76
class StandardProcessor (AbstractProcessor [Dict [str , Any ]]):
77
-
78
- def __init__ (self , reader : IO [bytes ], writer : IO [bytes ]):
79
- self ._reader = reader
80
- self ._writer = writer
81
-
82
- def write_data (self , data : Dict [str , Any ]) -> None :
77
+ def write_data (self , writer : IO [bytes ], data : Dict [str , Any ]) -> None :
83
78
body = encode_payload (data )
84
- self . _writer .writelines (("Content-Length: {}\r \n \r \n " .format (len (body )).encode ('ascii' ), body ))
85
- self . _writer .flush ()
79
+ writer .writelines (("Content-Length: {}\r \n \r \n " .format (len (body )).encode ('ascii' ), body ))
80
+ writer .flush ()
86
81
87
- def read_data (self ) -> Optional [Dict [str , Any ]]:
88
- headers = http .client .parse_headers (self . _reader ) # type: ignore
82
+ def read_data (self , reader : IO [ bytes ] ) -> Optional [Dict [str , Any ]]:
83
+ headers = http .client .parse_headers (reader ) # type: ignore
89
84
try :
90
- body = self . _reader .read (int (headers .get ("Content-Length" )))
85
+ body = reader .read (int (headers .get ("Content-Length" )))
91
86
except TypeError :
92
87
# Expected error on process stopping. Stop the read loop.
93
88
raise StopLoopError ()
@@ -98,18 +93,15 @@ class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]):
98
93
_buf = bytearray ()
99
94
_lines = 0
100
95
101
- def __init__ (self , conn : multiprocessing .connection ._ConnectionBase ):
102
- self ._conn = conn
103
-
104
- def write_data (self , data : Dict [str , Any ]) -> None :
96
+ def write_data (self , connection : multiprocessing .connection ._ConnectionBase , data : Dict [str , Any ]) -> None :
105
97
body = encode_payload (data ) + b"\n "
106
98
while len (body ):
107
- n = self . _conn . _write (self . _conn .fileno (), body ) # type: ignore
99
+ n = connection . _write (connection .fileno (), body ) # type: ignore
108
100
body = body [n :]
109
101
110
- def read_data (self ) -> Optional [Dict [str , Any ]]:
102
+ def read_data (self , connection : multiprocessing . connection . _ConnectionBase ) -> Optional [Dict [str , Any ]]:
111
103
while self ._lines == 0 :
112
- chunk = self . _conn . _read (self . _conn .fileno (), 65536 ) # type: ignore
104
+ chunk = connection . _read (connection .fileno (), 65536 ) # type: ignore
113
105
if len (chunk ) == 0 :
114
106
# EOF reached: https://docs.python.org/3/library/os.html#os.read
115
107
raise StopLoopError ()
@@ -124,16 +116,14 @@ def read_data(self) -> Optional[Dict[str, Any]]:
124
116
125
117
class ProcessTransport (Transport [T ]):
126
118
127
- def __init__ (self ,
128
- name : str ,
129
- process : subprocess .Popen ,
130
- socket : Optional [socket .socket ],
131
- stderr : Optional [IO [bytes ]],
132
- processor : AbstractProcessor [T ],
119
+ def __init__ (self , name : str , process : subprocess .Popen , socket : Optional [socket .socket ], reader : Any ,
120
+ writer : Any , stderr : Optional [IO [bytes ]], processor : AbstractProcessor [T ],
133
121
callback_object : TransportCallbacks [T ]) -> None :
134
122
self ._closed = False
135
123
self ._process = process
136
124
self ._socket = socket
125
+ self ._reader = reader
126
+ self ._writer = writer
137
127
self ._stderr = stderr
138
128
self ._processor = processor
139
129
self ._reader_thread = threading .Thread (target = self ._read_loop , name = '{}-reader' .format (name ))
@@ -171,8 +161,8 @@ def __del__(self) -> None:
171
161
172
162
def _read_loop (self ) -> None :
173
163
try :
174
- while True :
175
- payload = self ._processor .read_data ()
164
+ while self . _reader :
165
+ payload = self ._processor .read_data (self . _reader )
176
166
if payload is None :
177
167
continue
178
168
@@ -221,11 +211,11 @@ def invoke() -> None:
221
211
def _write_loop (self ) -> None :
222
212
exception = None # type: Optional[Exception]
223
213
try :
224
- while True :
214
+ while self . _writer :
225
215
d = self ._send_queue .get ()
226
216
if d is None :
227
217
break
228
- self ._processor .write_data (d )
218
+ self ._processor .write_data (self . _writer , d )
229
219
except (BrokenPipeError , AttributeError ):
230
220
pass
231
221
except Exception as ex :
@@ -252,6 +242,10 @@ def _stderr_loop(self) -> None:
252
242
exception_log ('unexpected exception type in stderr loop' , ex )
253
243
self ._send_queue .put_nowait (None )
254
244
245
+ # Can be a singleton since it doesn't hold any state.
246
+ standard_processor = StandardProcessor ()
247
+ node_ipc_processor = NodeIpcProcessor ()
248
+
255
249
256
250
def create_transport (config : TransportConfig , cwd : Optional [str ],
257
251
callback_object : TransportCallbacks ) -> Transport [Dict [str , Any ]]:
@@ -264,14 +258,14 @@ def create_transport(config: TransportConfig, cwd: Optional[str],
264
258
else :
265
259
stdout = subprocess .DEVNULL
266
260
stdin = subprocess .DEVNULL
267
- elif not config .node_ipc :
268
- stdout = subprocess .PIPE
269
- stdin = subprocess .PIPE
270
- else :
261
+ elif config .node_ipc :
271
262
stdout = subprocess .PIPE
272
263
stdin = subprocess .DEVNULL
273
264
stderr = subprocess .STDOUT
274
- pass_fds = (config .node_ipc .child_conn .fileno (),)
265
+ pass_fds = (config .node_ipc .child_connection .fileno (),)
266
+ else :
267
+ stdout = subprocess .PIPE
268
+ stdin = subprocess .PIPE
275
269
276
270
startupinfo = _fixup_startup_args (config .command )
277
271
sock = None # type: Optional[socket.socket]
@@ -288,27 +282,31 @@ def start_subprocess() -> subprocess.Popen:
288
282
config .listener_socket ,
289
283
start_subprocess
290
284
)
291
- processor = StandardProcessor (reader , writer ) # type: AbstractProcessor
292
285
else :
293
286
process = start_subprocess ()
294
287
if config .tcp_port :
295
288
sock = _connect_tcp (config .tcp_port )
296
289
if sock is None :
297
290
raise RuntimeError ("Failed to connect on port {}" .format (config .tcp_port ))
298
291
reader = writer = sock .makefile ('rwb' )
299
- processor = StandardProcessor (reader , writer )
300
- elif not config .node_ipc :
292
+ elif config .node_ipc :
293
+ reader = writer = config .node_ipc .parent_connection
294
+ else :
301
295
if not process .stdout or not process .stdin :
302
296
raise RuntimeError (
303
297
'Failed initializing transport: reader: {}, writer: {}'
304
298
.format (process .stdout , process .stdin )
305
299
)
306
- processor = StandardProcessor (process .stdout , process .stdin )
307
- else :
308
- processor = NodeIpcProcessor (config .node_ipc .parent_conn )
309
-
300
+ reader = process .stdout
301
+ writer = process .stdin
310
302
stderr_reader = process .stdout if config .node_ipc else process .stderr
311
- return ProcessTransport (config .name , process , sock , stderr_reader , processor , callback_object )
303
+ processor = node_ipc_processor if config .node_ipc else standard_processor
304
+
305
+ if not reader or not writer :
306
+ raise RuntimeError ('Failed initializing transport: reader: {}, writer: {}' .format (reader , writer ))
307
+
308
+ return ProcessTransport (config .name , process , sock , reader , writer , stderr_reader , processor ,
309
+ callback_object )
312
310
313
311
314
312
_subprocesses = weakref .WeakSet () # type: weakref.WeakSet[subprocess.Popen]
0 commit comments