Skip to content

Commit 7d6aa40

Browse files
committed
SSH CA cert feature SAT-28038
1 parent a6f3ab0 commit 7d6aa40

File tree

1 file changed

+212
-1
lines changed

1 file changed

+212
-1
lines changed

tests/foreman/destructive/test_remoteexecution.py

Lines changed: 212 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
from nailgun.entity_mixins import TaskFailedError
1818
import pytest
1919

20-
from robottelo.config import get_credentials
20+
from robottelo.config import get_credentials, settings
2121
from robottelo.hosts import get_sat_version
22+
from robottelo.logging import logger
23+
from robottelo.utils.installer import InstallerCommand
2224

2325
CAPSULE_TARGET_VERSION = f'6.{get_sat_version().minor}.z'
2426

@@ -135,3 +137,212 @@ def test_positive_use_alternate_directory(
135137
task = target_sat.cli.Task.list_tasks({'search': command})[0]
136138
search = target_sat.cli.Task.list_tasks({'search': f'id={task["id"]}'})
137139
assert search[0]['action'] == task['action']
140+
141+
142+
def create_CA(host, path='/root/CA', name=None):
143+
assert host.execute(f'mkdir -p {path}').status == 0
144+
filename = 'id_ca' if name is None else f'id_{name}_ca'
145+
assert (
146+
host.execute(
147+
f'cd {path} && if ! [ -f {filename} ]; then ssh-keygen -t ed25519 -f {filename} -N ""; fi'
148+
).status
149+
== 0
150+
)
151+
return filename
152+
153+
154+
@pytest.fixture
155+
def ca_sat(target_sat):
156+
path = "/root/CA"
157+
sat_ssh_path = '/var/lib/foreman-proxy/ssh/'
158+
filename = create_CA(target_sat, path)
159+
ca_path = f'{path}/{filename}'
160+
key_name = 'id_rsa_foreman_proxy'
161+
cert_name = f'{key_name}-cert.pub'
162+
assert (
163+
target_sat.execute(
164+
f'cd {sat_ssh_path} && cp {ca_path}.pub . && restorecon {filename}.pub && chown foreman-proxy {filename}.pub && chgrp foreman-proxy {filename}.pub'
165+
).status
166+
== 0
167+
)
168+
assert (
169+
target_sat.execute(
170+
f'cd {sat_ssh_path} && ssh-keygen -s {ca_path} -I satellite -n root {key_name}.pub && restorecon {cert_name} && chown foreman-proxy {cert_name} && chgrp foreman-proxy {cert_name}'
171+
).status
172+
== 0
173+
)
174+
return (target_sat, f'{sat_ssh_path}/{filename}.pub')
175+
176+
177+
@pytest.fixture
178+
def ca_contenthost(rhel_contenthost):
179+
path = '/root/CA'
180+
host_ssh_path = '/etc/ssh'
181+
filename = create_CA(rhel_contenthost, path, 'host')
182+
ca_path = f'{path}/{filename}'
183+
# create a host key and sign it
184+
key_name = 'ssh_host_ed25519_key'
185+
cert_name = f'{key_name}-cert.pub'
186+
assert (
187+
rhel_contenthost.execute(
188+
f'cd {host_ssh_path} && if ! [ -f ssh_host_ed25519_key ]; then ssh-keygen -t ed25519 -f {key_name} -N ""; fi'
189+
).status
190+
== 0
191+
)
192+
assert (
193+
rhel_contenthost.execute(
194+
f'cd {host_ssh_path} && ssh-keygen -s {ca_path} -I host -n {rhel_contenthost.hostname} -h {key_name}.pub'
195+
).status
196+
== 0
197+
)
198+
# setup cert usage
199+
assert (
200+
rhel_contenthost.execute(
201+
f'mkdir -p {host_ssh_path}/sshd_config.d && cd {host_ssh_path}/sshd_config.d && echo "HostCertificate {host_ssh_path}/{cert_name}" > 60-host-cert.conf'
202+
).status
203+
== 0
204+
)
205+
assert rhel_contenthost.execute('systemctl restart sshd').status == 0
206+
return (rhel_contenthost, f'{ca_path}.pub')
207+
208+
209+
@pytest.fixture
210+
def host_ca_file_on_satellite(ca_contenthost):
211+
return f'/var/lib/foreman-proxy/ssh/{ca_contenthost[1].split("/")[-1]}'
212+
213+
214+
def register_host(satellite, host):
215+
org = satellite.cli.Org.create({'name': gen_string('alpha')})
216+
# repo = settings.repos['SATCLIENT_REPO'][f'RHEL{host.os_version.major}']
217+
lce = satellite.cli_factory.make_lifecycle_environment({'organization-id': org['id']})
218+
cv = satellite.cli_factory.make_content_view({'organization-id': org['id']})
219+
satellite.cli.ContentView.publish({'id': cv['id']})
220+
cvv = satellite.cli.ContentView.info({'id': cv['id']})['versions'][0]
221+
satellite.cli.ContentView.version_promote(
222+
{'id': cvv['id'], 'to-lifecycle-environment-id': lce['id']}
223+
)
224+
ak_with_cv = satellite.cli_factory.make_activation_key(
225+
{
226+
'organization-id': org['id'],
227+
'lifecycle-environment-id': lce.id,
228+
'content-view-id': cv.id,
229+
'name': gen_string('alpha'),
230+
}
231+
)
232+
# host.register(org, None, ak_with_cv.name, satellite, repo_data=f'repo={repo}')
233+
host.register(org, None, ak_with_cv.name, satellite)
234+
235+
236+
def test_execution(satellite, host):
237+
command = "echo rex_passed $(date) > /root/test"
238+
invocation_command = satellite.cli_factory.job_invocation(
239+
{
240+
'job-template': 'Run Command - Script Default',
241+
'inputs': f'command={command}',
242+
'search-query': f"name ~ {host.hostname}",
243+
}
244+
)
245+
return satellite.cli.JobInvocation.info({'id': invocation_command['id']})
246+
247+
248+
def log_save(satellite, host):
249+
host.execute(
250+
f'journalctl -u sshd | grep {satellite.ip} | grep CA | wc -l > /root/saved_sshd_log'
251+
)
252+
253+
254+
def log_compare(satellite, host):
255+
return host.execute(
256+
f'[ $(( $(cat /root/saved_sshd_log) + 1 )) -eq $(journalctl -u sshd | grep {satellite.ip} | grep " CA " | wc -l) ]'
257+
).status
258+
259+
260+
def copy_host_CA(host, satellite, host_path, satellite_path):
261+
host_ca_file_local = f'/tmp/{gen_string("alpha")}'
262+
host.get(host_path, host_ca_file_local)
263+
satellite.put(host_ca_file_local, satellite_path)
264+
265+
266+
@pytest.mark.no_containers
267+
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
268+
def test_positive_ssh_ca_sat_only(ca_sat, rhel_contenthost):
269+
sat = ca_sat[0]
270+
host = rhel_contenthost
271+
sat_ca_file = ca_sat[1]
272+
log_save(sat, host)
273+
command = InstallerCommand(
274+
foreman_proxy_plugin_remote_execution_script_ssh_user_ca_public_keys_file=sat_ca_file,
275+
)
276+
assert sat.install(command).status == 0
277+
register_host(sat, host)
278+
result = test_execution(sat, host)
279+
# assert the run actually happened and it was authenticated using cert
280+
assert result.status == 0
281+
logger.debug(result)
282+
assert log_compare(sat, host) == 0
283+
check = host.execute('grep rex_passed /root/test')
284+
assert check.status == 0
285+
286+
287+
@pytest.mark.no_containers
288+
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
289+
def test_positive_ssh_ca_host_only(target_sat, ca_contenthost, host_ca_file_on_satellite):
290+
sat = target_sat
291+
host = ca_contenthost[0]
292+
host_ca_file = ca_contenthost[1]
293+
copy_host_CA(host, sat, host_ca_file, host_ca_file_on_satellite)
294+
log_save(sat, host)
295+
command = InstallerCommand(
296+
foreman_proxy_plugin_remote_execution_script_ssh_host_ca_public_keys_file=host_ca_file_on_satellite,
297+
)
298+
assert sat.install(command).status == 0
299+
register_host(sat, host)
300+
result = test_execution(sat, host)
301+
# assert the run actually happened and it was NOT authenticated using cert
302+
assert result['success'] == '1'
303+
logger.debug(result)
304+
assert log_compare(sat, host) != 0
305+
check = host.execute('grep rex_passed /root/test')
306+
assert check.status == 0
307+
308+
309+
@pytest.mark.no_containers
310+
@pytest.mark.rhel_ver_match([settings.content_host.default_rhel_version])
311+
def test_positive_ssh_ca_sat_and_host(ca_sat, ca_contenthost, host_ca_file_on_satellite):
312+
sat = ca_sat[0]
313+
sat_ca_file = ca_sat[1]
314+
host = ca_contenthost[0]
315+
host_ca_file = ca_contenthost[1]
316+
copy_host_CA(host, sat, host_ca_file, host_ca_file_on_satellite)
317+
log_save(sat, host)
318+
command = InstallerCommand(
319+
foreman_proxy_plugin_remote_execution_script_ssh_user_ca_public_key_file=sat_ca_file,
320+
foreman_proxy_plugin_remote_execution_script_ssh_host_ca_public_keys_file=host_ca_file_on_satellite,
321+
)
322+
assert sat.install(command).status == 0
323+
register_host(sat, host)
324+
# SSH REX
325+
result = test_execution(sat, host)
326+
# assert the run actually happened and it was authenticated using cert
327+
assert result['success'] == '1'
328+
logger.debug(result)
329+
assert log_compare(sat, host) == 0
330+
check = host.execute('grep rex_passed /root/test')
331+
assert check.status == 0
332+
# ANSIBLE REX
333+
log_save(sat, host)
334+
command = "echo rex2_passed $(date) > /root/test"
335+
invocation_command = sat.cli_factory.job_invocation(
336+
{
337+
'job-template': 'Run Command - Ansible Default',
338+
'inputs': f'command={command}',
339+
'search-query': f'name ~ {host.hostname}',
340+
}
341+
)
342+
result = sat.cli.JobInvocation.info({'id': invocation_command['id']})
343+
# assert the run actually happened and it was authenticated using cert
344+
assert result['success'] == '1'
345+
logger.debug(result)
346+
assert log_compare(sat, host) == 0
347+
check = host.execute('grep rex2_passed /root/test')
348+
assert check.status == 0

0 commit comments

Comments
 (0)