diff --git a/sssd_test_framework/hosts/base.py b/sssd_test_framework/hosts/base.py index 6128a42b..154d7a51 100644 --- a/sssd_test_framework/hosts/base.py +++ b/sssd_test_framework/hosts/base.py @@ -314,3 +314,34 @@ def get_package_version(self, package: str = "sssd", raise_on_error: bool = True vers["update"] = int(v_match.group(5)) if v_match.group(5) else 0 vers["release"] = v_match.group(6) if v_match.group(6) else "" return vers + + def compare_package_version(self, other_version: dict, package: str = "sssd") -> int: + """ + Compare installed package version with other version. + + :param other_version: Version dictionary to compare + keys: major, minor, patch, prerelease, update, release + :param package: Package name (default: sssd) + :return: -1 if installed < other, 0 if equal, 1 if installed > other + """ + + def version_tuple(ver): + # Compose a tuple for comparable versioning, keeping prerelease (str) sortable + return ( + ver.get("major", 0), + ver.get("minor", 0), + ver.get("patch", 0), + ver.get("prerelease", ""), + ver.get("update", 0), + ver.get("release", ""), + ) + + installed_ver = self.get_package_version(package) + t_installed = version_tuple(installed_ver) + t_other = version_tuple(other_version) + if t_installed < t_other: + return -1 + elif t_installed > t_other: + return 1 + else: + return 0 diff --git a/sssd_test_framework/roles/client.py b/sssd_test_framework/roles/client.py index 1851daba..df02db98 100644 --- a/sssd_test_framework/roles/client.py +++ b/sssd_test_framework/roles/client.py @@ -10,7 +10,7 @@ from ..utils.automount import AutomountUtils from ..utils.gdm import GDM from ..utils.ldb import LDBUtils -from ..utils.local_users import LocalUsersUtils +from ..utils.local_users import LocalGroup, LocalSudoRule, LocalUser, LocalUsersUtils from ..utils.realmd import RealmUtils from ..utils.sbus import DBUSDestination, DBUSKnownBus from ..utils.smartcard import SmartCardUtils @@ -156,3 +156,37 @@ def sss_ssh_authorizedkeys(self, *args: str) -> ProcessResult: :rtype: ProcessResult """ return self.host.conn.exec(["sss_ssh_authorizedkeys", *args], raise_on_error=False) + + def user(self, name: str) -> LocalUser: + """ + Get user object. + + :param name: User name. + :type name: str + :return: New user object. + :rtype: LocalUser + """ + + return LocalUser(self.local, name) + + def group(self, name: str) -> LocalGroup: + """ + Get group object. + :param name: Group name. + :type name: str + :return: New group object. + :rtype: LocalGroup + """ + + return LocalGroup(self.local, name) + + def sudorule(self, name: str) -> LocalSudoRule: + """ + Get sudo rule object. + :param name: Sudo rule name. + :type name: str + :return: New sudo rule object. + :rtype: LocalSudoRule + """ + + return LocalSudoRule(self.local, name) diff --git a/sssd_test_framework/utils/authentication.py b/sssd_test_framework/utils/authentication.py index dc0804ef..e65224b3 100644 --- a/sssd_test_framework/utils/authentication.py +++ b/sssd_test_framework/utils/authentication.py @@ -1100,6 +1100,38 @@ def run(self, username: str, password: str | None = None, *, command: str) -> bo return result.rc == 0 + def run_advanced( + self, username: str, password: str | None = None, *, parameters: list[str] | None = None, command: str + ) -> ProcessResult: + """ + Execute sudo command with parameters. + + :param username: Username that calls sudo. + :type username: str + :param password: User password, defaults to None + :type password: str | None, optional + :param parameters: List of parameters to sudo. + :type parameters: list[str] | None + :param command: Command to execute (make sure to properly escape any quotes). + :type command: str + :return: Command result. + :rtype: ProcessResult + """ + if parameters is None: + parameters = [] + if password is not None: + parameters.append("--stdin") + if password is not None: + result = self.host.conn.run( + f'su - "{username}" -c "sudo {" ".join(parameters)} {command}"', input=password, raise_on_error=False + ) + else: + result = self.host.conn.run( + f'su - "{username}" -c "sudo {" ".join(parameters)} {command}"', raise_on_error=False + ) + + return result + def list(self, username: str, password: str | None = None, *, expected: list[str] | None = None) -> bool: """ List commands that the user can run under sudo. diff --git a/sssd_test_framework/utils/local_users.py b/sssd_test_framework/utils/local_users.py index 067fc8d7..7d158c8a 100644 --- a/sssd_test_framework/utils/local_users.py +++ b/sssd_test_framework/utils/local_users.py @@ -2,6 +2,8 @@ from __future__ import annotations +from typing import Any + import jc from pytest_mh import MultihostHost, MultihostUtility from pytest_mh.cli import CLIBuilder, CLIBuilderArgs @@ -12,6 +14,7 @@ "LocalGroup", "LocalUser", "LocalUsersUtils", + "LocalSudoRule", ] @@ -35,6 +38,7 @@ def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: self.fs: LinuxFileSystem = fs self._users: list[str] = [] self._groups: list[str] = [] + self._sudorules: list[LocalSudoRule] = [] def teardown(self) -> None: """ @@ -53,6 +57,9 @@ def teardown(self) -> None: if cmd: self.host.conn.run("set -e\n\n" + cmd) + for rule in self._sudorules[:]: + rule.delete() + super().teardown() def user(self, name: str) -> LocalUser: @@ -129,6 +136,12 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: self.util = util self.name = name + def __str__(self): + """ + Returns a string representation of the LocalUser. + """ + return self.name + def add( self, *, @@ -276,6 +289,12 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: self.util = util self.name = name + def __str__(self): + """ + Returns a string representation of the LocalGroup. + """ + return self.name + def add( self, *, @@ -421,3 +440,178 @@ def remove_members(self, members: list[LocalUser]) -> LocalGroup: self.util.host.conn.run("set -ex\n" + cmd, log_level=ProcessLogLevel.Error) return self + + +class LocalSudoRule(object): + """ + Local sudo rule management. + """ + + default_user: str = "ALL" + default_host: str = "ALL" + default_command: str = "ALL" + + def __init__(self, util: LocalUsersUtils, name: str) -> None: + """ + :param util: LocalUsersUtils util object. + :param name: Sudo rule name. + :type name: str + """ + self.name = name + self.util = util + self.__rule: dict[str, Any] = dict() + self.filename: str | None = None + self.rule_str: str | None = None + + def __str__(self): + """ + Returns a string representation of the LocalSudoRule. + """ + if self.rule_str: + return self.rule_str + else: + return self.name + + @staticmethod + def _format_list(item: str | Any | list[str | Any], add_percent: bool = False) -> str: + """ + Format the item as a string. + + :param item: object to be formatted + :type item: str | Any| list[str | Any] + :param add_percent: If true, prepend % to the item, defaults to False + :type add_percent: bool, optional + :return: Formatted string. + :rtype: str + """ + if isinstance(item, list): + result = ", ".join(f"%{str(x)}" if isinstance(x, LocalGroup) and add_percent else str(x) for x in item) + else: + if isinstance(item, LocalGroup) and add_percent: + result = f"%{str(item)}" + else: + result = str(item) + return result + + def add( + self, + *, + user: str | LocalUser | LocalGroup | list[str | LocalUser | LocalGroup] | Any | None = default_user, + host: str | list[str] | Any | None = default_host, + command: str | list[str] | Any | None = default_command, + option: str | list[str] | None = None, + runasuser: str | LocalUser | list[str | LocalUser] | None = None, + runasgroup: str | LocalGroup | list[str | LocalGroup] | None = None, + order: int | None = None, + nopasswd: bool | None = None, + ) -> LocalSudoRule: + """ + Create new sudo rule. + + :param user: sudoUser attribute, defaults to ALL + :type user: str | LocalUser | LocalGroup | list[str | LocalUser | LocalGroup] + :param host: sudoHost attribute, defaults to ALL + :type host: str | list[str], + :param command: sudoCommand attribute, defaults to ALL + :type command: str | list[str], + :param option: sudoOption attribute, defaults to None + :type option: str | list[str] | None, optional + :param runasuser: sudoRunAsUser attribute, defaults to None + :type runasuser: str | LocalUser | list[str | LocalUser] | None, optional + :param runasgroup: sudoRunAsGroup attribute, defaults to None + :type runasgroup: str | LocalGroup | list[str | LocalGroup] | None, optional + :param order: sudoOrder attribute, defaults to None + :type order: int | None, optional + :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) + :type nopasswd: bool | None, optional + :return: New sudo rule object. + :rtype: LocalSudoRule + """ + orderstr = f"{order:02d}" if order is not None else str(len(self.util._sudorules)) + if self.filename is None: + self.filename = f"{orderstr}_{self.name}" + + # Remember arguments so we can use them in modify if needed + self.__rule = dict[str, Any]( + user=user, + host=host, + command=command, + option=option, + runasuser=runasuser, + runasgroup=runasgroup, + order=order, + nopasswd=nopasswd, + ) + run_as_str = "" + if runasuser or runasgroup: + run_as_str += "(" + if runasuser: + run_as_str += LocalSudoRule._format_list(runasuser) + if runasgroup: + run_as_str += f":{LocalSudoRule._format_list(runasgroup)}" + run_as_str += ")" + user_str = LocalSudoRule._format_list(user, add_percent=True) + host_str = LocalSudoRule._format_list(host) + tagspec_str = "NOPASSWD:" if nopasswd else "" + command_str = LocalSudoRule._format_list(command) + rule_str = f"{user_str} {host_str}={run_as_str} {tagspec_str} {command_str}\n" + self.rule_str = rule_str + self.util.fs.write(f"/etc/sudoers.d/{self.filename}", self.rule_str) + self.util._sudorules.append(self) + return self + + def modify( + self, + *, + user: str | LocalUser | LocalGroup | list[str | LocalUser | LocalGroup] | None = None, + host: str | list[str] | None = None, + command: str | list[str] | None = None, + option: str | list[str] | None = None, + runasuser: str | LocalUser | list[str | LocalUser] | None = None, + runasgroup: str | LocalGroup | list[str | LocalGroup] | None = None, + order: int | None = None, + nopasswd: bool | None = None, + ) -> LocalSudoRule: + """ + Modify existing Local sudo rule. + + :param user: sudoUser attribute, defaults to None + :type user: str | LocalUser | LocalGroup | list[str | LocalUser | LocalGroup] | None, optional + :param host: sudoHost attribute, defaults to None + :type host: str | list[str] | None, optional + :param command: sudoCommand attribute defaults to None + :type command: str | list[str] | None, optional + :param option: sudoOption attribute, defaults to None + :type option: str | list[str] | None, optional + :param runasuser: sudoRunAsUser attribute, defaults to None + :type runasuser: str | LocalUser | list[str | LocalUser] | None, optional + :param runasgroup: sudoRunAsGroup attribute, defaults to None + :type runasgroup: str | LocalGroup | list[str | LocalGroup] | None, optional + :param order: sudoOrder attribute, defaults to None + :type order: int | None, optional + :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) + :type nopasswd: bool | None, optional + :return: New sudo rule object. + :rtype: LocalSudoRule + """ + self.delete() + self.add( + user=user if user is not None else self.__rule.get("user"), + host=host if host is not None else self.__rule.get("host"), + command=command if command is not None else self.__rule.get("command"), + option=option if option is not None else self.__rule.get("option"), + runasuser=runasuser if runasuser is not None else self.__rule.get("runasuser"), + runasgroup=runasgroup if runasgroup is not None else self.__rule.get("runasgroup"), + order=order if order is not None else self.__rule.get("order"), + nopasswd=nopasswd if nopasswd is not None else self.__rule.get("nopasswd"), + ) + return self + + def delete(self) -> None: + """ + Delete local sudo rule. + """ + if self.filename: + self.util.fs.rm(f"/etc/sudoers.d/{self.filename}") + if self in self.util._sudorules: + self.util._sudorules.remove(self)