diff --git a/tests/conftest.py b/tests/conftest.py index f8ba410d5d..4c8cc7dbd2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -39,6 +39,7 @@ def pytest_addoption(parser): Add CLI options and modify options for pytest-ansible where needed. Note: Set the default to to None, otherwise when evaluating with `request.config.getoption("--zinventory"):` will always return true because a default will be returned. + New option have been added to the execution of the command to allow the become method. """ parser.addoption( "--zinventory", @@ -54,6 +55,36 @@ def pytest_addoption(parser): default=None, help="Str - dictionary with values {'host': 'ibm.com', 'user': 'root', 'zoau': '/usr/lpp/zoau', 'pyz': '/usr/lpp/IBM/pyz'}", ) + parser.addoption( + "--user_adm", + action="store", + default=None, + help="Str " + ) + parser.addoption( + "--user_method", + action="store", + default=None, + help="Str " + ) + parser.addoption( + "--ansible_promp", + action="store", + default=None, + help="Str " + ) + parser.addoption( + "--password", + action="store", + default=None, + help="Str " + ) + parser.addoption( + "--ssh_key", + action="store", + default=None, + help="Str " + ) @pytest.fixture(scope="session") @@ -204,3 +235,21 @@ def get_config(request): """ Call the pytest-ansible plugin to check volumes on the system and work properly a list by session.""" path = request.config.getoption("--zinventory") yield path + +@pytest.fixture(scope="function") +def get_config_raw(request): + """ Call the pytest-ansible plugin to check the options on user manager test cases.""" + path = request.config.getoption("--zinventory-raw") + yield path + +@pytest.fixture(scope='session') +def get_config_for_become(request): + """ Return as a dict the values to be used on the test cases for become method""" + become_config = { + "user" : request.config.option.user_adm, + "method" : request.config.option.user_method, + "promp" : request.config.option.ansible_promp, + "key" : request.config.option.password, + "ssh_key" : request.config.option.ssh_key + } + return become_config \ No newline at end of file diff --git a/tests/functional/modules/test_zos_fetch_func.py b/tests/functional/modules/test_zos_fetch_func.py index 222bc5888e..7cf96d11ce 100644 --- a/tests/functional/modules/test_zos_fetch_func.py +++ b/tests/functional/modules/test_zos_fetch_func.py @@ -20,11 +20,15 @@ import pytest import string import random +import yaml +import json +import subprocess import tempfile from hashlib import sha256 from ansible.utils.hashing import checksum from datetime import datetime +from shellescape import quote from shellescape import quote @@ -33,6 +37,7 @@ # pylint: disable-next=import-error from ibm_zos_core.tests.helpers.dataset import get_tmp_ds_name from ibm_zos_core.tests.helpers.utils import get_random_file_name +from ibm_zos_core.tests.helpers.users import ManagedUserType, ManagedUser __metaclass__ = type @@ -87,6 +92,61 @@ 00000003A record """ +INVENTORY = """all: + hosts: + zvm: + ansible_host: {0} + ansible_ssh_private_key_file: {1} + ansible_user: {2} + ansible_python_interpreter: {3}""" + +BECOME_USER="""- hosts: zvm + collections : + - ibm.ibm_zos_core + gather_facts: False + ignore_errors: True + vars: + ZOAU: "{0}" + PYZ: "{1}" + ansible_become_user: {2} + + ansible_become_method: {3} + ansible_su_prompt_l10n: {4} {{ansible_become_user}} + environment: + _BPXK_AUTOCVT: "ALL" + ZOAU_HOME: "{0}" + PYTHONPATH: "{0}/lib/{5}" + LIBPATH: "{0}/lib:{1}/lib:/lib:/usr/lib:." + PATH: "{0}/bin:/bin:/usr/lpp/rsusr/ported/bin:/var/bin:/usr/lpp/rsusr/ported/bin:/usr/lpp/java/java180/J8.0_64/bin:{1}/bin:" + _CEE_RUNOPTS: "FILETAG(AUTOCVT,AUTOTAG) POSIX(ON)" + _TAG_REDIR_ERR: "txt" + _TAG_REDIR_IN: "txt" + _TAG_REDIR_OUT: "txt" + LANG: "C" + PYTHONSTDINENCODING: "cp1047" + tasks: + - name: Fetch PDSE member while escalating privileges. + zos_fetch: + src: {6} + dest: /tmp/ + flat: true + become: true +""" + +ANSIBLE_CFD = """[defaults] +forks = 25 +become_password_file = ./key.txt + +[connection] +pipelining = True + +[ssh_connection] +pipelining = True + +[colors] +verbose = green""" + +KEY = "{0}" def extract_member_name(data_set): start = data_set.find("(") @@ -985,3 +1045,96 @@ def test_fetch_uss_file_relative_path_not_present_on_local_machine(ansible_zos_m finally: if os.path.exists(dest): os.remove(dest) + + +def test_become_option_with_restricted_user(ansible_zos_module, z_python_interpreter, get_config_for_become, capsys): + """ + This tests check the become method it will pass the escalation but fail to execute the module because of lack of + permissions on on the system. + """ + with capsys.disabled(): + managed_user = None + managed_user_test_case_name = "managed_user_limited_become_method" + become = get_config_for_become + try: + # Initialize the Managed user API from the pytest fixture. + managed_user = ManagedUser.from_fixture(ansible_zos_module, z_python_interpreter) + + # Important: Execute the test case with the managed users execution utility. + # For become method is better to have verbose and debug as False only available for debug options. + managed_user.execute_managed_user_become_test( + managed_user_test_case = managed_user_test_case_name, become_method = become, debug = False, + verbose = False, managed_user_type=ManagedUserType.ZOAU_LIMITED_ACCESS_OPERCMD) + + finally: + # Delete the managed user on the remote host to avoid proliferation of users. + managed_user.delete_managed_user() + +def managed_user_limited_become_method(get_config_for_become, get_config_raw, capsys): + with capsys.disabled(): + ds_name = get_tmp_ds_name() + ds_name = ds_name + "(MEMBER)" + + # Get values from the command + adm_user = get_config_for_become["user"] + method = get_config_for_become["method"] + promp = get_config_for_become["promp"] + password = get_config_for_become["key"] + + # Get values from the new configuration file + configuration = json.loads(get_config_raw) + ssh_key = configuration["ssh_key"] + hosts = configuration["host"] + user = configuration["user"] + python_path = configuration["python_interpreter"] + cut_python_path = python_path[:python_path.find('/bin')].strip() + zoau = configuration["zoau"] + python_version = cut_python_path.split('/')[2] + + try: + playbook = "playbook.yml" + inventory = "inventory.yml" + ansible_cfd = "ansible.cfd" + key_file = "key.txt" + + os.system("echo {0} > {1}".format(quote(BECOME_USER.format( + zoau, + cut_python_path, + adm_user, + method, + promp, + python_version, + ds_name, + )), playbook)) + + os.system("echo {0} > {1}".format(quote(INVENTORY.format( + hosts, + ssh_key, + user, + python_path + )), inventory)) + + os.system("echo {0} > {1}".format(quote(ANSIBLE_CFD), ansible_cfd)) + os.system("echo {0} > {1}".format(quote(KEY.format( + password + )), key_file)) + + command = "ansible-playbook -vvv -i {0} {1}".format( + inventory, + playbook + ) + + result = subprocess.run( + command, + capture_output=True, + shell=True, + timeout=120, + encoding='utf-8' + ) + + assert result.returncode == 0 + finally: + os.remove(playbook) + os.remove(inventory) + os.remove(key_file) + os.remove(ansible_cfd) \ No newline at end of file diff --git a/tests/helpers/users.py b/tests/helpers/users.py index 4b370faef9..a292b0cde8 100644 --- a/tests/helpers/users.py +++ b/tests/helpers/users.py @@ -149,7 +149,7 @@ def managed_user_test_demo_how_to_use_managed_user(ansible_zos_module): Who am I AFTER asking for a managed user = LJBXMONV """ - def __init__(self, model_user: str = None, remote_host: str = None, zoau_path: str = None, pyz_path: str = None, pythonpath: str = None, volumes: str = None, hostpattern: str = None) -> None: + def __init__(self, model_user: str = None, remote_host: str = None, zoau_path: str = None, pyz_path: str = None, pythonpath: str = None, volumes: str = None, python_interpreter: str=None, hostpattern: str = None) -> None: """ Initialize class with necessary parameters. @@ -168,6 +168,7 @@ def __init__(self, model_user: str = None, remote_host: str = None, zoau_path: s self._pyz_path = pyz_path self._pythonpath = pythonpath self._volumes = volumes + self._python_interpreter = python_interpreter self._hostpattern = "all" # can also get it from options host_pattern self._managed_racf_user = None self._managed_user_group = None @@ -183,6 +184,7 @@ def from_fixture(cls, pytest_module_fixture, pytest_interpreter_fixture): inventory_hosts = pytest_module_fixture["options"]["inventory_manager"]._inventory.hosts inventory_list = list(inventory_hosts.values())[0].vars.get('ansible_python_interpreter').split(";") environment_vars = pytest_interpreter_fixture[0] + python_interpreter = pytest_interpreter_fixture[1] zoau_path = environment_vars.get("ZOAU_HOME") pythonpath = environment_vars.get("PYTHONPATH") @@ -190,10 +192,10 @@ def from_fixture(cls, pytest_module_fixture, pytest_interpreter_fixture): # TODO: To make this dynamic, we need to update AC and then also test with the new fixture because # the legacy fixture is using a VOLUMES keyword while raw fixture uses extra_args. Best to move - # volumes to extra_args. + # volumes to extra_args. volumes = "000000,222222" hostpattern = pytest_module_fixture["options"]["host_pattern"] - return cls(model_user, remote_host, zoau_path, pyz_path, pythonpath, volumes, hostpattern) + return cls(model_user, remote_host, zoau_path, pyz_path, pythonpath, volumes, python_interpreter, hostpattern) def _connect(self, remote_host:str , model_user: str, command: str) -> List[str]: @@ -440,6 +442,82 @@ def execute_managed_user_test(self, managed_user_test_case: str, debug: bool = F print(result.stdout) + def execute_managed_user_become_test(self, managed_user_test_case: str, become_method: dict[str, str], debug: bool = False, verbose: bool = False, managed_user_type: ManagedUserType = None) -> None: + """ + Executes the test case using articulated pytest options when the test case needs a managed user. This is required + to execute any test that needs a manage user, a wrapper test case should call this method, the 'managed_user_test_case' + must begin with 'managed_user_' as opposed to 'test_', this because the pytest command built will override the ini + with this value. For running playbooks with become method. + + Parameters + ---------- + managed_user_test_case (str) + The managed user test case that begins with 'managed_user_' + become_method (Dict[str, str]) + Key value pair of user command and options to set playbook for become method. + debug (str) + Enable verbose output for pytest, the equivalent command line option of '-s'. + verbose (str) + Enables pytest verbosity level 4 (-vvvv) + + Raises + ------ + Exception - if the test case fails (non-zero RC from pytest/subprocess), the stdout and stderr are returned for evaluation. + ValueError - if the managed user is not created, you must call `self._managed_racf_user()`. + + See Also + -------- + :py:member:`_create_managed_user` required before this function can be used as a managed user needs to exist. + """ + + if managed_user_test_case is None or not managed_user_test_case.startswith("managed_user_"): + raise ValueError("Test cases using a managed user must begin with 'managed_user_' to be collected for execution.") + + # if not self._managed_racf_user: + # raise ValueError("No managed user has been created, please ensure that the method `self._managed_racf_user()` has been called prior.") + + self._create_managed_user(managed_user_type) + + # Get the file path of the caller function + calling_test_path = inspect.getfile(inspect.currentframe().f_back) + + # Get the test case name that this code is being run from, this is not an function arg. + # managed_user_test_case = inspect.stack()[1][3] + + testcase = f"{calling_test_path}::{managed_user_test_case}" + # hostpattern = "all" # can also get it from options host_pattern + capture = " -s" + verbosity = " -vvvv" + + escaped_user = re.escape(self._managed_racf_user) + + inventory: dict [str, str] = {} + inventory.update({'host': self._remote_host}) + inventory.update({'user': self._managed_racf_user}) + inventory.update({'zoau': self._zoau_path}) # get this from fixture + inventory.update({'pyz': self._pyz_path}) # get this from fixture + inventory.update({'python_interpreter': self._python_interpreter}) # get this from fixture + inventory.update({'ssh_key': f"/tmp/{escaped_user}/id_rsa"}) + extra_args = {} + extra_args.update({'extra_args':{'volumes':self._volumes.split(",")}}) # get this from fixture + inventory.update(extra_args) + + node_inventory = json.dumps(inventory) + + # Get options for become method + user = become_method["user"] + method = become_method["method"] + promp = become_method["promp"] + key = become_method["key"] + # Carefully crafted 'pytest' command to be allow for it to be called from anther test driven by pytest and uses the zinventory-raw fixture. + pytest_cmd = f"""pytest {testcase} --override-ini "python_functions=managed_user_" --host-pattern={self._hostpattern}{capture if debug else ""}{verbosity if verbose else ""} --zinventory-raw='{node_inventory}' --user_adm {user} --user_method {method} --ansible_promp '{promp}' --password {key}""" + result = subprocess.run(pytest_cmd, capture_output=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True) + if result.returncode != 0: + raise Exception(result.stdout + result.stderr) + else: + print(result.stdout) + + def delete_managed_user(self) -> None: """ Delete managed user from z/OS managed node. Performs clean up of the remote