Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 339 additions & 1 deletion tests/foreman/destructive/test_remoteexecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,93 @@
from nailgun.entity_mixins import TaskFailedError
import pytest

from robottelo.config import get_credentials
from robottelo.config import get_credentials, settings
from robottelo.exceptions import CLIFactoryError
from robottelo.hosts import get_sat_version
from robottelo.logging import logger
from robottelo.utils.installer import InstallerCommand

CAPSULE_TARGET_VERSION = f'6.{get_sat_version().minor}.z'

pytestmark = pytest.mark.destructive


def create_CA(host, path='/root/CA', name=None):
"""Create a new CA keypair"""
assert host.execute(f'mkdir -p {path}').status == 0
filename = 'id_ca' if name is None else f'id_{name}_ca'
assert (
host.execute(
f'cd {path} && if ! [ -f {filename} ]; then ssh-keygen -t ed25519 -f {filename} -N ""; fi'
).status
== 0
)
return filename


@pytest.fixture
def ca_sat(target_sat):
"""Setup SSH key certs and CA public key on Satellite"""
path = "/root/CA"
sat_ssh_path = '/var/lib/foreman-proxy/ssh/'
filename = create_CA(target_sat, path)
ca_path = f'{path}/{filename}'
key_name = 'id_rsa_foreman_proxy'
cert_name = f'{key_name}-cert.pub'
assert (
target_sat.execute(
f'cd {sat_ssh_path} && cp {ca_path}.pub . && restorecon {filename}.pub && chown foreman-proxy {filename}.pub && chgrp foreman-proxy {filename}.pub'
).status
== 0
)
assert (
target_sat.execute(
f'cd {sat_ssh_path} && ssh-keygen -s {ca_path} -I satellite -n root {key_name}.pub && restorecon {cert_name} && chown foreman-proxy {cert_name} && chgrp foreman-proxy {cert_name}'
).status
== 0
)
return (target_sat, f'{sat_ssh_path}/{filename}.pub')


@pytest.fixture
def ca_contenthost(rhel_contenthost):
"""Setup SSH key certs and CA public key on content host"""
path = '/root/CA'
host_ssh_path = '/etc/ssh'
filename = create_CA(rhel_contenthost, path, 'host')
ca_path = f'{path}/{filename}'
# create a host key and sign it
key_name = 'ssh_host_ed25519_key'
cert_name = f'{key_name}-cert.pub'
assert (
rhel_contenthost.execute(
f'cd {host_ssh_path} && if ! [ -f {key_name} ]; then ssh-keygen -t ed25519 -f {key_name} -N ""; fi'
).status
== 0
)
assert (
rhel_contenthost.execute(
f'cd {host_ssh_path} && ssh-keygen -s {ca_path} -I host -n {rhel_contenthost.hostname} -h {key_name}.pub'
).status
== 0
)
# setup cert usage
assert (
rhel_contenthost.execute(
f'mkdir -p {host_ssh_path}/sshd_config.d && cd {host_ssh_path}/sshd_config.d && echo "HostCertificate {host_ssh_path}/{cert_name}" > 60-host-cert.conf'
).status
== 0
)
assert rhel_contenthost.execute('systemctl restart sshd').status == 0
return (rhel_contenthost, f'{ca_path}.pub')


@pytest.fixture
def host_ca_file_on_satellite(ca_contenthost):
"""Return path of CA public key on Satellite"""
return f'/var/lib/foreman-proxy/ssh/{ca_contenthost[1].split("/")[-1]}'


