Skip to content

Commit 0bda174

Browse files
committed
Suspend execution feature
This commit introduces the suspend execution feature to the nrunner. The suspend execution was available on the legacy runner, but we didn't move it to the nrunner. With this feature, it is possible to pause execution of python based task on process spawner by sending SIGTSTP signal (ctrl+z). It is helpful for debugging test execution. Reference: #6059 Signed-off-by: Jan Richter <jarichte@redhat.com>
1 parent cd711de commit 0bda174

File tree

9 files changed

+227
-9
lines changed

9 files changed

+227
-9
lines changed

avocado/core/nrunner/runner.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import multiprocessing
2+
import os
23
import signal
34
import time
45
import traceback
@@ -7,6 +8,7 @@
78
from avocado.core.nrunner.runnable import RUNNERS_REGISTRY_STANDALONE_EXECUTABLE
89
from avocado.core.plugin_interfaces import RunnableRunner
910
from avocado.core.utils import messages
11+
from avocado.utils import process
1012

1113
#: The amount of time (in seconds) between each internal status check
1214
RUNNER_RUN_CHECK_INTERVAL = 0.01
@@ -51,10 +53,29 @@ class BaseRunner(RunnableRunner):
5153
#: this runners makes use of.
5254
CONFIGURATION_USED = []
5355

54-
@staticmethod
55-
def signal_handler(signum, frame): # pylint: disable=W0613
56+
def __init__(self):
57+
super().__init__()
58+
self.proc = None
59+
self.process_stopped = False
60+
self.stop_signal = False
61+
62+
def signal_handler(self, signum, frame): # pylint: disable=W0613
5663
if signum == signal.SIGTERM.value:
5764
raise TestInterrupt("Test interrupted: Timeout reached")
65+
elif signum == signal.SIGTSTP.value:
66+
self.stop_signal = True
67+
68+
def pause_process(self):
69+
if self.process_stopped:
70+
self.process_stopped = False
71+
sign = signal.SIGCONT
72+
else:
73+
self.process_stopped = True
74+
sign = signal.SIGSTOP
75+
processes = process.get_children_pids(self.proc.pid, recursive=True)
76+
processes.append(self.proc.pid)
77+
for pid in processes:
78+
os.kill(pid, sign)
5879

5980
@staticmethod
6081
def prepare_status(status_type, additional_info=None):
@@ -76,11 +97,13 @@ def prepare_status(status_type, additional_info=None):
7697
status.update({"status": status_type, "time": time.monotonic()})
7798
return status
7899

79-
@staticmethod
80-
def _monitor(queue):
100+
def _monitor(self, queue):
81101
most_recent_status_time = None
82102
while True:
83103
time.sleep(RUNNER_RUN_CHECK_INTERVAL)
104+
if self.stop_signal:
105+
self.stop_signal = False
106+
self.pause_process()
84107
if queue.empty():
85108
now = time.monotonic()
86109
if (
@@ -118,23 +141,26 @@ def _catch_errors(self, runnable, queue):
118141
)
119142

