Skip to content
This repository was archived by the owner on Dec 27, 2023. It is now read-only.

Commit c7b84a5

Browse files
committed
Add support for parallel execution in host
We define a list of processes which are currently running inside host and we populate it each time run_command() implementation is called. In this way, we keep a list of running processes and all of them can be killed by running stop() method. Signed-off-by: Andrea Cervesato <[email protected]>
1 parent 232c7e7 commit c7b84a5

File tree

2 files changed

+187
-78
lines changed

2 files changed

+187
-78
lines changed

ltp/host.py

Lines changed: 87 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def __init__(self) -> None:
3333
self._initialized = False
3434
self._cmd_lock = threading.Lock()
3535
self._fetch_lock = threading.Lock()
36-
self._proc = None
36+
self._procs = []
3737
self._stop = False
3838
self._cwd = None
3939
self._env = None
@@ -85,18 +85,23 @@ def _inner_stop(self, sig: int, timeout: float = 30) -> None:
8585

8686
self._stop = True
8787

88-
if self._proc:
89-
self._logger.info("Terminating process with %s", sig)
90-
self._proc.send_signal(sig)
91-
9288
with Timeout(timeout) as timer:
93-
while self._fetch_lock.locked():
94-
time.sleep(1e-6)
95-
timer.check(err_msg="Timeout waiting for command to stop")
89+
if self._procs:
90+
self._logger.info(
91+
"Terminating %d process(es) with %s",
92+
len(self._procs), sig)
93+
94+
for proc in self._procs:
95+
proc.send_signal(sig)
96+
97+
while proc.poll() is None:
98+
time.sleep(1e-6)
99+
timer.check(
100+
err_msg="Timeout waiting for command to stop")
96101

97-
while self._cmd_lock.locked():
102+
while self._fetch_lock.locked():
98103
time.sleep(1e-6)
99-
timer.check(err_msg="Timeout waiting for command to stop")
104+
timer.check(err_msg="Timeout waiting to fetch file")
100105

101106
self._logger.info("Process terminated")
102107

@@ -114,14 +119,18 @@ def force_stop(
114119
iobuffer: IOBuffer = None) -> None:
115120
self._inner_stop(signal.SIGKILL, timeout)
116121

117-
def _read_stdout(self, size: int, iobuffer: IOBuffer = None) -> str:
122+
def _read_stdout(
123+
self,
124+
proc: subprocess.Popen,
125+
size: int,
126+
iobuffer: IOBuffer = None) -> str:
118127
"""
119128
Read data from stdout.
120129
"""
121130
if not self.is_running:
122131
return None
123132

124-
data = os.read(self._proc.stdout.fileno(), size)
133+
data = os.read(proc.stdout.fileno(), size)
125134
rdata = data.decode(encoding="utf-8", errors="replace")
126135
rdata = rdata.replace('\r', '')
127136