def test_negative_run_capsule_upgrade_playbook_on_satellite(target_sat):
"""Run Capsule Upgrade playbook against the Satellite itself

Expand Down Expand Up @@ -135,3 +214,262 @@ def test_positive_use_alternate_directory(
task = target_sat.cli.Task.list_tasks({'search': command})[0]
search = target_sat.cli.Task.list_tasks({'search': f'id={task["id"]}'})
assert search[0]['action'] == task['action']


def register_host(satellite, host, org, cockpit=False):
"""Register a content host to Satellite"""
if cockpit:
rhelver = host.os_version.major
if rhelver > 7:
repo = [settings.repos[f'rhel{rhelver}_os']['baseos']]
else:
repo = [settings.repos['rhel7_os'], settings.repos['rhel7_extras']]
else:
repo = []
satellite.register_host_custom_repo(org, host, repo)
return org


def test_execution(satellite, host):
"""Run a job invocation and return its results"""
command = "echo rex_passed $(date) > /root/test"
invocation_command = satellite.cli_factory.job_invocation(
{
'job-template': 'Run Command - Script Default',
'inputs': f'command={command}',
'search-query': f"name ~ {host.hostname}",
}
)
return satellite.cli.JobInvocation.info({'id': invocation_command['id']})


def log_count(satellite, host):
"""Return number of lines mentioning CA was used in sshd log,
for later use
"""
Comment on lines +247 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, non-blocking

Suggested change
"""Return number of lines mentioning CA was used in sshd log,
for later use
"""
"""Return number of lines mentioning CA was used in sshd log"""

return int(
host.execute(
f'journalctl -u sshd | grep {satellite.ip_addr} | grep CA | wc -l'
).stdout.strip()
)


def copy_host_CA(host, satellite, host_path, satellite_path):
"""Copy CA public key from host to Satellite (for use in installer)"""
host_ca_file_local = f'/tmp/{gen_string("alpha")}'
host.get(host_path, host_ca_file_local)
satellite.put(host_ca_file_local, satellite_path)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move the fixtures above the first test in the module


@pytest.mark.no_containers
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
def test_positive_ssh_ca_sat_only(ca_sat, rhel_contenthost, function_org):
"""Setup Satellite's SSH cert, register host and run REX on that host

:id: 353a21bf-f379-440a-9dc6-e17bf6414713

:expectedresults: Verify the job has been run successfully against the host, Sat's public key hasn't been added to host's authorized_keys and CA verification has been used instead

:parametrized: yes
"""
sat = ca_sat[0]
host = rhel_contenthost
sat_ca_file = ca_sat[1]
saved_count = log_count(sat, host)
command = InstallerCommand(
foreman_proxy_plugin_remote_execution_script_ssh_user_ca_public_key_file=sat_ca_file,
)
assert sat.install(command).status == 0
register_host(sat, host, function_org, cockpit=True)
result = test_execution(sat, host)
# assert the run actually happened and it was authenticated using cert
assert result['success'] == '1'
logger.debug(result)
assert log_count(sat, host) == saved_count + 1
check = host.execute('grep rex_passed /root/test')
assert check.status == 0
check = host.execute(f'grep {sat.hostname} /root/.ssh/authorized_keys')
assert check.status != 0


@pytest.mark.no_containers
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
def test_negative_ssh_ca_sat_wrong_cert(ca_sat, rhel_contenthost, function_org):
"""Setup Satellite's SSH cert, register host, setup incorrect cert on Satellite and run REX on the host

:id: 7ddd170b-d489-4e2a-93af-ccf0c1a9d4ca

:expectedresults: Verify the job has failed against the host

:parametrized: yes
"""
sat = ca_sat[0]
host = rhel_contenthost
sat_ca_file = ca_sat[1]
command = InstallerCommand(
foreman_proxy_plugin_remote_execution_script_ssh_user_ca_public_key_file=sat_ca_file,
)
assert sat.install(command).status == 0
register_host(sat, host, function_org, cockpit=True)

# create a different cert for the Satellite, with a wrong principal
sat_ssh_path = '/var/lib/foreman-proxy/ssh/'
key_name = 'id_rsa_foreman_proxy'
cert_name = f'{key_name}-cert.pub'
assert (
sat.execute(
f'cd {sat_ssh_path} && ssh-keygen -s /root/CA/id_ca -I satellite -n wronguser {key_name}.pub && restorecon {cert_name} && chown foreman-proxy {cert_name} && chgrp foreman-proxy {cert_name}'
).status
== 0
)

# assert the run fails
with pytest.raises(CLIFactoryError) as err:
test_execution(sat, host)
assert 'A sub task failed' in err.value.args[0]
check = host.execute('grep rex_passed /root/test')
assert check.status != 0


@pytest.mark.no_containers
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
def test_positive_ssh_ca_host_only(
target_sat, ca_contenthost, host_ca_file_on_satellite, function_org
):
"""Setup host's SSH cert, add CA to Sat, register host and run REX on that host

:id: 0ad9bbf7-0be5-49ca-8d79-969242b6b9bc

:expectedresults: Verify the job has been run successfully against the host

:parametrized: yes
"""
sat = target_sat
host = ca_contenthost[0]
host_ca_file = ca_contenthost[1]
copy_host_CA(host, sat, host_ca_file, host_ca_file_on_satellite)
saved_count = log_count(sat, host)
command = InstallerCommand(
foreman_proxy_plugin_remote_execution_script_ssh_host_ca_public_keys_file=host_ca_file_on_satellite,
)
assert sat.install(command).status == 0
register_host(sat, host, function_org, cockpit=True)
result = test_execution(sat, host)
# assert the run actually happened and it was NOT authenticated using cert
assert result['success'] == '1'
logger.debug(result)
assert log_count(sat, host) == saved_count
check = host.execute('grep rex_passed /root/test')
assert check.status == 0


@pytest.mark.no_containers
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
def test_negative_ssh_ca_host_wrong_cert(
target_sat, ca_contenthost, host_ca_file_on_satellite, function_org
):
"""Setup host's SSH cert, add a different CA to Sat, register host and run REX on that host

