Skip to content

Commit ed1619a

Browse files
committed
Runners refactoring
This is a refactoring of runners. It moves most of the code related to running and monitoring processes into base class. This change removes duplicities across the runners, and it will help with implementation of common features like signal handling. Also it makes the future development of new runners easier, because developers can focus on actual runner and don't have to deal with things like process monitoring, logging and exception handling Signed-off-by: Jan Richter <jarichte@redhat.com>
1 parent fbe6a75 commit ed1619a

File tree

17 files changed

+306
-610
lines changed

17 files changed

+306
-610
lines changed

avocado/core/nrunner/runner.py

Lines changed: 85 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import multiprocessing
2+
import signal
13
import time
4+
import traceback
25

6+
from avocado.core.exceptions import TestInterrupt
37
from avocado.core.nrunner.runnable import RUNNERS_REGISTRY_STANDALONE_EXECUTABLE
48
from avocado.core.plugin_interfaces import RunnableRunner
9+
from avocado.core.utils import messages
510

611
#: The amount of time (in seconds) between each internal status check
712
RUNNER_RUN_CHECK_INTERVAL = 0.01
@@ -46,6 +51,11 @@ class BaseRunner(RunnableRunner):
4651
#: this runners makes use of.
4752
CONFIGURATION_USED = []
4853

54+
@staticmethod
55+
def signal_handler(signum, frame): # pylint: disable=W0613
56+
if signum == signal.SIGTERM.value:
57+
raise TestInterrupt("Test interrupted: Timeout reached")
58+
4959
@staticmethod
5060
def prepare_status(status_type, additional_info=None):
5161
"""Prepare a status dict with some basic information.
@@ -66,23 +76,80 @@ def prepare_status(status_type, additional_info=None):
6676
status.update({"status": status_type, "time": time.monotonic()})
6777
return status
6878

69-
def running_loop(self, condition):
70-
"""Produces timely running messages until end condition is found.
79+
@staticmethod
80+
def _monitor(queue):
81+
most_recent_status_time = None
82+
while True:
83+
time.sleep(RUNNER_RUN_CHECK_INTERVAL)
84+
if queue.empty():
85+
now = time.monotonic()
86+
if (
87+
most_recent_status_time is None
88+
or now >= most_recent_status_time + RUNNER_RUN_STATUS_INTERVAL
89+
):
90+
most_recent_status_time = now
91+
yield messages.RunningMessage.get()
92+
continue
93+
else:
94+
message = queue.get()
95+
yield message
96+
if message.get("status") == "finished":
97+
break
98+
99+
def _catch_errors(self, runnable, queue):
100+
"""Wrapper around runners methods for catching and logging failures."""
101+
try:
102+
messages.start_logging(runnable.config, queue)
103+
signal.signal(signal.SIGTERM, self.signal_handler)
104+
for message in self._run(runnable):
105+
queue.put(message)
106+
except TestInterrupt as e:
107+
queue.put(messages.StderrMessage.get(str(e)))
108+
queue.put(messages.FinishedMessage.get("interrupted", fail_reason=str(e)))
109+
except Exception as e:
110+
queue.put(messages.StderrMessage.get(traceback.format_exc()))
111+
queue.put(
112+
messages.FinishedMessage.get(
113+
"error",
114+
fail_reason=str(e),
115+
fail_class=e.__class__.__name__,
116+
traceback=traceback.format_exc(),
117+
)
118+
)
119+
120+
def run(self, runnable):
121+
# pylint: disable=W0201
122+
signal.signal(signal.SIGTERM, self.signal_handler)
123+
self.runnable = runnable
124+
yield messages.StartedMessage.get()
125+
try:
126+
queue = multiprocessing.SimpleQueue()
127+
process = multiprocessing.Process(
128+
target=self._catch_errors, args=(self.runnable, queue)
129+
)
130+
131+
process.start()
132+
133+
for message in self._monitor(queue):
134+
yield message
135+
136+
except TestInterrupt:
137+
process.terminate()
138+
for message in self._monitor(queue):
139+
yield message
140+
except Exception as e:
141+
yield messages.StderrMessage.get(traceback.format_exc())
142+
yield messages.FinishedMessage.get(
143+
"error",
144+
fail_reason=str(e),
145+
fail_class=e.__class__.__name__,
146+
traceback=traceback.format_exc(),
147+
)
148+
149+
def _run(self, runnable):
150+
"""
151+
Run the Runnable
71152
72-
:param condition: a callable that will be evaluated as a
73-
condition for continuing the loop
153+
:param runnable: the runnable object
154+
:type runnable: :class:`Runnable`
74155
"""
75-
most_current_execution_state_time = None
76-
while not condition():
77-
now = time.monotonic()
78-
if most_current_execution_state_time is not None:
79-
next_execution_state_mark = (
80-
most_current_execution_state_time + RUNNER_RUN_STATUS_INTERVAL
81-
)
82-
if (
83-
most_current_execution_state_time is None
84-
or now > next_execution_state_mark
85-
):
86-
most_current_execution_state_time = now
87-
yield self.prepare_status("running")
88-
time.sleep(RUNNER_RUN_CHECK_INTERVAL)