@@ -143,82 +152,82 @@ def run_command(self,
143152
if not self.is_running:
144153
raise SUTError("SUT is not running")
145154

146-
with self._cmd_lock:
147-
t_secs = max(timeout, 0)
148-
149-
self._logger.info("Executing command (timeout=%d): %s",
150-
t_secs,
151-
command)
152-
153-
# pylint: disable=consider-using-with
154-
self._proc = subprocess.Popen(
155-
command,
156-
stdout=subprocess.PIPE,
157-
stderr=subprocess.STDOUT,
158-
cwd=self._cwd,
159-
env=self._env,
160-
shell=True)
161-
162-
ret = None
163-
t_start = time.time()
164-
t_end = 0
165-
stdout = ""
155+
t_secs = max(timeout, 0)
156+
157+
self._logger.info(
158+
"Executing command (timeout=%.3f): %s",
159+
t_secs,
160+
command)
161+
162+
# pylint: disable=consider-using-with
163+
proc = subprocess.Popen(
164+
command,
165+
stdout=subprocess.PIPE,
166+
stderr=subprocess.STDOUT,
167+
cwd=self._cwd,
168+
env=self._env,
169+
shell=True)
170+
171+
self._procs.append(proc)
172+
173+
ret = None
174+
t_start = time.time()
175+
t_end = 0
176+
stdout = ""
177+
178+
try:
179+
poller = select.epoll()
180+
poller.register(
181+
proc.stdout.fileno(),
182+
select.POLLIN |
183+
select.POLLPRI |
184+
select.POLLHUP |
185+
select.POLLERR)
186+
187+
with Timeout(timeout) as timer:
188+
while True:
189+
events = poller.poll(0.1)
190+
for fdesc, _ in events:
191+
if fdesc != proc.stdout.fileno():
192+
break
166193

167-
try:
168-
poller = select.epoll()
169-
poller.register(
170-
self._proc.stdout.fileno(),
171-
select.POLLIN |
172-
select.POLLPRI |
173-
select.POLLHUP |
174-
select.POLLERR)
194+
data = self._read_stdout(proc, 1024, iobuffer)
195+
if data:
196+
stdout += data
175197

176-
with Timeout(timeout) as timer:
177-
while True:
178-
events = poller.poll(0.1)
179-
for fdesc, _ in events:
180-
if fdesc != self._proc.stdout.fileno():
181-
break
182-
183-
data = self._read_stdout(1024, iobuffer)
184-
if data:
185-
stdout += data
198+
if proc.poll() is not None:
199+
break
186200

187-
if self._proc.poll() is not None:
188-
break
201+
timer.check(
202+
err_msg="Timeout during command execution",
203+
exc=SUTTimeoutError)
189204

190-
timer.check(
191-
err_msg="Timeout during command execution",
192-
exc=SUTTimeoutError)
205+
t_end = time.time() - t_start
193206

194-
t_end = time.time() - t_start
207+
# once the process stopped, we still might have some data
208+
# inside the stdout buffer
209+
while not self._stop:
210+
data = self._read_stdout(proc, 1024, iobuffer)
211+
if not data:
212+
break
195213

196-
# once the process stopped, we still might have some data
197-
# inside the stdout buffer
198-
while not self._stop:
199-
data = self._read_stdout(1024, iobuffer)
200-
if not data:
201-
break
214+
stdout += data
215+
finally:
216+
self._procs.remove(proc)
202217

203-
stdout += data
204-
except subprocess.TimeoutExpired as err:
205-
self._proc.kill()
206-
raise SUTError(err)
207-
finally:
208-
ret = {
209-
"command": command,
210-
"stdout": stdout,
211-
"returncode": self._proc.returncode,
212-
"timeout": t_secs,
213-
"exec_time": t_end,
214-
}
218+
ret = {
219+
"command": command,
220+
"stdout": stdout,
221+
"returncode": proc.returncode,
222+
"timeout": t_secs,
223+
"exec_time": t_end,
224+
}
215225

216-
self._logger.debug("return data=%s", ret)
217-
self._proc = None
226+
self._logger.debug("return data=%s", ret)
218227

219-
self._logger.info("Command executed")
228+
self._logger.info("Command executed")
220229

221-
return ret
230+
return ret
222231

223232
def fetch_file(
224233
self,

ltp/tests/test_host.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
"""
22
Test SUT implementations.
33
"""
4+
import os
5+
import time
6+
import threading
7+
from concurrent.futures import ThreadPoolExecutor
48
import pytest
9+
from ltp.sut import IOBuffer
10+
from ltp.sut import SUTTimeoutError
511
from ltp.host import HostSUT
612
from ltp.tests.sut import _TestSUT
713
from ltp.tests.sut import Printer
@@ -59,3 +65,97 @@ def test_env(self, tmpdir):
5965
ret = sut.run_command("cat $FILE", timeout=2, iobuffer=Printer())
6066
assert ret["returncode"] == 0
6167
assert ret["stdout"] == "runltp-ng tests"
68+
69+
def test_multiple_commands(self, sut):
70+
"""
71+
Execute run_command multiple times.
72+
"""
73+
sut.communicate()
74+
75+
def _runner(index):
76+
return sut.run_command(f"echo -n {index}", timeout=15)
77+
78+
results = []
79+
80+
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
81+
for result in executor.map(_runner, range(100)):
82+
results.append(result)
83+
84+
for i in range(100):
85+
data = results[i]
86+
87+
assert data["command"] == f"echo -n {i}"
88+
assert data["timeout"] == 15
89+
assert data["returncode"] == 0
90+
assert data["stdout"] == f"{i}"
91+
assert 0 < data["exec_time"] < time.time()
92+
93+
def test_multiple_commands_timeout(self, sut):
94+
"""
95+
Execute run_command multiple times with low timeout.
96+
"""
97+
sut.communicate()
98+
99+
def _runner(_):
100+
with pytest.raises(SUTTimeoutError):
101+
sut.run_command("sleep 1", timeout=0.1)
102+
103+
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
104+
executor.map(_runner, range(100))
105+
106+
def test_multiple_commands_stop(self, sut):
107+
"""
108+
Execute run_command multiple time, then call stop().
109+
"""
110+
class MyBuffer(IOBuffer):
111+
"""
112+
For each echo command, we store 1 inside `executed` list.
113+
At the end of all commands executions we know how many
114+
commands are sleeping by checking `executed` list.
115+
"""
116+
executed = []
117+
118+
def write(self, _: str) -> None:
119+
self.executed.append(1)
120+
121+
def flush(self) -> None:
122+
pass
123+
124+
buffer = MyBuffer()
125+
results = []
126+
cpu_count = os.cpu_count()
127+
exec_count = cpu_count - 1
128+
sut.communicate()
129+
130+
def _threaded():
131+
def _runner(index):
132+
return sut.run_command(
133+
f"echo -n {index}; sleep 3",
134+
timeout=5,
135+
iobuffer=buffer)
136+
137+
with ThreadPoolExecutor(max_workers=cpu_count) as executor:
138+
for result in executor.map(_runner, range(exec_count)):
139+
results.append(result)
140+
141+
thread = threading.Thread(target=_threaded, daemon=True)
142+
thread.start()
143+
144+
while len(buffer.executed) < exec_count:
145+
time.sleep(0.001)
146+
continue
147+
148+
sut.force_stop()
149+
150+
while len(results) < exec_count:
151+
time.sleep(0.001)
152+
continue
153+
154+
for i in range(exec_count):
155+
data = results[i]
156+
157+
assert data["command"] == f"echo -n {i}; sleep 3"
158+
assert data["timeout"] == 5
159+
assert data["returncode"] != 0
160+
assert data["stdout"] == f"{i}"
161+
assert 0 < data["exec_time"] < time.time()

0 commit comments

Comments
 (0)