diff --git a/samples/python/second/src/.gitignore b/samples/python/second/src/.gitignore new file mode 100644 index 0000000..9a66ff3 --- /dev/null +++ b/samples/python/second/src/.gitignore @@ -0,0 +1,111 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +*.py text eol=lf \ No newline at end of file diff --git a/samples/python/second/src/commands/common.py b/samples/python/second/src/commands/common.py new file mode 100644 index 0000000..37911e8 --- /dev/null +++ b/samples/python/second/src/commands/common.py @@ -0,0 +1,57 @@ +import click + +def common(function): + """Add common options to click commands""" + function = click.option( + "--input", + "input_json", + help="JSON input data with settings", + required=False, + type=str, + )(function) + + function = click.option( + "--username", + help="Username for authentication", + type=str, + required=False, + )(function) + + function = click.option( + "--password", + help="Password for authentication", + type=str, + required=False, + hide_input=True, + )(function) + + # Add additional user properties + function = click.option( + "--uid", + help="User ID number", + type=int, + required=False, + )(function) + + function = click.option( + "--gid", + help="Primary group ID", + type=int, + required=False, + )(function) + + function = click.option( + "--home", + help="Home directory path", + type=str, + required=False, + )(function) + + function = click.option( + "--shell", + help="Login shell path", + type=str, + required=False, + )(function) + + return function \ No newline at end of file diff --git a/samples/python/second/src/commands/delete.py b/samples/python/second/src/commands/delete.py new file mode 100644 index 0000000..1a20b4c --- /dev/null +++ b/samples/python/second/src/commands/delete.py @@ -0,0 +1,40 @@ +import click +import sys +from typing import Optional +from utils.utils import delete_user, collect_input_data +from utils.logger import dfl_logger as Logger + + +@click.command() +@click.option("--username", "-u", help="Username of the user to delete", type=str) +@click.option("--input", "input_json", help="JSON input with username", type=str) +@click.option( + "-w", + "--what-if", + is_flag=True, + help="Show what would happen without making changes", +) +def delete(username: Optional[str], input_json: Optional[str], what_if: bool): + """ + Delete a Linux user. + + This command deletes a specified Linux user from the system. + """ + try: + data = collect_input_data(username=username, input_json=input_json) + + username_to_delete = data.get("username") + + if not username_to_delete: + Logger.error("Username is required to delete a user.", target="delete") + sys.exit(1) + + Logger.info( + f"Processing delete request for user: {username_to_delete}", target="delete" + ) + + delete_user(username=username_to_delete, what_if=what_if) + + except Exception as e: + Logger.error(f"Failed to process delete command: {str(e)}", target="delete") + sys.exit(1) diff --git a/samples/python/second/src/commands/export.py b/samples/python/second/src/commands/export.py new file mode 100644 index 0000000..0e66e4d --- /dev/null +++ b/samples/python/second/src/commands/export.py @@ -0,0 +1,13 @@ +import click +from models.dsc_user import User + +@click.command() +def export(): + """ + Export Linux user information. + + This command exports information about Linux users in JSON format. + """ + + user = User() + user.export() \ No newline at end of file diff --git a/samples/python/second/src/commands/get.py b/samples/python/second/src/commands/get.py new file mode 100644 index 0000000..3596fb1 --- /dev/null +++ b/samples/python/second/src/commands/get.py @@ -0,0 +1,28 @@ +import click +from utils.utils import get_input, get_requested_properties +from commands.common import common + + + +@click.command() +@common +def get(username, password, input_json, uid, gid, home, shell): + """ + Get Linux user information. + + This command takes either command line parameters or a JSON input to retrieve + information about a Linux user. + """ + user = get_input( + username=username, + password=password, + input_json=input_json, + uid=uid, + gid=gid, + home=home, + shell=shell + ) + + requested_properties = get_requested_properties(user) + + user.get_current_state(requested_properties) \ No newline at end of file diff --git a/samples/python/second/src/commands/root.py b/samples/python/second/src/commands/root.py new file mode 100644 index 0000000..9f7f8ca --- /dev/null +++ b/samples/python/second/src/commands/root.py @@ -0,0 +1,27 @@ +import click +import sys +import os +from commands.get import get +from commands.set import set +from commands.delete import delete +from commands.export import export + +@click.group(invoke_without_command=True) +@click.pass_context +def root_command(ctx): + """Linux User Management CLI.""" + + ctx.ensure_object(dict) + + if os.name != 'nt' and hasattr(os, 'geteuid') and os.geteuid() != 0: + click.echo("This program requires root privileges to manage users. Please run with sudo.") + sys.exit(1) + + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + +# Register all available commands +root_command.add_command(get, name="get") +root_command.add_command(set, name="set") +root_command.add_command(delete, name="delete") +root_command.add_command(export, name="export") \ No newline at end of file diff --git a/samples/python/second/src/commands/set.py b/samples/python/second/src/commands/set.py new file mode 100644 index 0000000..3754e9b --- /dev/null +++ b/samples/python/second/src/commands/set.py @@ -0,0 +1,37 @@ +import click +from typing import Optional +from commands.common import common +from utils.utils import get_input + +@click.command() +@common +@click.option('-w', '--what-if', is_flag=True, help='Show what would happen without making changes') +def set( + username: Optional[str], + password: Optional[str], + input_json: Optional[str], + uid: Optional[int], + gid: Optional[int], + home: Optional[str], + shell: Optional[str], + what_if: bool +): + """ + Create or update a Linux user. + + This command takes either command line parameters or a JSON input to create or + modify a Linux user. If the user already exists, it will be updated. + + Note: Group membership is read-only and cannot be modified with this command. + """ + user = get_input( + username=username, + password=password, + input_json=input_json, + uid=uid, + gid=gid, + home=home, + shell=shell + ) + + user.modify(what_if=what_if) \ No newline at end of file diff --git a/samples/python/second/src/main.py b/samples/python/second/src/main.py new file mode 100644 index 0000000..ae05d7b --- /dev/null +++ b/samples/python/second/src/main.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +from commands.root import root_command + +if __name__ == "__main__": + root_command() \ No newline at end of file diff --git a/samples/python/second/src/models/dsc_user.py b/samples/python/second/src/models/dsc_user.py new file mode 100644 index 0000000..02645c4 --- /dev/null +++ b/samples/python/second/src/models/dsc_user.py @@ -0,0 +1,180 @@ +import json +from typing import Dict, Any +from utils.utils import check_user_exist, create_user, update_user, get_passwd_entry, get_user_groups +import sys +import pwd +from utils.logger import dfl_logger as Logger + +class User: + """Linux User resource that can be managed via DSC.""" + def __init__(self): + self.username = "" + self.password = "" + self.uid = None + self.gid = None + self.home = "" + self.shell = "" + self.groups = None + self._exist = False + + def export(self): + """Export all users on the system.""" + result = [] + + try: + for passwd_entry in pwd.getpwall(): + username = passwd_entry.pw_name + user = User() + user.username = username + user.uid = passwd_entry.pw_uid + user.gid = passwd_entry.pw_gid + user.home = passwd_entry.pw_dir + user.shell = passwd_entry.pw_shell + user._exist = True + + # Get groups for this user + try: + user_groups = get_user_groups(username) + user.groups = user_groups + except Exception: + user.groups = [] + + result.append(self.to_dict(user)) + + print(json.dumps(result, separators=(',', ':'))) + + except Exception as e: + Logger.error(f"Error occurred while exporting users: {e}", target="Export") + sys.exit(1) + + + def get_current_state(self, requested_properties=None): + result = {} + + try: + user = check_user_exist(self.username) + + if user is not None: + self._exist = True + result["_exist"] = True + + if user: + passwd_entry = get_passwd_entry(self.username) + user_groups = get_user_groups(self.username) + + self.uid = user.get('uid', 0) + self.gid = user.get('gid', 0) + self.groups = user_groups + self.home = passwd_entry.get('home', '') if passwd_entry else '' + self.shell = passwd_entry.get('shell', '/bin/bash') if passwd_entry else '/bin/bash' + + if requested_properties: + for prop_name, expected_value in requested_properties: + if prop_name in ['username', '_exist']: + continue + current_value = getattr(self, prop_name) + if expected_value != current_value: + self._exist = False + break + + if requested_properties: + for prop_name, _ in requested_properties: + if hasattr(self, prop_name): + result[prop_name] = getattr(self, prop_name) + else: + result.update({ + "uid": self.uid, + "gid": self.gid, + "groups": self.groups, + "home": self.home, + "shell": self.shell + }) + + if not self._exist: + print(json.dumps({ + "username": self.username, + "_exist": False, + })) + else: + print(self.to_json(self)) + + except Exception as e: + Logger.error(f"Error occurred while getting current state for user '{self.username}': {e}", target="GetCurrentState") + sys.exit(1) + + def modify(self, what_if: bool = False) -> None: + exists = check_user_exist(self.username) + + if what_if: + if exists: + print(json.dumps({ + "username": self.username, + "_metadata": { + "whatIf": [ + f"User '{self.username}' exists and will be updated." + ] + } + })) + else: + print(json.dumps({ + "username": self.username, + "_metadata": { + "whatIf": [ + f"User '{self.username}' does not exist and will be created." + ] + } + })) + return + + if exists: + update_user( + username=self.username, + uid=self.uid, + gid=self.gid, + home=self.home, + shell=self.shell, + password=self.password + ) + else: + create_user( + username=self.username, + uid=self.uid, + gid=self.gid, + home=self.home, + shell=self.shell, + password=self.password + ) + + @classmethod + def to_dict(cls, item: 'User') -> Dict[str, Any]: + """Convert User to dictionary for serialization.""" + result = { + "username": item.username, + "uid": item.uid, + "gid": item.gid, + "home": item.home, + "shell": item.shell, + "groups": item.groups, + "_exist": item._exist + } + + return result + + @classmethod + def to_json(cls, user: 'User') -> str: + return json.dumps(cls.to_dict(user), separators=(',', ':')) + + @classmethod + def from_json(cls, json_str: str) -> 'User': + """Create a User object from JSON string.""" + data = json.loads(json_str) + user = cls() + user.username = data.get('username', '') + user.password = data.get('password', '') + user.uid = data.get('uid') + user.gid = data.get('gid') + user.home = data.get('home') + user.shell = data.get('shell', '/bin/bash') + user.groups = data.get('groups', []) + user._exist = data.get('_exist', False) + return user \ No newline at end of file diff --git a/samples/python/second/src/requirements.txt b/samples/python/second/src/requirements.txt new file mode 100644 index 0000000..ff94205 --- /dev/null +++ b/samples/python/second/src/requirements.txt @@ -0,0 +1,3 @@ +# Core dependencies +click>=8.0.0 +jsonschema>=4.0.0 \ No newline at end of file diff --git a/samples/python/second/src/schema/schema.py b/samples/python/second/src/schema/schema.py new file mode 100644 index 0000000..81269a5 --- /dev/null +++ b/samples/python/second/src/schema/schema.py @@ -0,0 +1,48 @@ +import os +import json +import sys +import jsonschema +from typing import Dict, Any +from utils.logger import dfl_logger as logger + +# Default schema with username as required field +DEFAULT_USER_SCHEMA = { + "type": "object", + "properties": { + "username": {"type": "string"}, + "password": {"type": ["string", "null"]}, + "uid": {"type": ["integer", "null"]}, + "gid": {"type": ["integer", "null"]}, + "home": {"type": ["string", "null"]}, + "shell": {"type": "string"}, + "groups": { + "type": "array", + "items": {"type": "string"}, + "readOnly": True + } + }, + "required": ["username"], + "additionalProperties": False +} + +def get_schema() -> Dict[str, Any]: + schema_path = os.path.join(os.path.dirname(__file__), '..', '.dsc.resource.json') + + if os.path.exists(schema_path): + try: + with open(schema_path, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + logger.error(f"Failed to load schema from {schema_path}: {e}") + sys.exit(1) + + return DEFAULT_USER_SCHEMA + +def validate_user_data(data: Dict[str, Any]) -> None: + schema = get_schema() + + try: + jsonschema.validate(instance=data, schema=schema) + except jsonschema.exceptions.ValidationError as e: + logger.error(f"Schema validation failed: {e.message}") + sys.exit(1) \ No newline at end of file diff --git a/samples/python/second/src/tuxctl.dsc.resource.json b/samples/python/second/src/tuxctl.dsc.resource.json new file mode 100644 index 0000000..a44569d --- /dev/null +++ b/samples/python/second/src/tuxctl.dsc.resource.json @@ -0,0 +1,60 @@ +{ + "$schema": "https://aka.ms/dsc/schemas/v3/bundled/resource/manifest.json", + "type": "DSC.UserManagement/TuxCtl", + "description": "DSC resource to manage users on Linux", + "tags": [ + "Linux", + "UserManagement" + ], + "version": "0.1.0", + "schema": { + "embedded": { + "$schema": "https://json-schema.org/draft/2020-12/schema#", + "$id": "https://raw.githubusercontent.com/PowerShell/DSC/main/schemas/resources/DSC/UserManagement/TuxCtl/v0.1.0/schema.json", + "title": "Linux User Management", + "description": "Manages Linux users", + "type": "object", + "required": ["username"], + "additionalProperties": false, + "properties": { + "username": { + "type": "string", + "title": "Username", + "description": "Defines the name of the Linux user" + }, + "password": { + "type": ["string", "null"], + "title": "Password", + "description": "The user's password" + }, + "uid": { + "type": ["integer", "null"], + "title": "User ID", + "description": "The user's numeric ID" + }, + "gid": { + "type": ["integer", "null"], + "title": "Group ID", + "description": "The user's primary group ID" + }, + "home": { + "type": ["string", "null"], + "title": "Home Directory", + "description": "The user's home directory path" + }, + "shell": { + "type": "string", + "title": "Shell", + "description": "The user's login shell" + }, + "groups": { + "type": "array", + "items": {"type": "string"}, + "readOnly": true, + "title": "Groups", + "description": "List of groups the user belongs to" + } + } + } + } +} \ No newline at end of file diff --git a/samples/python/second/src/utils/logger.py b/samples/python/second/src/utils/logger.py new file mode 100644 index 0000000..0029ca8 --- /dev/null +++ b/samples/python/second/src/utils/logger.py @@ -0,0 +1,102 @@ +import json +import sys +import datetime +import inspect +from typing import Dict, Any +from enum import Enum + + +class LogLevel(Enum): + """Enumeration for log levels""" + TRACE = "TRACE" + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + + +class Logger: + """ + A structured JSON logger class that outputs messages to stderr. + + Features: + - JSON formatted output + - Configurable log levels + - Automatic timestamp generation + - Caller information tracking + - Customizable output stream + """ + + def __init__(self, output_stream=None, include_caller_info: bool = True): + self.output_stream = output_stream or sys.stderr + self.include_caller_info = include_caller_info + + def _get_caller_info(self) -> Dict[str, Any]: + if not self.include_caller_info: + return {} + + try: + # Get the frame of the caller (skip internal methods) + frame = inspect.currentframe() + for _ in range(3): # Skip _get_caller_info, _log, and the log level method + frame = frame.f_back + if frame is None: + break + + if frame: + return { + "file": frame.f_code.co_filename.split('\\')[-1], # Just filename + "line": frame.f_lineno, + "function": frame.f_code.co_name + } + except Exception: + pass + + return {} + + def _log(self, level: LogLevel, message: str, target: str = None, **kwargs): + log_entry = { + "timestamp": datetime.datetime.now().isoformat() + "Z", + "level": level.value, + "fields": {"message": message}, + "target": target or "unknown" + } + + # Add caller information if enabled + caller_info = self._get_caller_info() + if caller_info: + log_entry["line_number"] = caller_info.get("line", "Unknown") + log_entry["file"] = caller_info.get("file", "Unknown") + log_entry["function"] = caller_info.get("function", "Unknown") + + # Add any additional fields to the fields section + if kwargs: + log_entry["fields"].update(kwargs) + + try: + json_output = json.dumps(log_entry, separators=(",", ":")) + self.output_stream.write(json_output + '\n') + self.output_stream.flush() + except Exception as e: + # Fallback to basic error output + fallback_msg = f"[LOG ERROR] Failed to write log: {str(e)}\n" + self.output_stream.write(fallback_msg) + self.output_stream.flush() + + def trace(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.TRACE, message, target, **kwargs) + + def debug(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.DEBUG, message, target, **kwargs) + + def info(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.INFO, message, target, **kwargs) + + def warning(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.WARNING, message, target, **kwargs) + + def error(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.ERROR, message, target, **kwargs) + + +dfl_logger = Logger() \ No newline at end of file diff --git a/samples/python/second/src/utils/utils.py b/samples/python/second/src/utils/utils.py new file mode 100644 index 0000000..7e7ceba --- /dev/null +++ b/samples/python/second/src/utils/utils.py @@ -0,0 +1,513 @@ +import sys +import json +import subprocess +import grp +import pwd +import os +from schema.schema import validate_user_data +from typing import Dict, Any, Optional +from utils.logger import dfl_logger as Logger + + +def read_stdin() -> str: + return sys.stdin.read().strip() + + +def parse_json_input(data: str) -> Dict[str, Any]: + try: + return json.loads(data) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON input: {str(e)}") + +def get_input( + username: Optional[str] = None, + password: Optional[str] = None, + input_json: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +): + from models.dsc_user import User + + combined_data = collect_input_data( + username=username, + password=password, + input_json=input_json, + uid=uid, + gid=gid, + home=home, + shell=shell, + ) + + validate_user_data(combined_data) + + user = User() + + for key, value in combined_data.items(): + if hasattr(user, key): + setattr(user, key, value) + + return user + + +def collect_input_data( + username: Optional[str] = None, + password: Optional[str] = None, + input_json: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +) -> Dict[str, Any]: + combined_data = {} + + # Process JSON input if provided + if input_json: + try: + if input_json.startswith("{"): + json_data = parse_json_input(input_json) + else: + with open(input_json, "r") as f: + json_data = json.load(f) + + combined_data.update(json_data) + + except (ValueError, IOError, json.JSONDecodeError) as e: + raise ValueError(f"Failed to parse JSON input: {str(e)}") + else: + # Try to read from stdin if no JSON input was explicitly provided + try: + if not sys.stdin.isatty(): + stdin_data = read_stdin() + if stdin_data: + json_data = parse_json_input(stdin_data) + combined_data.update(json_data) + except (ValueError, EOFError) as e: + pass + + cli_data = {} + + if username is not None: + cli_data["username"] = username + + if password is not None: + cli_data["password"] = password + + if uid is not None: + cli_data["uid"] = uid + + if gid is not None: + cli_data["gid"] = gid + + if home is not None: + cli_data["home"] = home + + if shell is not None: + cli_data["shell"] = shell + + if cli_data: + combined_data.update(cli_data) + + return combined_data + + +def check_user_exist(user_id: int) -> Optional[Dict[str, Any]]: + """ + Check if a user exists in the system and return user information. Can be used with both username and UID. + + Args: + user_id: The username or ID to check + + Returns: + Dictionary with username, uid, and gid if user exists, None otherwise + """ + try: + result = subprocess.run( + ["id", str(user_id)], + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + if result.returncode == 0: + output = result.stdout.strip() + + uid_part = output.split()[0] # uid=1000(username) + username = uid_part.split("(")[1].rstrip(")") + uid = int(uid_part.split("=")[1].split("(")[0]) + + gid_part = output.split()[1] # gid=1000(groupname) + gid = int(gid_part.split("=")[1].split("(")[0]) + + return {"username": username, "uid": uid, "gid": gid} + except (subprocess.SubprocessError, ValueError, IndexError): + pass + return None + + +def create_user( + username: str, + password: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +) -> None: + cmd = ["adduser", "--quiet"] + + if home: + cmd.extend(["--home", home]) + + if shell: + cmd.extend(["--shell", shell]) + if uid is not None: + cmd.extend(["--uid", str(uid)]) + + if gid is not None: + try: + group_name = grp.getgrgid(gid).gr_name + cmd.extend(["--gid", group_name]) + except KeyError: + Logger.error( + "Group ID does not exist", + "user_management", + command="adduser", + gid=gid, + ) + sys.exit(3) + + cmd.append(username) + Logger.info( + "Creating user", "user_management", command="adduser", username=username + ) + + result = run_command( + cmd, prevent_input=True, command_name="Create user", username=username + ) + + if result and result.returncode == 0 and password: + set_password(username, password) + + +def set_password(username: str, password: str) -> None: + """ + Set or update a user's password. + + Args: + username: The username to set password for + password: The new password + """ + if not username or not password: + Logger.error( + "Username and password are required for setting password", + "user_management", + username=username, + ) + sys.exit(4) + + Logger.info("Setting password for user", "user_management", username=username) + + input_str = f"{username}:{password}\n" + + try: + run_command( + ["chpasswd"], + input_str=input_str, + command_name="Set password", + username=username, + ) + except RuntimeError as e: + error_msg = str(e) + sanitized_error = error_msg.replace(password, "********") + Logger.error( + "Failed to set password", + "user_management", + username=username, + error=sanitized_error, + ) + sys.exit(3) + + except subprocess.CalledProcessError as e: + error_msg = e.stderr.decode("utf-8") if hasattr(e, "stderr") else str(e) + # Don't include the password in error messages + sanitized_error = error_msg.replace(password, "********") + Logger.error( + "Failed to set password", + "user_management", + username=username, + error=sanitized_error, + ) + sys.exit(3) + except Exception as e: + Logger.error( + "Error setting password", "user_management", username=username, error=str(e) + ) + sys.exit(3) + + +def update_user( + username: str, + password: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +) -> None: + cmd = ["usermod"] + + if uid is not None: + cmd.extend(["-u", str(uid)]) + + if gid is not None: + cmd.extend(["-g", str(gid)]) + + if home: + cmd.extend(["-d", home, "-m"]) + + if shell: + cmd.extend(["-s", shell]) + + if password: + print("Setting password for user", username) + set_password(username, password) + + if len(cmd) == 1: + Logger.debug( + "No changes specified for user", "user_management", username=username + ) + return + + cmd.append(username) + + Logger.info( + "Updating user", "user_management", command="usermod", username=username + ) + + run_command(cmd, command_name="Update user", username=username) + + +def delete_user(username: str, what_if: bool = False) -> None: + user = check_user_exist(username) + if not user: + return + + cmd = ["userdel"] + cmd.append(username) + + if what_if: + print( + json.dumps( + { + "username": username, + "_metadata": { + "whatIf": [f"User '{username}' exists and will be deleted."] + }, + } + ) + ) + return + + Logger.info( + "Deleting user", "user_management", command="delete_user", username=username + ) + + run_command(cmd, command_name="Delete user", username=username) + + +def export_user() -> str: + """ + Export all users in JSON format. + + Returns: + JSON string containing all user information + """ + users = [] + + try: + for user_entry in pwd.getpwall(): + user_info = { + "username": user_entry.pw_name, + "uid": user_entry.pw_uid, + "gid": user_entry.pw_gid, + "home": user_entry.pw_dir, + "shell": user_entry.pw_shell, + "groups": get_user_groups(user_entry.pw_name), + } + users.append(user_info) + + return json.dumps(users, indent=2) + except Exception as e: + raise RuntimeError(f"Failed to export users: {str(e)}") + + +def get_passwd_entry(username: str) -> Dict[str, Any]: + """ + Get user info from /etc/passwd. + + Args: + username: The username to look up + + Returns: + Dictionary with user information from passwd file + """ + try: + result = subprocess.run( + ["grep", f"^{username}:", "/etc/passwd"], + check=False, + stdout=subprocess.PIPE, + text=True, + ) + if result.returncode == 0: + parts = result.stdout.strip().split(":") + return { + "username": parts[0], + "uid": int(parts[2]), + "gid": int(parts[3]), + "gecos": parts[4], + "home": parts[5], + "shell": parts[6], + } + except Exception: + pass + return {} + + +def get_user_groups(username: str) -> list: + """ + Get all groups the user belongs to. + + Args: + username: The username to check group membership for + + Returns: + List of group names + """ + try: + result = subprocess.run( + ["groups", username], check=False, stdout=subprocess.PIPE, text=True + ) + if result.returncode == 0: + # Output format: "username : group1 group2 group3" + return result.stdout.split(":", 1)[1].strip().split() + + # Alternative method using Python's grp module + groups = [] + for group in grp.getgrall(): + if username in group.gr_mem: + groups.append(group.gr_name) + + try: + pw_entry = pwd.getpwnam(username) + primary_group = grp.getgrgid(pw_entry.pw_gid).gr_name + if primary_group not in groups: + groups.append(primary_group) + except KeyError: + pass + + return groups + except Exception: + pass + return [] + + +def get_requested_properties(user) -> list: + """ + Get a list of properties that have non-None and non-empty values from a user object. + + Args: + user: User object to inspect + + Returns: + List of property names that have values + """ + requested_properties = [] + + for attr in dir(user): + if not attr.startswith("_") and not callable(getattr(user, attr)): + user_value = getattr(user, attr) + if user_value is not None and user_value != "": + requested_properties.append((attr, user_value)) + return requested_properties + + +def run_command( + cmd: list, + input_str: bytes = None, + check: bool = True, + prevent_input: bool = False, + command_name: str = None, + username: str = None, + universal_newlines: bool = True, +) -> subprocess.CompletedProcess: + """ + Run a system command with consistent error handling and logging. + + Args: + cmd: Command and arguments as a list + input_str: Optional input to send to process stdin + check: Whether to check return code and raise exception + prevent_input: Whether to redirect stdin from /dev/null + command_name: Name of command for logging + username: Username for logging + universal_newlines: Convert output to strings + + Returns: + CompletedProcess instance with return code, stdout, stderr + + Raises: + RuntimeError: If command fails and check is True + """ + + try: + kwargs = { + "check": check, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "universal_newlines": universal_newlines, + } + + # If input provided, pass it to process + if input_str: + kwargs["input"] = input_str + + # If preventing input, redirect stdin from /dev/null + if prevent_input: + with open(os.devnull, "r") as DEVNULL: + kwargs["stdin"] = DEVNULL + return subprocess.run(cmd, **kwargs) + else: + return subprocess.run(cmd, **kwargs) + + except subprocess.CalledProcessError as e: + error_msg = e.stderr if universal_newlines else e.stderr.decode("utf-8") + if command_name: + Logger.error( + f"{command_name} failed", + "user_management", + command=cmd[0], + error=error_msg, + username=username, + ) + if check: + Logger.error( + f"Command '{cmd[0]}' failed", + "user_management", + command=cmd[0], + error=error_msg, + username=username, + ) + return e + except Exception as e: + if command_name: + Logger.error( + f"{command_name} failed unexpectedly", + "user_management", + command=cmd[0], + error=str(e), + username=username, + ) + if check: + sys.exit(3) + return None diff --git a/samples/python/second/tests/acceptance.tests.ps1 b/samples/python/second/tests/acceptance.tests.ps1 new file mode 100644 index 0000000..f0a9797 --- /dev/null +++ b/samples/python/second/tests/acceptance.tests.ps1 @@ -0,0 +1,85 @@ +$global:executable = Join-Path (Split-Path -Parent $PSScriptRoot) 'src' 'main.py' + +Write-Verbose -Message "Executable path: $global:executable" -Verbose + +Describe "TuxCtl acceptance tests - Help command" -Skip:(!$IsLinux) { + It "Should display help information when --help is passed" { + $result = & $global:executable --help + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + } +} + +Describe "TuxCtl acceptance tests - Get command" -Skip:(!$IsLinux) { + It "Should get a username using the --username option" { + $result = & sudo $global:executable get --username root | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + $result.username | Should -Be "root" + $result.uid | Should -Be 0 + $result.gid | Should -Be 0 + $result.home | Should -Be "/root" + $result.shell | Should -Be "/bin/bash" + $result.groups | Should -Contain "root" + $result._exist | Should -Be $true + } + + It "Should work with all options" -Skip:(!($IsLinux)) { + $result = & sudo $global:executable get --username root --uid 0 --gid 0 --home /root --shell /bin/bash | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + $result.username | Should -Be "root" + $result.uid | Should -Be 0 + $result.gid | Should -Be 0 + $result.home | Should -Be "/root" + $result.shell | Should -Be "/bin/bash" + $result.groups | Should -Contain "root" + $result._exist | Should -Be $true + } + + It "Should work with JSON input" { + $in = @{username = "root"; uid = 0; gid = 0; shell = "/bin/bash"; home = "/root"} | ConvertTo-Json + $result = & sudo $global:executable get --input $in | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + $result.username | Should -Be "root" + $result.uid | Should -Be 0 + $result.gid | Should -Be 0 + $result.home | Should -Be "/root" + $result.shell | Should -Be "/bin/bash" + $result.groups | Should -Contain "root" + $result._exist | Should -Be $true + } +} + +Describe "TuxCtl acceptance tests - Set command" -Skip:(!$IsLinux) { + It "Should set a username using the --username option" { + & sudo $global:executable set --username testuser --password randompassword + $LASTEXITCODE | Should -Be 0 + + # Check if the user was created + $result = & sudo $global:executable get --username testuser | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result.username | Should -Be "testuser" + } +} + +Describe "TuxCtl acceptance tests - Delete command" -Skip:(!$IsLinux) { + It "Should delete a user using the --username option" { + & sudo $global:executable delete --username testuser + $LASTEXITCODE | Should -Be 0 + + # Check if the user was deleted + $result = & sudo $global:executable get --username testuser | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result._exist | Should -Be $false + } +} + +Describe "TuxCtl acceptance tests - Export command" -Skip:(!$IsLinux) { + It "Should list all users" { + $result = & sudo $global:executable export | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result.Count | Should -BeGreaterThan 1 + } +} \ No newline at end of file