|
5 | 5 | from contextlib import closing
|
6 | 6 | from functools import partial
|
7 | 7 | from queue import Queue
|
8 |
| -import http |
| 8 | +import http.client |
9 | 9 | import json
|
10 | 10 | import multiprocessing.connection
|
11 | 11 | import os
|
@@ -49,70 +49,99 @@ def on_stderr_message(self, message: str) -> None:
|
49 | 49 |
|
50 | 50 | class AbstractProcessor(Generic[T]):
|
51 | 51 |
|
52 |
| - def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None: |
| 52 | + def write_data(self, data: T) -> None: |
53 | 53 | raise NotImplementedError()
|
54 | 54 |
|
55 |
| - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]: |
| 55 | + def read_data(self) -> Optional[T]: |
56 | 56 | raise NotImplementedError()
|
57 | 57 |
|
58 | 58 |
|
59 |
| -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): |
| 59 | +def encode_payload(data: Dict[str, Any]) -> bytes: |
| 60 | + return json.dumps( |
| 61 | + data, |
| 62 | + ensure_ascii=False, |
| 63 | + check_circular=False, |
| 64 | + separators=(',', ':') |
| 65 | + ).encode('utf-8') |
60 | 66 |
|
61 |
| - def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None: |
62 |
| - body = self._encode(data) |
63 |
| - if not is_node_ipc: |
64 |
| - writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) |
65 |
| - else: |
66 |
| - writer.write(body + b"\n") |
67 | 67 |
|
68 |
| - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]: |
69 |
| - if not is_node_ipc: |
70 |
| - headers = http.client.parse_headers(reader) # type: ignore |
71 |
| - try: |
72 |
| - body = reader.read(int(headers.get("Content-Length"))) |
73 |
| - except TypeError: |
74 |
| - # Expected error on process stopping. Stop the read loop. |
75 |
| - raise StopLoopError() |
76 |
| - else: |
77 |
| - body = reader.readline() |
| 68 | +def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: |
| 69 | + try: |
| 70 | + return json.loads(message.decode('utf-8')) |
| 71 | + except Exception as ex: |
| 72 | + exception_log("JSON decode error", ex) |
| 73 | + return None |
| 74 | + |
| 75 | + |
| 76 | +class StandardProcessor(AbstractProcessor[Dict[str, Any]]): |
78 | 77 |
|
| 78 | + def __init__(self, reader: Optional[IO[bytes]], writer: IO[bytes]): |
| 79 | + if not reader or not writer: |
| 80 | + raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) |
| 81 | + self._reader = reader |
| 82 | + self._writer = writer |
| 83 | + |
| 84 | + def write_data(self, data: Dict[str, Any]) -> None: |
| 85 | + body = encode_payload(data) |
| 86 | + self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) |
| 87 | + self._writer.flush() |
| 88 | + |
| 89 | + def read_data(self) -> Optional[Dict[str, Any]]: |
| 90 | + headers = http.client.parse_headers(self._reader) # type: ignore |
79 | 91 | try:
|
80 |
| - return self._decode(body) |
81 |
| - except Exception as ex: |
82 |
| - exception_log("JSON decode error", ex) |
83 |
| - return None |
84 |
| - |
85 |
| - @staticmethod |
86 |
| - def _encode(data: Dict[str, Any]) -> bytes: |
87 |
| - return json.dumps( |
88 |
| - data, |
89 |
| - ensure_ascii=False, |
90 |
| - check_circular=False, |
91 |
| - separators=(',', ':') |
92 |
| - ).encode('utf-8') |
93 |
| - |
94 |
| - @staticmethod |
95 |
| - def _decode(message: bytes) -> Dict[str, Any]: |
96 |
| - return json.loads(message.decode('utf-8')) |
| 92 | + body = self._reader.read(int(headers.get("Content-Length"))) |
| 93 | + except TypeError: |
| 94 | + # Expected error on process stopping. Stop the read loop. |
| 95 | + raise StopLoopError() |
| 96 | + return decode_payload(body) |
| 97 | + |
| 98 | + |
| 99 | +class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): |
| 100 | + _buf = bytearray() |
| 101 | + _lines = 0 |
| 102 | + |
| 103 | + def __init__(self, conn: multiprocessing.connection._ConnectionBase): |
| 104 | + self._conn = conn |
| 105 | + |
| 106 | + def write_data(self, data: Dict[str, Any]) -> None: |
| 107 | + body = encode_payload(data) + b"\n" |
| 108 | + while len(body): |
| 109 | + n = self._conn._write(self._conn.fileno(), body) # type: ignore |
| 110 | + body = body[n:] |
| 111 | + |
| 112 | + def read_data(self) -> Optional[Dict[str, Any]]: |
| 113 | + while self._lines == 0: |
| 114 | + chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore |
| 115 | + if len(chunk) == 0: |
| 116 | + # EOF reached: https://docs.python.org/3/library/os.html#os.read |
| 117 | + raise StopLoopError() |
| 118 | + |
| 119 | + self._buf += chunk |
| 120 | + self._lines += chunk.count(b'\n') |
| 121 | + |
| 122 | + self._lines -= 1 |
| 123 | + message, _, self._buf = self._buf.partition(b'\n') |
| 124 | + return decode_payload(message) |
97 | 125 |
|
98 | 126 |
|
99 | 127 | class ProcessTransport(Transport[T]):
|
100 | 128 |
|
101 |
| - def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], |
102 |
| - writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], |
103 |
| - callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None: |
| 129 | + def __init__(self, |
| 130 | + name: str, |
| 131 | + process: subprocess.Popen, |
| 132 | + socket: Optional[socket.socket], |
| 133 | + stderr: Optional[IO[bytes]], |
| 134 | + processor: AbstractProcessor[T], |
| 135 | + callback_object: TransportCallbacks[T]) -> None: |
104 | 136 | self._closed = False
|
105 | 137 | self._process = process
|
106 | 138 | self._socket = socket
|
107 |
| - self._reader = reader |
108 |
| - self._writer = writer |
109 | 139 | self._stderr = stderr
|
110 | 140 | self._processor = processor
|
111 | 141 | self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name))
|
112 | 142 | self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name))
|
113 | 143 | self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
|
114 | 144 | self._callback_object = weakref.ref(callback_object)
|
115 |
| - self._is_node_ipc = is_node_ipc |
116 | 145 | self._send_queue = Queue(0) # type: Queue[Union[T, None]]
|
117 | 146 | self._reader_thread.start()
|
118 | 147 | self._writer_thread.start()
|
@@ -144,8 +173,8 @@ def __del__(self) -> None:
|
144 | 173 |
|
145 | 174 | def _read_loop(self) -> None:
|
146 | 175 | try:
|
147 |
| - while self._reader: |
148 |
| - payload = self._processor.read_data(self._reader, self._is_node_ipc) |
| 176 | + while True: |
| 177 | + payload = self._processor.read_data() |
149 | 178 | if payload is None:
|
150 | 179 | continue
|
151 | 180 |
|
@@ -194,13 +223,11 @@ def invoke() -> None:
|
194 | 223 | def _write_loop(self) -> None:
|
195 | 224 | exception = None # type: Optional[Exception]
|
196 | 225 | try:
|
197 |
| - while self._writer: |
| 226 | + while True: |
198 | 227 | d = self._send_queue.get()
|
199 | 228 | if d is None:
|
200 | 229 | break
|
201 |
| - self._processor.write_data(self._writer, d, self._is_node_ipc) |
202 |
| - if not self._is_node_ipc: |
203 |
| - self._writer.flush() |
| 230 | + self._processor.write_data(d) |
204 | 231 | except (BrokenPipeError, AttributeError):
|
205 | 232 | pass
|
206 | 233 | except Exception as ex:
|
@@ -228,35 +255,6 @@ def _stderr_loop(self) -> None:
|
228 | 255 | self._send_queue.put_nowait(None)
|
229 | 256 |
|
230 | 257 |
|
231 |
| -# Can be a singleton since it doesn't hold any state. |
232 |
| -json_rpc_processor = JsonRpcProcessor() |
233 |
| - |
234 |
| - |
235 |
| -class NodeIpcIO(): |
236 |
| - _buf = bytearray() |
237 |
| - _lines = 0 |
238 |
| - |
239 |
| - def __init__(self, conn: multiprocessing.connection._ConnectionBase): |
240 |
| - self._conn = conn |
241 |
| - |
242 |
| - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 |
243 |
| - def readline(self) -> bytearray: |
244 |
| - while self._lines == 0: |
245 |
| - chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore |
246 |
| - self._buf += chunk |
247 |
| - self._lines += chunk.count(b'\n') |
248 |
| - |
249 |
| - self._lines -= 1 |
250 |
| - line, _, self._buf = self._buf.partition(b'\n') |
251 |
| - return line |
252 |
| - |
253 |
| - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 |
254 |
| - def write(self, data: bytes) -> None: |
255 |
| - while len(data): |
256 |
| - n = self._conn._write(self._conn.fileno(), data) # type: ignore |
257 |
| - data = data[n:] |
258 |
| - |
259 |
| - |
260 | 258 | def create_transport(config: TransportConfig, cwd: Optional[str],
|
261 | 259 | callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
|
262 | 260 | stderr = subprocess.PIPE
|
@@ -292,24 +290,22 @@ def start_subprocess() -> subprocess.Popen:
|
292 | 290 | config.listener_socket,
|
293 | 291 | start_subprocess
|
294 | 292 | )
|
| 293 | + processor = StandardProcessor(reader, writer) # type: AbstractProcessor |
295 | 294 | else:
|
296 | 295 | process = start_subprocess()
|
297 | 296 | if config.tcp_port:
|
298 | 297 | sock = _connect_tcp(config.tcp_port)
|
299 | 298 | if sock is None:
|
300 | 299 | raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
|
301 |
| - reader = sock.makefile('rwb') # type: ignore |
302 |
| - writer = reader |
| 300 | + reader = writer = sock.makefile('rwb') |
| 301 | + processor = StandardProcessor(reader, writer) |
303 | 302 | elif not config.node_ipc:
|
304 |
| - reader = process.stdout # type: ignore |
305 |
| - writer = process.stdin # type: ignore |
| 303 | + processor = StandardProcessor(process.stdout, process.stdin) # type: ignore |
306 | 304 | else:
|
307 |
| - reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore |
308 |
| - if not reader or not writer: |
309 |
| - raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) |
| 305 | + processor = NodeIpcProcessor(config.node_ipc.parent_conn) |
| 306 | + |
310 | 307 | stderr_reader = process.stdout if config.node_ipc else process.stderr
|
311 |
| - return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor, |
312 |
| - callback_object, bool(config.node_ipc)) |
| 308 | + return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object) |
313 | 309 |
|
314 | 310 |
|
315 | 311 | _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
|
@@ -403,8 +399,7 @@ def start_in_background(d: _SubprocessData) -> None:
|
403 | 399 | # Await one client connection (blocking!)
|
404 | 400 | sock, _ = listener_socket.accept()
|
405 | 401 | thread.join()
|
406 |
| - reader = sock.makefile('rwb') # type: IO[bytes] |
407 |
| - writer = reader |
| 402 | + reader = writer = sock.makefile('rwb') |
408 | 403 | assert data.process
|
409 | 404 | return data.process, sock, reader, writer
|
410 | 405 |
|
|
0 commit comments