Skip to content

[Enabler][2097]Add_template_for_test_with_privilege_escalation #2118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def pytest_addoption(parser):
Add CLI options and modify options for pytest-ansible where needed.
Note: Set the default to to None, otherwise when evaluating with `request.config.getoption("--zinventory"):`
will always return true because a default will be returned.
New option have been added to the execution of the command to allow the become method.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New option has been added (not have) to allow the become method to apply to the command.

"""
parser.addoption(
"--zinventory",
Expand All @@ -54,6 +55,36 @@ def pytest_addoption(parser):
default=None,
help="Str - dictionary with values {'host': 'ibm.com', 'user': 'root', 'zoau': '/usr/lpp/zoau', 'pyz': '/usr/lpp/IBM/pyz'}",
)
parser.addoption(
"--user_adm",
action="store",
default=None,
help="Str "
)
parser.addoption(
"--user_method",
action="store",
default=None,
help="Str "
)
parser.addoption(
"--ansible_promp",
action="store",
default=None,
help="Str "
)
parser.addoption(
"--password",
action="store",
default=None,
help="Str "
)
parser.addoption(
"--ssh_key",
action="store",
default=None,
help="Str "
)


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -204,3 +235,21 @@ def get_config(request):
""" Call the pytest-ansible plugin to check volumes on the system and work properly a list by session."""
path = request.config.getoption("--zinventory")
yield path

@pytest.fixture(scope="function")
def get_config_raw(request):
""" Call the pytest-ansible plugin to check the options on user manager test cases."""
path = request.config.getoption("--zinventory-raw")
yield path

@pytest.fixture(scope='session')
def get_config_for_become(request):
""" Return as a dict the values to be used on the test cases for become method"""
become_config = {
"user" : request.config.option.user_adm,
"method" : request.config.option.user_method,
"promp" : request.config.option.ansible_promp,
"key" : request.config.option.password,
"ssh_key" : request.config.option.ssh_key
}
return become_config
153 changes: 153 additions & 0 deletions tests/functional/modules/test_zos_fetch_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import pytest
import string
import random
import yaml
import json
import subprocess
import tempfile

from hashlib import sha256
from ansible.utils.hashing import checksum
from datetime import datetime
from shellescape import quote

from shellescape import quote

Expand All @@ -33,6 +37,7 @@
# pylint: disable-next=import-error
from ibm_zos_core.tests.helpers.dataset import get_tmp_ds_name
from ibm_zos_core.tests.helpers.utils import get_random_file_name
from ibm_zos_core.tests.helpers.users import ManagedUserType, ManagedUser

__metaclass__ = type

Expand Down Expand Up @@ -87,6 +92,61 @@
00000003A record
"""

INVENTORY = """all:
hosts:
zvm:
ansible_host: {0}
ansible_ssh_private_key_file: {1}
ansible_user: {2}
ansible_python_interpreter: {3}"""

BECOME_USER="""- hosts: zvm
collections :
- ibm.ibm_zos_core
gather_facts: False
ignore_errors: True
vars:
ZOAU: "{0}"
PYZ: "{1}"
ansible_become_user: {2}

ansible_become_method: {3}
ansible_su_prompt_l10n: {4} {{ansible_become_user}}
environment:
_BPXK_AUTOCVT: "ALL"
ZOAU_HOME: "{0}"
PYTHONPATH: "{0}/lib/{5}"
LIBPATH: "{0}/lib:{1}/lib:/lib:/usr/lib:."
PATH: "{0}/bin:/bin:/usr/lpp/rsusr/ported/bin:/var/bin:/usr/lpp/rsusr/ported/bin:/usr/lpp/java/java180/J8.0_64/bin:{1}/bin:"
_CEE_RUNOPTS: "FILETAG(AUTOCVT,AUTOTAG) POSIX(ON)"
_TAG_REDIR_ERR: "txt"
_TAG_REDIR_IN: "txt"
_TAG_REDIR_OUT: "txt"
LANG: "C"
PYTHONSTDINENCODING: "cp1047"
tasks:
- name: Fetch PDSE member while escalating privileges.
zos_fetch:
src: {6}
dest: /tmp/
flat: true
become: true
"""

ANSIBLE_CFD = """[defaults]
forks = 25
become_password_file = ./key.txt

[connection]
pipelining = True

[ssh_connection]
pipelining = True

[colors]
verbose = green"""

KEY = "{0}"