:id: 9e23d27d-a3a8-4c0d-9a0f-892d392aa660

:expectedresults: Verify the job has failed against the host

:parametrized: yes
"""
sat = target_sat
host = ca_contenthost[0]
fake_ca_dir = '/'.join(host_ca_file_on_satellite.split('/')[:-1])
fake_ca_filename = host_ca_file_on_satellite.split('/')[-1]
sat.execute(
f'cd {fake_ca_dir} && {{ rm -f {fake_ca_filename}; ssh-keygen -t ed25519 -f {fake_ca_filename} -N ""; }}'
)
command = InstallerCommand(
foreman_proxy_plugin_remote_execution_script_ssh_host_ca_public_keys_file=host_ca_file_on_satellite,
)
assert sat.install(command).status == 0
register_host(sat, host, function_org, cockpit=True)
# assert the run failed
with pytest.raises(CLIFactoryError) as err:
test_execution(sat, host)
assert 'A sub task failed' in err.value.args[0]
check = host.execute('grep rex_passed /root/test')
assert check.status != 0


@pytest.mark.e2e
@pytest.mark.no_containers
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
def test_positive_ssh_ca_sat_and_host_ssh_ansible_cockpit(
ca_sat, ca_contenthost, host_ca_file_on_satellite, function_org
):
"""Setup Satellite's SSH cert, setup host's SSH cert, add CA to Sat, register host and run REX on that host

:id: 97c17417-3b20-4876-bf9c-7219a91acee2

:expectedresults: Verify the job has been run successfully against the host, Sat's public key hasn't been added to host's authorized_keys and CA verification has been used instead


:parametrized: yes
"""
sat = ca_sat[0]
sat_ca_file = ca_sat[1]
host = ca_contenthost[0]
host_ca_file = ca_contenthost[1]
copy_host_CA(host, sat, host_ca_file, host_ca_file_on_satellite)
# setup CA
saved_count = log_count(sat, host)
command = InstallerCommand(
foreman_proxy_plugin_remote_execution_script_ssh_user_ca_public_key_file=sat_ca_file,
foreman_proxy_plugin_remote_execution_script_ssh_host_ca_public_keys_file=host_ca_file_on_satellite,
)
assert sat.install(command).status == 0
register_host(sat, host, function_org, cockpit=True)
# SSH REX
result = test_execution(sat, host)
# assert the run actually happened and it was authenticated using cert
assert result['success'] == '1'
logger.debug(result)
assert log_count(sat, host) == saved_count + 1
check = host.execute('grep rex_passed /root/test')
assert check.status == 0
check = host.execute(f'grep {sat.hostname} /root/.ssh/authorized_keys')
assert check.status != 0
# ANSIBLE REX
saved_count = log_count(sat, host)
command = "echo rex2_passed $(date) > /root/test"
invocation_command = sat.cli_factory.job_invocation(
{
'job-template': 'Run Command - Ansible Default',
'inputs': f'command={command}',
'search-query': f'name ~ {host.hostname}',
}
)
result = sat.cli.JobInvocation.info({'id': invocation_command['id']})
# assert the run actually happened and it was authenticated using cert
assert result['success'] == '1'
logger.debug(result)
assert log_count(sat, host) == saved_count + 1
check = host.execute('grep rex2_passed /root/test')
assert check.status == 0
# COCKPIT
saved_count = log_count(sat, host)
# setup Satellite for cockpit
sat.register_to_cdn()
sat.install_cockpit()
sat.cli.Service.restart()
# setup cockpit on host
host.install_cockpit()
# run cockpit
# note that merely opening a cockpit in UI connects to ssh already
# note that this is a UI part, as opposed to the previous parts and previous SSH CA tests
with sat.ui_session() as session:
session.organization.select(org_name=function_org.name)
hostname_inside_cockpit = session.host.get_webconsole_content(
entity_name=host.hostname,
rhel_version=host.os_version.major,
)
assert host.hostname in hostname_inside_cockpit, (
f'cockpit page shows hostname {hostname_inside_cockpit} instead of {host.hostname}'
)
# assert the cockpit access was authenticated using cert
assert log_count(sat, host) == saved_count + 1