Skip to content

Commit a8b500d

Browse files
authored
Refactored kill/killpg mechanism to be unified and actually fail on n… (#625)
* Refactored kill/killpg mechanism to be unified and actually fail on not process not found * Implementation change refactor and clarifications
1 parent 1016234 commit a8b500d

File tree

3 files changed

+56
-51
lines changed

3 files changed

+56
-51
lines changed

lib/process_helpers.py

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,28 @@
22
import os
33
import subprocess
44

5-
def kill_ps(ps_to_kill):
6-
print('Killing processes')
7-
for ps_info in ps_to_kill:
8-
pid = ps_info['ps'].pid
9-
print(f"Trying to kill {ps_info['cmd']} with PID: {pid}")
10-
try:
11-
if ps_info['ps_group'] is True:
12-
try:
13-
ps_group_id = os.getpgid(pid)
14-
print(f" with process group {ps_group_id}")
15-
os.killpg(os.getpgid(pid), signal.SIGTERM)
16-
try:
17-
ps_info['ps'].wait(timeout=5)
18-
except subprocess.TimeoutExpired:
19-
# If the process hasn't gracefully exited after 5 seconds we kill it
20-
os.killpg(os.getpgid(pid), signal.SIGKILL)
21-
except ProcessLookupError:
22-
# process may be not have been in a process group
23-
print(f"Could not find process-group for {pid}")
24-
# always, just in case the calling process (typically the shell) did not die
25-
os.kill(pid, signal.SIGTERM)
26-
try:
27-
ps_info['ps'].wait(timeout=5)
28-
except subprocess.TimeoutExpired:
29-
# If the process hasn't gracefully exited after 5 seconds we kill it
30-
os.kill(pid, signal.SIGKILL)
5+
def kill_pg(ps, cmd):
6+
pgid = os.getpgid(ps.pid)
7+
print(f"Trying to kill {cmd} with PGID: {pgid}")
8+
9+
os.killpg(pgid, signal.SIGTERM)
10+
try:
11+
ps.wait(timeout=10)
12+
except subprocess.TimeoutExpired as exc:
13+
# If the process hasn't gracefully exited after 5 seconds we kill it
14+
os.killpg(pgid, signal.SIGKILL)
15+
raise RuntimeError(f"Killed the process {cmd} with SIGKILL. This could lead to corrupted data!") from exc
3116

32-
except ProcessLookupError:
33-
# process may already have ended or been killed in the process group
34-
print(f"Could not find process {pid}")
17+
def kill_ps(ps, cmd):
18+
pid = ps.pid
19+
print(f"Trying to kill {cmd} with PID: {pid}")
20+
21+
os.kill(pid, signal.SIGTERM)
22+
try:
23+
ps.wait(timeout=10)
24+
except subprocess.TimeoutExpired as exc:
25+
os.kill(pid, signal.SIGKILL)
26+
raise RuntimeError(f"Killed the process {cmd} with SIGKILL. This could lead to corrupted data!") from exc
3527

3628
# currently unused
3729
def timeout(process, cmd: str, duration: int):

metric_providers/base.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import os
22
from pathlib import Path
33
import subprocess
4-
import signal
5-
import sys
64
from io import StringIO
75
import pandas
86

97
from lib.system_checks import ConfigurationCheckError
8+
from lib import process_helpers
109

1110
class MetricProviderConfigurationError(ConfigurationCheckError):
1211
pass
@@ -146,20 +145,6 @@ def start_profiling(self, containers=None):
146145
def stop_profiling(self):
147146
if self._ps is None:
148147
return
149-
try:
150-
print(f"Killing process with id: {self._ps.pid}")
151-
ps_group_id = os.getpgid(self._ps.pid)
152-
print(f" and process group {ps_group_id}")
153-
os.killpg(ps_group_id, signal.SIGTERM)
154-
try:
155-
self._ps.wait(timeout=5)
156-
except subprocess.TimeoutExpired:
157-
# If the process hasn't gracefully exited after 5 seconds we kill it
158-
os.killpg(ps_group_id, signal.SIGKILL)
159-
print("Killed the process with SIGKILL. This could lead to corrupted metric log files!")
160-
161-
except ProcessLookupError:
162-
print(f"Could not find process-group for {self._ps.pid}",
163-
file=sys.stderr) # process/-group may have already closed
164148

149+
process_helpers.kill_pg(self._ps, self._metric_provider_executable)
165150
self._ps = None

runner.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,7 +1058,7 @@ def run_flows(self):
10581058
if stdout_behaviour == subprocess.PIPE:
10591059
os.set_blocking(ps.stdout.fileno(), False)
10601060

1061-
self.__ps_to_kill.append({'ps': ps, 'cmd': inner_el['command'], 'ps_group': False})
1061+
self.__ps_to_kill.append({'ps': ps, 'cmd': inner_el['command']})
10621062
else:
10631063
print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-runtime']}s runtime ...")
10641064
ps = subprocess.run(
@@ -1103,7 +1103,11 @@ def stop_metric_providers(self):
11031103
if stderr_read is not None and str(stderr_read):
11041104
errors.append(f"Stderr on {metric_provider.__class__.__name__} was NOT empty: {stderr_read}")
11051105

1106-
metric_provider.stop_profiling()
1106+
# pylint: disable=broad-exception-caught
1107+
try:
1108+
metric_provider.stop_profiling()
1109+
except Exception as exc:
1110+
errors.append(f"Could not stop profiling on {metric_provider.__class__.__name__}: {str(exc)}")
11071111

11081112
df = metric_provider.read_metrics(self._run_id, self.__containers)
11091113
if isinstance(df, int):
@@ -1124,7 +1128,17 @@ def stop_metric_providers(self):
11241128

11251129
def read_and_cleanup_processes(self):
11261130
print(TerminalColors.HEADER, '\nReading process stdout/stderr (if selected) and cleaning them up', TerminalColors.ENDC)
1127-
process_helpers.kill_ps(self.__ps_to_kill)
1131+
1132+
ps_errors = []
1133+
for ps in self.__ps_to_kill:
1134+
try:
1135+
process_helpers.kill_ps(ps['ps'], ps['cmd'])
1136+
except ProcessLookupError as exc: # Process might have done expected exit already. However all other errors shall bubble
1137+
ps_errors.append(f"Could not kill {ps['cmd']}. Exception: {exc}")
1138+
if ps_errors:
1139+
raise RuntimeError(ps_errors)
1140+
self.__ps_to_kill = [] # we need to clear, so we do not kill twice later
1141+
11281142
for ps in self.__ps_to_read:
11291143
if ps['detach']:
11301144
stdout, stderr = ps['ps'].communicate(timeout=5)
@@ -1240,7 +1254,12 @@ def cleanup(self):
12401254

12411255
print('Stopping metric providers')
12421256
for metric_provider in self.__metric_providers:
1243-
metric_provider.stop_profiling()
1257+
# pylint: disable=broad-exception-caught
1258+
try:
1259+
metric_provider.stop_profiling()
1260+
except Exception as exc:
1261+
error_helpers.log_error(f"Could not stop profiling on {metric_provider.__class__.__name__}: {str(exc)}")
1262+
12441263

12451264
print('Stopping containers')
12461265
for container_id in self.__containers:
@@ -1257,7 +1276,16 @@ def cleanup(self):
12571276

12581277
self.remove_docker_images()
12591278

1260-
process_helpers.kill_ps(self.__ps_to_kill)
1279+
ps_errors = []
1280+
for ps in self.__ps_to_kill:
1281+
try:
1282+
process_helpers.kill_ps(ps['ps'], ps['cmd'])
1283+
except ProcessLookupError as exc: # Process might have done expected exit already. However all other errors shall bubble
1284+
ps_errors.append(f"Could not kill {ps['cmd']}. Exception: {exc}")
1285+
if ps_errors:
1286+
raise RuntimeError(ps_errors)
1287+
1288+
12611289
print(TerminalColors.OKBLUE, '-Cleanup gracefully completed', TerminalColors.ENDC)
12621290

12631291
self.__notes_helper = Notes()

0 commit comments

Comments
 (0)