120143
def run(self, runnable):
121-
# pylint: disable=W0201
144+
if hasattr(signal, "SIGTSTP"):
145+
signal.signal(signal.SIGTSTP, signal.SIG_IGN)
146+
signal.signal(signal.SIGTSTP, self.signal_handler)
122147
signal.signal(signal.SIGTERM, self.signal_handler)
148+
# pylint: disable=W0201
123149
self.runnable = runnable
124150
yield messages.StartedMessage.get()
125151
try:
126152
queue = multiprocessing.SimpleQueue()
127-
process = multiprocessing.Process(
153+
self.proc = multiprocessing.Process(
128154
target=self._catch_errors, args=(self.runnable, queue)
129155
)
130156

131-
process.start()
157+
self.proc.start()
132158

133159
for message in self._monitor(queue):
134160
yield message
135161

136162
except TestInterrupt:
137-
process.terminate()
163+
self.proc.terminate()
138164
for message in self._monitor(queue):
139165
yield message
140166
except Exception as e:

avocado/core/plugin_interfaces.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,28 @@ async def terminate_task(self, runtime_task):
376376
:rtype: bool
377377
"""
378378

379+
async def stop_task(self, runtime_task):
380+
"""Stop already spawned task.
381+
382+
:param runtime_task: wrapper for a Task with additional runtime
383+
information.
384+
:type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask`
385+
:returns: whether the task has been stopped or not.
386+
:rtype: bool
387+
"""
388+
raise NotImplementedError()
389+
390+
async def resume_task(self, runtime_task):
391+
"""Resume already stopped task.
392+
393+
:param runtime_task: wrapper for a Task with additional runtime
394+
information.
395+
:type runtime_task: :class:`avocado.core.task.runtime.RuntimeTask`
396+
:returns: whether the task has been resumed or not.
397+
:rtype: bool
398+
"""
399+
raise NotImplementedError()
400+
379401
@staticmethod
380402
@abc.abstractmethod
381403
async def check_task_requirements(runtime_task):

avocado/core/task/runtime.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class RuntimeTaskStatus(Enum):
1818
FAIL_TRIAGE = "FINISHED WITH FAILURE ON TRIAGE"
1919
FAIL_START = "FINISHED FAILING TO START"
2020
STARTED = "STARTED"
21+
PAUSED = "PAUSED"
2122

2223
@staticmethod
2324
def finished_statuses():

avocado/core/task/statemachine.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66

77
from avocado.core.exceptions import JobFailFast
8+
from avocado.core.output import LOG_UI
89
from avocado.core.task.runtime import RuntimeTaskStatus
910
from avocado.core.teststatus import STATUSES_NOT_OK
1011
from avocado.core.utils import messages
@@ -493,6 +494,31 @@ async def terminate_tasks_interrupted(self):
493494
terminated = await self._terminate_tasks(task_status)
494495
await self._send_finished_tasks_message(terminated, "Interrupted by user")
495496

497+
@staticmethod
498+
async def stop_resume_tasks(state_machine, spawner):
499+
async with state_machine.lock:
500+
try:
501+
for runtime_task in state_machine.monitored:
502+
if runtime_task.status == RuntimeTaskStatus.STARTED:
503+
await spawner.stop_task(runtime_task)
504+
runtime_task.status = RuntimeTaskStatus.PAUSED
505+
LOG_UI.warning(
506+
f"{runtime_task.task.identifier}: {runtime_task.status.value}"
507+
)
508+
elif runtime_task.status == RuntimeTaskStatus.PAUSED:
509+
await spawner.resume_task(runtime_task)
510+
runtime_task.status = RuntimeTaskStatus.STARTED
511+
LOG_UI.warning(
512+
f"{runtime_task.task.identifier}: {runtime_task.status.value}"
513+
)
514+
except NotImplementedError:
515+
LOG.warning(
516+
f"Sending signals to tasks is not implemented for spawner: {spawner}"
517+
)
518+
LOG_UI.warning(
519+
f"Sending signals to tasks is not implemented for spawner: {spawner}"
520+
)
521+
496522
async def run(self):
497523
"""Pushes Tasks forward and makes them do something with their lives."""
498524
while True:

avocado/plugins/runner_nrunner.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import os
2222
import platform
2323
import random
24+
import signal
2425
import tempfile
26+
import threading
2527

2628
from avocado.core.dispatcher import SpawnerDispatcher
2729
from avocado.core.exceptions import JobError, JobFailFast
@@ -269,6 +271,10 @@ def _abort_if_missing_runners(runnables):
269271
)
270272
raise JobError(msg)
271273

274+
@staticmethod
275+
def signal_handler(spawner, state_machine):
276+
asyncio.create_task(Worker.stop_resume_tasks(state_machine, spawner))
277+
272278
def run_suite(self, job, test_suite):
273279
summary = set()
274280

@@ -335,6 +341,14 @@ def run_suite(self, job, test_suite):
335341
]
336342
asyncio.ensure_future(self._update_status(job))
337343
loop = asyncio.get_event_loop()
344+
if (
345+
hasattr(signal, "SIGTSTP")
346+
and threading.current_thread() is threading.main_thread()
347+
):
348+
loop.add_signal_handler(
349+
signal.SIGTSTP,
350+
lambda: self.signal_handler(spawner, self.tsm),
351+
)
338352
try:
339353
try:
340354
loop.run_until_complete(

avocado/plugins/spawners/process.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import os
3+
import signal
34
import socket
45

56
from avocado.core.dependencies.requirements import cache
@@ -106,6 +107,16 @@ async def terminate_task(self, runtime_task):
106107
pass
107108
return returncode is not None
108109

110+
async def stop_task(self, runtime_task):
111+
try:
112+
runtime_task.spawner_handle.process.send_signal(signal.SIGTSTP)
113+
except ProcessLookupError:
114+
return False
115+
return
116+
117+
async def resume_task(self, runtime_task):
118+
await self.stop_task(runtime_task)
119+
109120
@staticmethod
110121
async def check_task_requirements(runtime_task):
111122
"""Check the runtime task requirements needed to be able to run"""

docs/source/guides/contributor/chapters/tips.rst

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,27 @@ During the execution look for::
3333

3434
avocado --show avocado.utils.debug run examples/tests/assets.py
3535

36+
Interrupting test
37+
-----------------
38+
39+
In case you want to "pause" the running test, you can use SIGTSTP (ctrl+z)
40+
signal sent to the main avocado process. This signal is forwarded to test
41+
and it's children processes. To resume testing you repeat the same signal.
42+
43+
.. note::
44+
The job and test timeouts are still enabled on stopped processes. This
45+
means that after you restart the test can be killed by the timeout if
46+
the timeout was reached. You can use run `-p timeout_factor=$int$` to
47+
increase the timeouts for your debugging.
48+
49+
.. note::
50+
It is supported by the process spawner only.
51+
52+
.. warning::
53+
This feature is meant only for debugging purposes and it can
54+
cause unreliable behavior especially if the signal is sent during the
55+
test initialization. Therefore use it with caution.
56+
3657
Line-profiler
3758
-------------
3859

selftests/check.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"unit": 675,
3131
"jobs": 11,
3232
"functional-parallel": 315,
33-
"functional-serial": 7,
33+
"functional-serial": 9,
3434
"optional-plugins": 0,
3535
"optional-plugins-golang": 2,
3636
"optional-plugins-html": 3,
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import glob
2+
import os
3+
import signal
4+
import sys
5+
import time
6+
import unittest
7+
8+
from avocado.utils import process, script
9+
from selftests.utils import AVOCADO, TestCaseTmpDir
10+
11+
SLEEP_TEST_PYTHON = """import os
12+
import time
13+
14+
from avocado import Test
15+
16+
17+
class SleepTest(Test):
18+
19+
def test(self):
20+
with open(os.path.join(self.logdir, "sleep.txt"), "w") as f:
21+
for _ in range(10):
22+
f.write("Sleeping \\n")
23+
time.sleep(1)
24+
"""
25+
26+
SLEEP_TEST_EXEC = """#!/bin/bash
27+
output_file="$AVOCADO_TEST_LOGDIR/sleep.txt"
28+
for i in {1..10}; do
29+
echo "This is line $i" >> "$output_file"
30+
sleep 1
31+
done
32+
"""
33+
34+
35+
@unittest.skipIf(
36+
sys.platform.startswith("darwin"),
37+
"The test pause feature is not supported on macOS",
38+
)
39+
class RunnerOperationTest(TestCaseTmpDir):
40+
def _count_lines(self, file_path):
41+
with open(os.path.join(file_path, "sleep.txt"), encoding="utf-8") as file:
42+
return sum(1 for _ in file)
43+
44+
def _check_pause(self, tst):
45+
cmd_line = f"{AVOCADO} run --disable-sysinfo --job-results-dir {self.tmpdir.name} -- {tst}"
46+
proc = process.SubProcess(cmd_line)
47+
proc.start()
48+
init = True
49+
while init:
50+
output = proc.get_stdout()
51+
if b"STARTED" in output:
52+
init = False
53+
time.sleep(2)
54+
proc.send_signal(signal.SIGTSTP)
55+
time.sleep(1)
56+
test_log_dir = glob.glob(
57+
os.path.join(self.tmpdir.name, "job-*", "test-results", "*")
58+
)[0]
59+
lines = self._count_lines(test_log_dir)
60+
self.assertNotEqual(
61+
lines,
62+
10,
63+
"The test finished before it was paused",
64+
)
65+
time.sleep(5)
66+
self.assertEqual(
67+
lines,
68+
self._count_lines(test_log_dir),
69+
"The test was not paused",
70+
)
71+
proc.send_signal(signal.SIGTSTP)
72+
proc.wait()
73+
full_log_path = os.path.join(self.tmpdir.name, "latest", "full.log")
74+
with open(full_log_path, encoding="utf-8") as full_log_file:
75+
full_log = full_log_file.read()
76+
self.assertIn("PAUSED", full_log)
77+
self.assertIn("STARTED", full_log)
78+
self.assertEqual(
79+
self._count_lines(test_log_dir),
80+
10,
81+
"The test was not resumed",
82+
)
83+
84+
def test_pause_exec(self):
85+
with script.TemporaryScript(
86+
"sleep.sh",
87+
SLEEP_TEST_EXEC,
88+
) as tst_exec:
89+
self._check_pause(tst_exec)
90+
91+
def test_pause_instrumented(self):
92+
93+
with script.TemporaryScript(
94+
"sleep.py",
95+
SLEEP_TEST_PYTHON,
96+
) as tst_python:
97+
self._check_pause(tst_python)

0 commit comments

Comments
 (0)