Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions alts/shared/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from enum import IntEnum

__all__ = [
'API_VERSION',
'ARCHITECTURES',
Expand Down Expand Up @@ -60,3 +62,10 @@
'hostbased',
'publickey',
]


# Numeric status codes assigned to individual TAP test results
# (see parse_tap_output in alts/worker/tasks.py).
TapStatusEnum = IntEnum(
    'TapStatusEnum',
    [
        ('FAILED', 0),   # test reported "not ok"
        ('DONE', 1),     # test reported "ok"
        ('TODO', 2),     # test carries a TODO directive
        ('SKIPPED', 3),  # test carries a SKIP directive
    ],
)
2 changes: 1 addition & 1 deletion alts/shared/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def __init__(self, **data):
]
keepalive_interval: int = 30 # unit in seconds
commands_exec_timeout: int = 30 # unit in seconds
provision_timeout: int = 600 # 10 minutes in seconds
provision_timeout: int = 1200 # 20 minutes in seconds
tests_exec_timeout: int = 1800 # 30 minutes in seconds
deprecated_ansible_venv: str = get_abspath('~/ansible_env')
epel_release_urls: Dict[str, str] = {
Expand Down
31 changes: 31 additions & 0 deletions alts/shared/utils/log_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
from tempfile import NamedTemporaryFile
from typing import IO, Tuple


def get_temp_log_files(prefix: str) -> Tuple[IO, IO]:
    """Create a pair of named temporary log files (stdout, stderr).

    Both files are opened in ``'w+'`` mode with ``delete=False`` so they
    survive being handed to a subprocess; the caller is responsible for
    closing and unlinking them (see read_and_cleanup_temp_log_files).

    :param prefix: filename prefix for both temp files.
    :return: tuple of (stdout log file, stderr log file).
    """

    def _make(suffix: str) -> IO:
        return NamedTemporaryFile(
            delete=False,
            mode='w+',
            prefix=prefix,
            suffix=suffix,
        )

    return _make('.stdout.log'), _make('.stderr.log')


def read_and_cleanup_temp_log_files(
    out_file: IO[str],
    err_file: IO[str],
) -> Tuple[str, str]:
    """Read back both temp log files, then close and delete them.

    Fixes a bug in the original implementation: ``zip``-ing the file
    handles with the (immutable) accumulator strings meant ``log += ...``
    rebound a local and the function always returned ``('', '')``,
    silently discarding the captured output.

    :param out_file: temp file that captured the command's stdout.
    :param err_file: temp file that captured the command's stderr.
    :return: (stdout_text, stderr_text); each is prefixed with a newline
        to match how callers append it to already-collected output.
    """
    logs = []
    for file in (out_file, err_file):
        file.seek(0)
        logs.append(f'\n{file.read()}')
        file.close()
        # The files were created with delete=False, so remove them here.
        os.unlink(file.name)
    return logs[0], logs[1]
18 changes: 18 additions & 0 deletions alts/shared/utils/plumbum_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import time

from plumbum import ProcessTimedOut
from plumbum.commands.modifiers import Future


def wait_bg_process(future: Future, timeout: int):
    """Poll a plumbum background ``Future`` until it finishes or times out.

    Workaround: when ``run_bg`` is launched inside celery or
    ``multiprocessing.Pool`` workers, ``Future.wait()`` does not honor
    the timeout passed to ``run_bg``, so the deadline is enforced here
    by polling.

    :param future: the background command future returned by ``run_bg``.
    :param timeout: maximum number of seconds to wait.
    :raises ProcessTimedOut: if the process is still running at the
        deadline; the process is terminated before raising.
    """
    deadline = time.time() + timeout
    while not future.poll():
        if time.time() > deadline:
            future.proc.terminate()
            raise ProcessTimedOut(
                f"Process did not terminate within {timeout} seconds",
                future.proc.argv,
            )
        # Small sleep to avoid a hot spin while the process runs.
        time.sleep(0.1)
16 changes: 14 additions & 2 deletions alts/worker/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

from alts.shared.models import AsyncSSHParams, CommandResult
from alts.shared.utils.asyncssh import AsyncSSHClient, LongRunSSHClient
from alts.shared.utils.log_utils import (
get_temp_log_files,
read_and_cleanup_temp_log_files,
)
from alts.shared.utils.plumbum_utils import wait_bg_process


def measure_stage(stage: str):
Expand Down Expand Up @@ -196,6 +201,8 @@ def run_docker_command(
if env_vars:
for var in env_vars:
additional_env_vars.extend(('-e', var))
out_file, err_file = get_temp_log_files(str(self.container_name))
stdout = stderr = ''
try:
runner = (
local['docker']
Expand All @@ -209,11 +216,12 @@ def run_docker_command(
self.binary_name,
*cmd_args,
],
timeout=self.timeout,
retcode=None,
stdout=out_file,
stderr=err_file,
)
)
runner.wait()
wait_bg_process(runner, self.timeout or 30)
stdout = runner.stdout
stderr = runner.stderr
exit_code = runner.returncode
Expand All @@ -224,6 +232,10 @@ def run_docker_command(
except Exception:
self.logger.exception('Cannot run docker command:')
exit_code, stdout, stderr = 1, '', format_exc()
finally:
out, err = read_and_cleanup_temp_log_files(out_file, err_file)
stdout += out
stderr += err
return CommandResult(
exit_code=exit_code,
stdout=stdout,
Expand Down
35 changes: 23 additions & 12 deletions alts/worker/runners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
git_reset_hard,
prepare_gerrit_command,
)
from alts.shared.utils.log_utils import (
get_temp_log_files,
read_and_cleanup_temp_log_files,
)
from alts.shared.utils.plumbum_utils import wait_bg_process
from alts.worker import CONFIG, RESOURCES_DIR
from alts.worker.executors.ansible import AnsibleExecutor
from alts.worker.executors.bats import BatsExecutor
Expand Down Expand Up @@ -572,21 +577,22 @@ def run_ansible_command(
self, args: Union[tuple, list], retcode_none: bool = False,
timeout: int = CONFIG.provision_timeout
):
run_kwargs = {
'args': args,
'timeout': timeout
}
run_kwargs = {'args': args}
if retcode_none:
run_kwargs['retcode'] = None
cmd = local[self.ansible_playbook_binary].with_cwd(self._work_dir)
formulated_cmd = cmd.formulate(args=run_kwargs.get('args', ()))
exception_happened = False
cmd_pid = None
out_file, err_file = get_temp_log_files(str(self._task_id))
stdout = stderr = ''
try:
future = cmd.run_bg(**run_kwargs)
future = cmd.run_bg(**run_kwargs, stdout=out_file, stderr=err_file)
cmd_pid = future.proc.pid
future.wait()
exit_code, stdout, stderr = future.returncode, future.stdout, future.stderr
wait_bg_process(future, timeout)
exit_code, stdout, stderr = (
future.returncode, future.stdout, future.stderr
)
except ProcessExecutionError as e:
stdout = e.stdout
stderr = e.stderr
Expand All @@ -598,13 +604,17 @@ def run_ansible_command(
exit_code = COMMAND_TIMEOUT_EXIT_CODE
exception_happened = True
except Exception as e:
self._logger.error(
'Unknown error happened during %s execution: %s',
self._logger.exception(
'Unknown error happened during execution: %s',
formulated_cmd
)
stdout = ''
stderr = str(e)
exit_code = 255
finally:
out, err = read_and_cleanup_temp_log_files(out_file, err_file)
stdout += out
stderr += err

if exception_happened and cmd_pid:
try:
Expand Down Expand Up @@ -922,7 +932,7 @@ def install_package(
module_stream=module_stream,
module_version=module_version,
semi_verbose=semi_verbose,
verbose=verbose,
verbose=self._verbose or verbose,
allow_fail=allow_fail,
)

Expand Down Expand Up @@ -1185,7 +1195,8 @@ def ensure_package_is_installed(
package_name,
package_version=package_version,
package_epoch=package_epoch,
semi_verbose=True
semi_verbose=True,
verbose=self._verbose,
)

def get_init_script(self, tests_dir: Path) -> Optional[Path]:
Expand Down Expand Up @@ -1537,7 +1548,7 @@ def setup(self, skip_provision: bool = False):
self.initialize_terraform()
self.start_env()
if not skip_provision:
self.initial_provision()
self.initial_provision(verbose=self._verbose)

def teardown(self, publish_artifacts: bool = True):
try:
Expand Down
2 changes: 1 addition & 1 deletion alts/worker/runners/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
vm_alive: bool = False,
artifacts_uploader: Optional[BaseLogsUploader] = None,
package_channel: Optional[str] = None,
verbose: bool = False,
verbose: bool = True,
):
"""
Docker environment class initialization.
Expand Down
2 changes: 1 addition & 1 deletion alts/worker/runners/opennebula.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
test_configuration: Optional[dict] = None,
test_flavor: Optional[Dict[str, str]] = None,
vm_alive: bool = False,
verbose: bool = False,
verbose: bool = True,
):
super().__init__(
task_id,
Expand Down
107 changes: 99 additions & 8 deletions alts/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import random
import time
import urllib.parse
import base64
import gzip
from celery.contrib.abortable import AbortableTask
from collections import defaultdict
from socket import timeout
Expand All @@ -26,7 +28,7 @@
from urllib3 import Retry
from urllib3.exceptions import TimeoutError

from alts.shared.constants import API_VERSION, DEFAULT_REQUEST_TIMEOUT
from alts.shared.constants import API_VERSION, DEFAULT_REQUEST_TIMEOUT, TapStatusEnum
from alts.shared.exceptions import (
InstallPackageError,
PackageIntegrityTestsError,
Expand Down Expand Up @@ -89,6 +91,88 @@ def are_tap_tests_success(tests_output: str):
return errors == 0


def parse_tap_output(text):
    """
    Parses TAP test output and returns list of TAP-formatted entities.
    Returns list of dicts with detailed status report for each test in file.

    Parameters
    ----------
    text: str or unicode or bytes
        Test output from VM

    Returns
    -------
    list or None
        List of ``{'test_name', 'status', 'diagnostic'}`` dicts
        (``status`` is a ``TapStatusEnum`` value), an empty list when
        the parser found no recognizable TAP lines, or ``None`` when
        the TAP parser raised while parsing.

    """
    def to_unicode(s):
        # Normalize any input (bytes/str/other) to str for the parser.
        if isinstance(s, bytes):
            return s.decode('utf8')
        if isinstance(s, str):
            return s
        return str(s)

    def get_diagnostic(tap_item):
        # Collect the run of 'diagnostic' items that immediately follows
        # tap_item in raw_data.
        # NOTE(review): raw_data.index relies on item equality; if two
        # parsed items compare equal, this finds the first occurrence —
        # presumably tap.py items are distinct objects; verify.
        diagnostics = []
        index = raw_data.index(tap_item) + 1
        while (
            index < len(raw_data)
            and raw_data[index].category == 'diagnostic'
        ):
            diagnostics.append(raw_data[index].text)
            index += 1
        return "\n".join(diagnostics)

    # Normalize CRLF line endings; the TypeError fallback handles bytes
    # input, where '\r\n' (str) cannot be passed to bytes.replace.
    try:
        prepared_text = to_unicode(text).replace('\r\n', '\n')
    except TypeError:
        prepared_text = to_unicode(text.replace(b'\r\n', b'\n'))
    tap_parser = tap.parser.Parser()
    try:
        raw_data = list(tap_parser.parse_text(prepared_text))
    except Exception:
        # Unparseable output: signal "no TAP data" with None.
        return

    tap_output = []
    # If every item is 'unknown', the text contained no real TAP lines:
    # fall through and return the empty list.
    if not all((item.category == 'unknown' for item in raw_data)):
        for test_result in raw_data:
            if test_result.category != 'test':
                continue
            test_name = test_result.description
            if not test_name:
                # Tests with no description carry their label in the
                # directive text (e.g. "# SKIP reason").
                test_name = test_result.directive.text
            # Directive-based statuses (todo/skip) take precedence over
            # the plain ok/not-ok result.
            status = TapStatusEnum.FAILED
            if test_result.todo:
                status = TapStatusEnum.TODO
            elif test_result.skip:
                status = TapStatusEnum.SKIPPED
            elif test_result.ok:
                status = TapStatusEnum.DONE
            tap_output.append({
                'test_name': test_name,
                'status': status,
                'diagnostic': get_diagnostic(test_result),
            })
    return tap_output


def parse_and_compress_stage_results(stage_data: dict, log_name: str = ''):
    """Build a compact summary for one stage's execution results.

    Combines exit code, stdout and stderr into a single log string,
    gzip-compresses it and base64-encodes the result so it can travel
    in a JSON payload. For bats stages with output, the TAP results are
    parsed out as structured data as well.

    :param stage_data: dict with 'exit_code', 'stdout', 'stderr' keys.
    :param log_name: stage/log name; 'bats' in the name enables TAP parsing.
    :return: dict with 'exit_code', 'compressed_log' and, for bats
        stages, 'tap_results'.
    """
    exit_code = stage_data.get('exit_code')
    stdout = stage_data.get('stdout', '')
    stderr = stage_data.get('stderr', '')
    full_log = (
        f'Exit code: {exit_code}\n'
        f'Stdout:\n{stdout}\n'
        f'Stderr:\n{stderr}'
    )
    parsed = {
        'exit_code': exit_code,
        'compressed_log': base64.b64encode(
            gzip.compress(full_log.encode()),
        ).decode('utf-8'),
    }
    if stdout and 'bats' in log_name:
        parsed['tap_results'] = parse_tap_output(stdout)
    return parsed


class RetryableTask(AbortableTask):
autoretry_for = AUTO_RETRY_EXCEPTIONS
max_retries = 5
Expand Down Expand Up @@ -164,7 +248,7 @@ def set_artifacts_when_stage_has_unexpected_exception(
'package_channel': task_params.get('package_channel', 'beta'),
'test_configuration': task_params.get('test_configuration', {}),
'test_flavor': task_params.get('test_flavor', {}),
'vm_alive': task_params.get('vm_alive')
'vm_alive': task_params.get('vm_alive'),
}

runner_class = RUNNER_MAPPING[task_params['runner_type']]
Expand Down Expand Up @@ -270,18 +354,25 @@ def set_artifacts_when_stage_has_unexpected_exception(
if stage not in TESTS_SECTIONS_NAMES:
stage_info = {'success': is_success(stage_data)}
if CONFIG.logs_uploader_config.skip_artifacts_upload:
stage_info.update(stage_data)
stage_info.update(
parse_and_compress_stage_results(
stage_data,
log_name=stage,
),
)
summary[stage] = stage_info
continue
if stage not in summary:
summary[stage] = {}
for inner_stage, inner_data in stage_data.items():
stage_info = {
'success': is_success(inner_data),
'output': inner_data['stdout'],
}
stage_info = {'success': is_success(inner_data)}
if CONFIG.logs_uploader_config.skip_artifacts_upload:
stage_info.update(inner_data)
stage_info.update(
parse_and_compress_stage_results(
inner_data,
log_name=inner_stage,
),
)
summary[stage][inner_stage] = stage_info
if runner.uploaded_logs:
summary['logs'] = runner.uploaded_logs
Expand Down
Loading