avocado/plugins/runners/asset.py

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import sys
2-
import time
3-
from multiprocessing import Process, SimpleQueue, set_start_method
2+
from multiprocessing import set_start_method
43

54
from avocado.core.nrunner.app import BaseRunnerApp
6-
from avocado.core.nrunner.runner import RUNNER_RUN_STATUS_INTERVAL, BaseRunner
5+
from avocado.core.nrunner.runner import BaseRunner
76
from avocado.core.settings import settings
87
from avocado.utils import data_structures
98
from avocado.utils.asset import Asset
@@ -36,7 +35,7 @@ class AssetRunner(BaseRunner):
3635
CONFIGURATION_USED = ["datadir.paths.cache_dirs"]
3736

3837
@staticmethod
39-
def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire, queue):
38+
def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire):
4039

4140
asset_manager = Asset(
4241
name, asset_hash, algorithm, locations, cache_dirs, expire
@@ -52,50 +51,25 @@ def _fetch_asset(name, asset_hash, algorithm, locations, cache_dirs, expire, que
5251
result = "error"
5352
stderr = str(exc)
5453

55-
output = {"result": result, "stdout": stdout, "stderr": stderr}
56-
queue.put(output)
54+
return {"result": result, "stdout": stdout, "stderr": stderr}
5755

58-
def run(self, runnable):
59-
# pylint: disable=W0201
60-
self.runnable = runnable
61-
yield self.prepare_status("started")
62-
63-
name = self.runnable.kwargs.get("name")
56+
def _run(self, runnable):
57+
name = runnable.kwargs.get("name")
6458
# if name was passed correctly, run the Avocado Asset utility
6559
if name is not None:
66-
asset_hash = self.runnable.kwargs.get("asset_hash")
67-
algorithm = self.runnable.kwargs.get("algorithm")
68-
locations = self.runnable.kwargs.get("locations")
69-
expire = self.runnable.kwargs.get("expire")
60+
asset_hash = runnable.kwargs.get("asset_hash")
61+
algorithm = runnable.kwargs.get("algorithm")
62+
locations = runnable.kwargs.get("locations")
63+
expire = runnable.kwargs.get("expire")
7064
if expire is not None:
7165
expire = data_structures.time_to_seconds(str(expire))
7266

73-
cache_dirs = self.runnable.config.get("datadir.paths.cache_dirs")
67+
cache_dirs = runnable.config.get("datadir.paths.cache_dirs")
7468
if cache_dirs is None:
7569
cache_dirs = settings.as_dict().get("datadir.paths.cache_dirs")
76-
77-
# let's spawn it to another process to be able to update the
78-
# status messages and avoid the Asset to lock this process
79-
queue = SimpleQueue()
80-
process = Process(
81-
target=self._fetch_asset,
82-
args=(
83-
name,
84-
asset_hash,
85-
algorithm,
86-
locations,
87-
cache_dirs,
88-
expire,
89-
queue,
90-
),
70+
output = self._fetch_asset(
71+
name, asset_hash, algorithm, locations, cache_dirs, expire
9172
)
92-
process.start()
93-
94-
while queue.empty():
95-
time.sleep(RUNNER_RUN_STATUS_INTERVAL)
96-
yield self.prepare_status("running")
97-
98-
output = queue.get()
9973
result = output["result"]
10074
stdout = output["stdout"]
10175
stderr = output["stderr"]
@@ -104,7 +78,6 @@ def run(self, runnable):
10478
result = "error"
10579
stdout = ""
10680
stderr = 'At least name should be passed as kwargs using name="uri".'
107-
10881
yield self.prepare_status("running", {"type": "stdout", "log": stdout.encode()})
10982
yield self.prepare_status("running", {"type": "stderr", "log": stderr.encode()})
11083
yield self.prepare_status("finished", {"result": result})

0 commit comments

Comments
 (0)