-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathsubprocess_with_timeout.py
More file actions
198 lines (157 loc) · 6.21 KB
/
subprocess_with_timeout.py
File metadata and controls
198 lines (157 loc) · 6.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from select import select
from subprocess import PIPE, STDOUT, Popen
from threading import Thread, Condition, current_thread, main_thread
from time import time
import sys
import signal
from .denoise_client import deliver_kill_signal
from .output import output_as_str
from .subprocess_kill import kill_process
# Indicate timeout with standard exit code
# NOTE(review): -9 equals the returncode Popen reports for a child terminated
# by signal 9 (SIGKILL); the constant is not used in this module, presumably
# it is consumed by the kill helpers or by callers - confirm.
E_TIMEOUT = -9
# Becomes True once _setup_signal_handling_if_needed() has installed the
# SIGTERM handler, so that the handler is registered only once.
_signals_setup = False
def keyboard_interrupt_on_sigterm(signum, frame):
    """
    Signal handler that turns the received signal into a KeyboardInterrupt.

    Both arguments are the ones the signal module passes to every handler;
    they are ignored.
    """
    raise KeyboardInterrupt
def _setup_signal_handling_if_needed():
    """
    Install the SIGTERM handler once, and only from the main thread.

    Calls from any other thread return without doing anything. After the
    first successful call the module flag `_signals_setup` is set and later
    calls are no-ops.
    """
    global _signals_setup  # pylint: disable=global-statement

    if current_thread() is not main_thread():
        return

    if _signals_setup:
        return

    _signals_setup = True
    signal.signal(signal.SIGTERM, keyboard_interrupt_on_sigterm)
class _SubprocessThread(Thread):
def __init__(self, executable_name, args, env,
shell, cwd, verbose, stdout, stderr, stdin_input):
Thread.__init__(self, name="Subprocess %s" % executable_name)
self._args = args
self._shell = shell
self._cwd = cwd
self._verbose = verbose
self._stdout = stdout
self._stderr = stderr
self._stdin_input = stdin_input
self._env = env
self._pid = None
self._started_cv = Condition()
self.stdout_result = None
self.stderr_result = None
self.returncode = None
self._exception = None
@property
def exception(self):
return self._exception
def run(self):
try:
self._started_cv.acquire()
stdin = PIPE if self._stdin_input else None
# pylint: disable-next=consider-using-with
proc = Popen(self._args, shell=self._shell, cwd=self._cwd,
stdin=stdin, stdout=self._stdout, stderr=self._stderr, env=self._env)
self._pid = proc.pid
self._started_cv.notify()
self._started_cv.release()
if self._stdin_input:
proc.stdin.write(self._stdin_input)
proc.stdin.flush()
self.process_output(proc)
self.returncode = proc.returncode
except Exception as err: # pylint: disable=broad-except
self._exception = err
def get_pid(self):
self._started_cv.acquire()
while self._pid is None:
self._started_cv.wait()
self._started_cv.release()
return self._pid
def process_output(self, proc):
if self._verbose and self._stdout == PIPE and self._stderr in (PIPE, STDOUT):
self.stdout_result = ""
self.stderr_result = ""
stdout_eof = False
stderr_eof = False
while True:
reads = []
if proc.stdout and not proc.stdout.closed and not stdout_eof:
reads.append(proc.stdout.fileno())
if (self._stderr == PIPE and
proc.stderr and
not proc.stderr.closed and
not stderr_eof):
reads.append(proc.stderr.fileno())
if not reads:
proc.wait()
break
ret = select(reads, [], [], 0.1)
for file_no in ret[0]:
if file_no == proc.stdout.fileno():
read = output_as_str(proc.stdout.readline())
if read == "":
stdout_eof = True
else:
sys.stdout.write(read)
self.stdout_result += read
if self._stderr == PIPE and file_no == proc.stderr.fileno():
read = output_as_str(proc.stderr.readline())
if read == "":
stderr_eof = True
else:
sys.stderr.write(read)
self.stderr_result += read
else:
stdout_r, stderr_r = proc.communicate()
self.stdout_result = output_as_str(stdout_r)
self.stderr_result = output_as_str(stderr_r)
def _print_keep_alive(seconds_since_start):
print("Keep alive, current job runs for %dmin\n" % (seconds_since_start / 60))
def run(args, env, cwd=None, shell=False, kill_tree=True, timeout=-1,
        verbose=False, stdout=PIPE, stderr=PIPE, stdin_input=None,
        keep_alive_output=_print_keep_alive, uses_sudo=False):
    """
    Run a command with a timeout after which it will be forcibly
    killed.

    The command is executed by a worker thread while the calling thread
    waits for it.

    Parameters:
      args       command to execute, either a single string or a sequence
                 of arguments; passed to Popen unchanged
      env, cwd, shell, stdout, stderr
                 passed to Popen unchanged
      kill_tree, uses_sudo
                 forwarded to kill_process when the command has to be
                 killed; with uses_sudo set, deliver_kill_signal is handed
                 over as well
      timeout    seconds to wait before killing the command; -1 waits
                 without limit
      verbose    echo piped output while it is captured
      stdin_input
                 data written to the stdin of the command, if truthy
      keep_alive_output
                 callback receiving the elapsed seconds; invoked
                 periodically while waiting with a timeout of ten minutes
                 or more

    Returns a tuple (returncode, stdout, stderr) when the command finished
    on its own, otherwise the result of kill_process.

    Raises KeyboardInterrupt when the wait was interrupted, and re-raises
    any exception that occurred in the worker thread.
    """
    _setup_signal_handling_if_needed()

    # The name only labels the worker thread. Popen accepts a sequence of
    # arguments as well as a string, so both forms are supported here.
    if isinstance(args, str):
        executable_name = args.split(" ", 1)[0]
    elif isinstance(args, (list, tuple)) and args:
        executable_name = str(args[0])
    else:
        executable_name = str(args)

    thread = _SubprocessThread(executable_name, args, env, shell, cwd, verbose, stdout,
                               stderr, stdin_input)
    thread.start()

    was_interrupted = False
    try:
        _join_with_keep_alive(keep_alive_output, thread, timeout)
    except KeyboardInterrupt:
        # Remember the interrupt: the subprocess has to be cleaned up
        # before the interrupt is passed on to the caller.
        was_interrupted = True

    if (timeout != -1 or was_interrupted) and thread.is_alive():
        # Still running after the timeout or an interrupt: kill it.
        pid = thread.get_pid()
        assert pid is not None
        result = kill_process(pid, kill_tree, thread,
                              deliver_kill_signal if uses_sudo else None)
        if was_interrupted:
            raise KeyboardInterrupt()
        return result

    if was_interrupted:
        raise KeyboardInterrupt()

    if not thread.is_alive():
        exp = thread.exception
        if exp:
            raise exp  # pylint: disable=raising-bad-type

    return thread.returncode, thread.stdout_result, thread.stderr_result
def _join_with_keep_alive(keep_alive_output, thread, timeout):
if timeout == -1:
thread.join()
else:
t10min = 10 * 60
if timeout < t10min:
thread.join(timeout)
else:
start = time()
diff = 0
while diff < timeout:
if t10min < timeout - diff:
max_10min_timeout = t10min
else:
max_10min_timeout = timeout - diff
thread.join(max_10min_timeout)
if not thread.is_alive():
break
diff = time() - start
if diff < timeout:
keep_alive_output(diff)