def extract_member_name(data_set):
start = data_set.find("(")
Expand Down Expand Up @@ -985,3 +1045,96 @@ def test_fetch_uss_file_relative_path_not_present_on_local_machine(ansible_zos_m
finally:
if os.path.exists(dest):
os.remove(dest)


def test_become_option_with_restricted_user(ansible_zos_module, z_python_interpreter, get_config_for_become, capsys):
"""
This tests check the become method it will pass the escalation but fail to execute the module because of lack of
permissions on on the system.
"""
with capsys.disabled():
managed_user = None
managed_user_test_case_name = "managed_user_limited_become_method"
become = get_config_for_become
try:
# Initialize the Managed user API from the pytest fixture.
managed_user = ManagedUser.from_fixture(ansible_zos_module, z_python_interpreter)

# Important: Execute the test case with the managed users execution utility.
# For become method is better to have verbose and debug as False only available for debug options.
managed_user.execute_managed_user_become_test(
managed_user_test_case = managed_user_test_case_name, become_method = become, debug = False,
verbose = False, managed_user_type=ManagedUserType.ZOAU_LIMITED_ACCESS_OPERCMD)

finally:
# Delete the managed user on the remote host to avoid proliferation of users.
managed_user.delete_managed_user()

def managed_user_limited_become_method(get_config_for_become, get_config_raw, capsys):
with capsys.disabled():
ds_name = get_tmp_ds_name()
ds_name = ds_name + "(MEMBER)"

# Get values from the command
adm_user = get_config_for_become["user"]
method = get_config_for_become["method"]
promp = get_config_for_become["promp"]
password = get_config_for_become["key"]
ssh_key = get_config_for_become["ssh_key"]

# Get values from the new configuration file
configuration = json.loads(get_config_raw)
hosts = configuration["host"]
user = configuration["user"]
python_path = configuration["python_interpreter"]
cut_python_path = python_path[:python_path.find('/bin')].strip()
zoau = configuration["zoau"]
python_version = cut_python_path.split('/')[2]

try:
playbook = "playbook.yml"
inventory = "inventory.yml"
ansible_cfd = "ansible.cfd"
key_file = "key.txt"

os.system("echo {0} > {1}".format(quote(BECOME_USER.format(
zoau,
cut_python_path,
adm_user,
method,
promp,
python_version,
ds_name,
)), playbook))

os.system("echo {0} > {1}".format(quote(INVENTORY.format(
hosts,
ssh_key,
user,
python_path
)), inventory))

os.system("echo {0} > {1}".format(quote(ANSIBLE_CFD), ansible_cfd))
os.system("echo {0} > {1}".format(quote(KEY.format(
password
)), key_file))

command = "ansible-playbook -vvv -i {0} {1}".format(
inventory,
playbook
)

result = subprocess.run(
command,
capture_output=True,
shell=True,
timeout=120,
encoding='utf-8'
)

assert result.returncode == 0
finally:
os.remove(playbook)
os.remove(inventory)
os.remove(key_file)
os.remove(ansible_cfd)
80 changes: 78 additions & 2 deletions tests/helpers/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def managed_user_test_demo_how_to_use_managed_user(ansible_zos_module):
Who am I AFTER asking for a managed user = LJBXMONV
"""

def __init__(self, model_user: str = None, remote_host: str = None, zoau_path: str = None, pyz_path: str = None, pythonpath: str = None, volumes: str = None, hostpattern: str = None) -> None:
def __init__(self, model_user: str = None, remote_host: str = None, zoau_path: str = None, pyz_path: str = None, pythonpath: str = None, volumes: str = None, python_interpreter: str=None, hostpattern: str = None) -> None:
"""
Initialize class with necessary parameters.

Expand All @@ -168,6 +168,7 @@ def __init__(self, model_user: str = None, remote_host: str = None, zoau_path: s
self._pyz_path = pyz_path
self._pythonpath = pythonpath
self._volumes = volumes
self._python_interpreter = python_interpreter
self._hostpattern = "all" # can also get it from options host_pattern
self._managed_racf_user = None
self._managed_user_group = None
Expand All @@ -183,6 +184,7 @@ def from_fixture(cls, pytest_module_fixture, pytest_interpreter_fixture):
inventory_hosts = pytest_module_fixture["options"]["inventory_manager"]._inventory.hosts
inventory_list = list(inventory_hosts.values())[0].vars.get('ansible_python_interpreter').split(";")
environment_vars = pytest_interpreter_fixture[0]
python_interpreter = pytest_interpreter_fixture[1]

zoau_path = environment_vars.get("ZOAU_HOME")
pythonpath = environment_vars.get("PYTHONPATH")
Expand All @@ -193,7 +195,7 @@ def from_fixture(cls, pytest_module_fixture, pytest_interpreter_fixture):
# volumes to extra_args.
volumes = "000000,222222"
hostpattern = pytest_module_fixture["options"]["host_pattern"]
return cls(model_user, remote_host, zoau_path, pyz_path, pythonpath, volumes, hostpattern)
return cls(model_user, remote_host, zoau_path, pyz_path, pythonpath, volumes, python_interpreter, hostpattern)


def _connect(self, remote_host:str , model_user: str, command: str) -> List[str]:
Expand Down Expand Up @@ -440,6 +442,80 @@ def execute_managed_user_test(self, managed_user_test_case: str, debug: bool = F
print(result.stdout)


def execute_managed_user_become_test(self, managed_user_test_case: str, become_method: dict[str, str], debug: bool = False, verbose: bool = False, managed_user_type: ManagedUserType = None) -> None:
"""
Executes the test case using articulated pytest options when the test case needs a managed user. This is required
to execute any test that needs a manage user, a wrapper test case should call this method, the 'managed_user_test_case'
must begin with 'managed_user_' as opposed to 'test_', this because the pytest command built will override the ini
with this value. For running playbooks with become method.

Parameters
----------
managed_user_test_case (str)
The managed user test case that begins with 'managed_user_'
become_method (Dict[str, str])
Key value pair of user command and options to set playbook for become method.
debug (str)
Enable verbose output for pytest, the equivalent command line option of '-s'.
verbose (str)
Enables pytest verbosity level 4 (-vvvv)

Raises
------
Exception - if the test case fails (non-zero RC from pytest/subprocess), the stdout and stderr are returned for evaluation.
ValueError - if the managed user is not created, you must call `self._managed_racf_user()`.

See Also
--------
:py:member:`_create_managed_user` required before this function can be used as a managed user needs to exist.
"""

if managed_user_test_case is None or not managed_user_test_case.startswith("managed_user_"):
raise ValueError("Test cases using a managed user must begin with 'managed_user_' to be collected for execution.")

# if not self._managed_racf_user:
# raise ValueError("No managed user has been created, please ensure that the method `self._managed_racf_user()` has been called prior.")

self._create_managed_user(managed_user_type)

# Get the file path of the caller function
calling_test_path = inspect.getfile(inspect.currentframe().f_back)

# Get the test case name that this code is being run from, this is not an function arg.
# managed_user_test_case = inspect.stack()[1][3]

testcase = f"{calling_test_path}::{managed_user_test_case}"
# hostpattern = "all" # can also get it from options host_pattern
capture = " -s"
verbosity = " -vvvv"

inventory: dict [str, str] = {}
inventory.update({'host': self._remote_host})
inventory.update({'user': self._managed_racf_user})
inventory.update({'zoau': self._zoau_path}) # get this from fixture
inventory.update({'pyz': self._pyz_path}) # get this from fixture
inventory.update({'python_interpreter': self._python_interpreter}) # get this from fixture
extra_args = {}
extra_args.update({'extra_args':{'volumes':self._volumes.split(",")}}) # get this from fixture
inventory.update(extra_args)

node_inventory = json.dumps(inventory)

# Get options for become method
user = become_method["user"]
method = become_method["method"]
promp = become_method["promp"]
key = become_method["key"]
ssh_key = become_method["ssh_key"]
# Carefully crafted 'pytest' command to be allow for it to be called from anther test driven by pytest and uses the zinventory-raw fixture.
pytest_cmd = f"""pytest {testcase} --override-ini "python_functions=managed_user_" --host-pattern={self._hostpattern}{capture if debug else ""}{verbosity if verbose else ""} --zinventory-raw='{node_inventory}' --user_adm {user} --user_method {method} --ansible_promp '{promp}' --password {key} --ssh_key '{ssh_key}'"""
result = subprocess.run(pytest_cmd, capture_output=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True)
if result.returncode != 0:
raise Exception(result.stdout + result.stderr)
else:
print(result.stdout)


def delete_managed_user(self) -> None:
"""
Delete managed user from z/OS managed node. Performs clean up of the remote
Expand Down