|
32 | 32 | import functools
|
33 | 33 | import multiprocessing
|
34 | 34 | import threading
|
| 35 | +import os |
| 36 | +import psutil |
| 37 | +import signal |
35 | 38 |
|
36 | 39 | from absl import logging
|
37 | 40 | # pylint: disable=unused-import
|
38 | 41 | from compiler_opt.distributed.worker import Worker
|
39 | 42 |
|
40 | 43 | from contextlib import AbstractContextManager
|
41 | 44 | from multiprocessing import connection
|
42 |
| -from typing import Any, Callable, Dict, Optional |
| 45 | +from typing import Any, Callable, Dict, Optional, List |
43 | 46 |
|
44 | 47 |
|
45 | 48 | @dataclasses.dataclass(frozen=True)
|
@@ -131,6 +134,7 @@ def __init__(self):
|
131 | 134 | # when we stop.
|
132 | 135 | self._lock = threading.Lock()
|
133 | 136 | self._map: Dict[int, concurrent.futures.Future] = {}
|
| 137 | + self.is_paused = False |
134 | 138 |
|
135 | 139 | # thread draining the pipe
|
136 | 140 | self._pump = threading.Thread(target=self._msg_pump)
|
@@ -205,10 +209,37 @@ def shutdown(self):
|
205 | 209 | try:
|
206 | 210 | # Killing the process triggers observer exit, which triggers msg_pump
|
207 | 211 | # exit
|
| 212 | + self.resume() |
208 | 213 | self._process.kill()
|
209 | 214 | except: # pylint: disable=bare-except
|
210 | 215 | pass
|
211 | 216 |
|
| 217 | + def pause(self): |
| 218 | + if self.is_paused: |
| 219 | + return |
| 220 | + self.is_paused = True |
| 221 | + # used to send the STOP signal; does not actually kill the process |
| 222 | + os.kill(self._process.pid, signal.SIGSTOP) |
| 223 | + |
| 224 | + def resume(self): |
| 225 | + if not self.is_paused: |
| 226 | + return |
| 227 | + self.is_paused = False |
| 228 | + # used to send the CONTINUE signal; does not actually kill the process |
| 229 | + os.kill(self._process.pid, signal.SIGCONT) |
| 230 | + |
| 231 | + def set_nice(self, val: int): |
| 232 | + """Sets the nice-ness of the process, this modifies how the OS |
| 233 | + schedules it. Only works on Unix, since val is presumed to be an int. |
| 234 | + """ |
| 235 | + psutil.Process(self._process.pid).nice(val) |
| 236 | + |
| 237 | + def set_affinity(self, val: List[int]): |
| 238 | + """Sets the CPU affinity of the process, this modifies which cores the OS |
| 239 | + schedules it on. |
| 240 | + """ |
| 241 | + psutil.Process(self._process.pid).cpu_affinity(val) |
| 242 | + |
212 | 243 | def join(self):
|
213 | 244 | self._observer.join()
|
214 | 245 | self._pump.join()
|
@@ -242,3 +273,11 @@ def __exit__(self, *args):
|
242 | 273 | # now wait for the message pumps to indicate they exit.
|
243 | 274 | for s in self._stubs:
|
244 | 275 | s.join()
|
| 276 | + |
| 277 | + def __del__(self): |
| 278 | + self.__exit__() |
| 279 | + |
| 280 | + @property |
| 281 | + def stubs(self): |
| 282 | + # Return a shallow copy, to avoid something messing the internal list up |
| 283 | + return list(self._stubs) |
0 commit comments