diff --git a/sambacc/commands/addc.py b/sambacc/commands/addc.py index 9c03578a..1cbcf02f 100644 --- a/sambacc/commands/addc.py +++ b/sambacc/commands/addc.py @@ -26,7 +26,7 @@ from sambacc import smbconf_api from sambacc import smbconf_samba -from .cli import best_waiter, CommandBuilder, Context, Fail +from .cli import Context, Fail, best_waiter, commands try: import dns @@ -43,10 +43,8 @@ _populated: str = "/var/lib/samba/POPULATED" _provisioned: str = "/etc/samba/smb.conf" -dccommands = CommandBuilder() - -@dccommands.command(name="summary") +@commands.command(name="summary") def summary(ctx: Context) -> None: print("Hello", ctx) @@ -192,7 +190,7 @@ def _prep_krb5_conf(ctx: Context) -> None: shutil.copy("/var/lib/samba/private/krb5.conf", "/etc/krb5.conf") -@dccommands.command(name="run", arg_func=_run_container_args) +@commands.command(name="run", arg_func=_run_container_args) def run(ctx: Context) -> None: _logger.info("Running AD DC container") if _dosetup(ctx, "wait-domain"): diff --git a/sambacc/commands/cli.py b/sambacc/commands/cli.py index 5eab1c7f..2b0d52de 100644 --- a/sambacc/commands/cli.py +++ b/sambacc/commands/cli.py @@ -18,6 +18,8 @@ from collections import namedtuple import argparse +import importlib +import inspect import logging import typing @@ -45,13 +47,11 @@ class Parser(typing.Protocol): def set_defaults(self, **kwargs: typing.Any) -> None: """Set a default value for an argument parser.""" - ... # pragma: no cover def add_argument( self, *args: typing.Any, **kwargs: typing.Any ) -> typing.Any: """Add an argument to be parsed.""" - ... # pragma: no cover Command = namedtuple("Command", "name cmd_func arg_func cmd_help") @@ -74,6 +74,58 @@ def toggle_option(parser: Parser, arg: str, dest: str, helpfmt: str) -> Parser: return parser +def ceph_id( + value: typing.Union[str, dict[str, typing.Any]] +) -> dict[str, typing.Any]: + """Parse a string value into a dict containing ceph id values. + The input should contain name= or rados_id= to identify the kind + of name being provided. As a shortcut a bare name can be provided + and the code will guess at the kind. + """ + if not isinstance(value, str): + return value + if value == "?": + # A hack to avoid putting tons of ceph specific info in the normal + # help output. There's probably a better way to do this but it + # gets the job done for now. + raise argparse.ArgumentTypeError( + "requested help:" + " Specify names in the form" + " --ceph-id=[key=value][,key=value][,...]." + ' Valid keys include "name" to set the exact name and "rados_id"' + ' to specify a name that lacks the "client." prefix (that will' + "automatically get added)." + " Alternatively, specify just the name to allow the system to" + " guess if the name is prefixed already or not." + ) + result: dict[str, typing.Any] = {} + # complex mode + if "=" in value: + for part in value.split(","): + if "=" not in part: + raise argparse.ArgumentTypeError( + f"unexpected value for ceph-id: {value!r}" + ) + key, val = part.split("=", 1) + if key == "name": + result["client_name"] = val + result["full_name"] = True + elif key == "rados_id": + result["client_name"] = val + result["full_name"] = False + else: + b = f"unexpected key {key!r} in value for ceph-id: {value!r}" + raise argparse.ArgumentTypeError(b) + else: + # this shorthand is meant mainly for lazy humans (me) when running test + # images manually. The key-value form above is meant for automation. + result["client_name"] = value + # assume that if the name starts with client. it's the full name and + # avoid having the ceph library double up an create client.client.x. + result["full_name"] = value.startswith("client.") + return result + + def get_help(cmd: Command) -> str: if cmd.cmd_help is not None: return cmd.cmd_help @@ -124,6 +176,30 @@ def dict(self) -> dict[str, Command]: """Return a dict mapping command names to Command object.""" return {c.name: c for c in self._commands} + def include( + self, modname: str, *, package: str = "", check: bool = True + ) -> None: + """Import a python module to add commands to this command builder. + If check is true and no new commands are added by the import, raise an + error. + """ + if modname.startswith(".") and not package: + package = "sambacc.commands" + mod = importlib.import_module(modname, package=package) + if not check: + return + loaded_fns = {c.cmd_func for c in self._commands} + mod_fns = {fn for _, fn in inspect.getmembers(mod, inspect.isfunction)} + if not mod_fns.intersection(loaded_fns): + raise Fail(f"import from {modname} did not add any new commands") + + def include_multiple( + self, modnames: typing.Iterable[str], *, package: str = "" + ) -> None: + """Run the include function on multiple module names.""" + for modname in modnames: + self.include(modname, package=package) + class Context(typing.Protocol): """Protocol type for CLI Context. @@ -138,22 +214,18 @@ class Context(typing.Protocol): @property def cli(self) -> argparse.Namespace: """Return a parsed command line namespace object.""" - ... # pragma: no cover @property def instance_config(self) -> config.InstanceConfig: """Return an instance config based on cli params and env.""" - ... # pragma: no cover @property def require_validation(self) -> typing.Optional[bool]: """Return true if configuration needs validation.""" - ... # pragma: no cover @property def opener(self) -> opener.Opener: """Return an appropriate opener object for this instance.""" - ... # pragma: no cover def best_waiter( diff --git a/sambacc/commands/common.py b/sambacc/commands/common.py new file mode 100644 index 00000000..1658cc93 --- /dev/null +++ b/sambacc/commands/common.py @@ -0,0 +1,281 @@ +# +# sambacc: a samba container configuration tool +# Copyright (C) 2025 John Mulligan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see +# + +import argparse +import json +import logging +import os +import time +import typing + +from sambacc import config +from sambacc import opener +from sambacc import rados_opener +from sambacc import samba_cmds +from sambacc import url_opener + +from . import skips +from .cli import Parser, ceph_id + +DEFAULT_CONFIG = "/etc/samba/container/config.json" +DEFAULT_JOIN_MARKER = "/var/lib/samba/container-join-marker.json" + + +class CommandContext: + """CLI Context for standard samba-container commands.""" + + def __init__(self, cli_args: argparse.Namespace): + self._cli = cli_args + self._iconfig: typing.Optional[config.InstanceConfig] = None + self.expects_ctdb = False + self._opener: typing.Optional[opener.Opener] = None + + @property + def cli(self) -> argparse.Namespace: + return self._cli + + @property + def instance_config(self) -> config.InstanceConfig: + if self._iconfig is None: + cfgs = self.cli.config or [] + self._iconfig = config.read_config_files( + cfgs, + require_validation=self.require_validation, + opener=self.opener, + ).get(self.cli.identity) + return self._iconfig + + @property + def require_validation(self) -> typing.Optional[bool]: + if self.cli.validate_config in ("required", "true"): + return True + if self.cli.validate_config == "false": + return False + return None + + @property + def opener(self) -> opener.Opener: + if self._opener is None: + self._opener = opener.FallbackOpener([url_opener.URLOpener()]) + return self._opener + + +def split_entries(value: str) -> list[str]: + """Split a env var up into separate strings. The string can be + an "old school" colon seperated list of values (like PATH). + Or, it can be JSON-formatted if it starts and ends with square + brackets ('[...]'). Strings are the only permitted type within + this JSON-formatted list. + """ + out: list[str] = [] + if not isinstance(value, str): + raise ValueError(value) + if not value: + return out + # in order to cleanly allow passing uris as config "paths" we can't + # simply split on colons. Avoid coming up with a hokey custom scheme + # and enter "JSON-mode" if the env var starts and ends with brackets + # hinting it contains a JSON list. + v = value.rstrip(None) # permit trailing whitespace (trailing only!) + if v[0] == "[" and v[-1] == "]": + for item in json.loads(v): + if not isinstance(item, str): + raise ValueError("Variable JSON must be a list of strings") + out.append(item) + else: + # backwards compatibilty mode with `PATH` like syntax + for part in value.split(":"): + out.append(part) + return out + + +def from_env( + ns: argparse.Namespace, + var: str, + ename: str, + default: typing.Any = None, + convert_env: typing.Optional[typing.Callable] = None, + convert_value: typing.Optional[typing.Callable] = str, +) -> None: + """Bind an environment variable to a command line option. This allows + certain cli options to be set from env vars if the cli option is + not directly provided. + """ + value = getattr(ns, var, None) + if not value: + value = os.environ.get(ename, "") + if convert_env is not None: + value = convert_env(value) + if convert_value is not None: + value = convert_value(value) + if value: + setattr(ns, var, value) + + +def env_to_cli(cli: argparse.Namespace) -> None: + """Configure the sambacc default command line option to environment + variable mappings. + """ + from_env( + cli, + "config", + "SAMBACC_CONFIG", + convert_env=split_entries, + convert_value=None, + default=DEFAULT_CONFIG, + ) + from_env( + cli, + "join_files", + "SAMBACC_JOIN_FILES", + convert_env=split_entries, + convert_value=None, + ) + from_env(cli, "identity", "SAMBA_CONTAINER_ID") + from_env(cli, "username", "JOIN_USERNAME") + from_env(cli, "password", "INSECURE_JOIN_PASSWORD") + from_env(cli, "samba_debug_level", "SAMBA_DEBUG_LEVEL") + from_env(cli, "validate_config", "SAMBACC_VALIDATE_CONFIG") + from_env(cli, "ceph_id", "SAMBACC_CEPH_ID", convert_value=ceph_id) + + +def pre_action(cli: argparse.Namespace) -> None: + """Handle debugging/diagnostic related options before the target + action of the command is performed. + """ + if cli.debug_delay: + time.sleep(int(cli.debug_delay)) + if cli.samba_debug_level: + samba_cmds.set_global_debug(cli.samba_debug_level) + if cli.samba_command_prefix: + samba_cmds.set_global_prefix([cli.samba_command_prefix]) + + # should there be an option to force {en,dis}able rados? + # Right now we just always try to enable rados when possible. + rados_opener.enable_rados( + url_opener.URLOpener, + client_name=cli.ceph_id.get("client_name", ""), + full_name=cli.ceph_id.get("full_name", False), + ) + + +def enable_logging(cli: argparse.Namespace) -> None: + """Configure sambacc command line logging.""" + level = logging.DEBUG if cli.debug else logging.INFO + logger = logging.getLogger() + logger.setLevel(level) + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter("{asctime}: {levelname}: {message}", style="{") + ) + handler.setLevel(level) + logger.addHandler(handler) + + +def global_args(parser: Parser) -> None: + """Configure sambacc default global command line arguments.""" + parser.add_argument( + "--config", + action="append", + help=( + "Specify source configuration" + " (can also be set in the environment by SAMBACC_CONFIG)." + ), + ) + parser.add_argument( + "--identity", + help=( + "A string identifying the local identity" + " (can also be set in the environment by SAMBA_CONTAINER_ID)." + ), + ) + parser.add_argument( + "--etc-passwd-path", + default="/etc/passwd", + help="Specify a path for the passwd file.", + ) + parser.add_argument( + "--etc-group-path", + default="/etc/group", + help="Specify a path for the group file.", + ) + parser.add_argument( + "--username", + default="Administrator", + help="Specify a user name for domain access.", + ) + parser.add_argument( + "--password", default="", help="Specify a password for domain access." + ) + parser.add_argument( + "--debug-delay", + type=int, + help="Delay activity for a specified number of seconds.", + ) + parser.add_argument( + "--join-marker", + default=DEFAULT_JOIN_MARKER, + help="Path to a file used to indicate a join has been peformed.", + ) + parser.add_argument( + "--samba-debug-level", + choices=[str(v) for v in range(0, 11)], + help="Specify samba debug level for commands.", + ) + parser.add_argument( + "--samba-command-prefix", + help="Wrap samba commands within a supplied command prefix", + ) + parser.add_argument( + "--skip-if", + dest="skip_conditions", + action="append", + type=skips.parse, + help=( + "Skip execution based on a condition. Conditions include" + " 'file:[!]', 'env:(==|!=)', and 'always:'." + " (Pass `?` for more details)" + ), + ) + parser.add_argument( + "--skip-if-file", + action="append", + dest="skip_conditions", + type=skips.SkipFile.parse, + help="(DEPRECATED) Perform no action if the specified path exists.", + ) + parser.add_argument( + "--validate-config", + choices=("auto", "required", "true", "false"), + help="Perform schema based validation of configuration.", + ) + parser.add_argument( + "--ceph-id", + type=ceph_id, + help=( + "Specify a user/client ID to ceph libraries" + "(can also be set in the environment by SAMBACC_CEPH_ID." + " Ignored if Ceph RADOS libraries are not present or unused." + " Pass `?` for more details)." + ), + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug level logging of sambacc.", + ) diff --git a/sambacc/commands/dcmain.py b/sambacc/commands/dcmain.py index da133215..589a08cf 100644 --- a/sambacc/commands/dcmain.py +++ b/sambacc/commands/dcmain.py @@ -20,8 +20,8 @@ from . import addc from . import skips -from .cli import Fail -from .main import ( +from .cli import Fail, commands +from .common import ( CommandContext, enable_logging, env_to_cli, @@ -34,7 +34,7 @@ def main(args: typing.Optional[typing.Sequence[str]] = None) -> None: - cli = addc.dccommands.assemble(arg_func=global_args).parse_args(args) + cli = commands.assemble(arg_func=global_args).parse_args(args) env_to_cli(cli) enable_logging(cli) if not cli.identity: diff --git a/sambacc/commands/main.py b/sambacc/commands/main.py index 6996bc0c..b720a050 100644 --- a/sambacc/commands/main.py +++ b/sambacc/commands/main.py @@ -16,314 +16,28 @@ # along with this program. If not, see # -import argparse -import json -import logging -import os -import time import typing -from sambacc import config -from sambacc import opener -from sambacc import rados_opener -from sambacc import samba_cmds -from sambacc import url_opener -from . import check # noqa: F401 from . import config as config_cmds -from . import ctdb # noqa: F401 -from . import dns # noqa: F401 -from . import initialize # noqa: F401 -from . import join # noqa: F401 -from . import run # noqa: F401 from . import skips -from . import users # noqa: F401 -from .cli import commands, Fail, Parser - -DEFAULT_CONFIG = "/etc/samba/container/config.json" -DEFAULT_JOIN_MARKER = "/var/lib/samba/container-join-marker.json" +from .cli import commands, Fail +from .common import ( + CommandContext, + enable_logging, + env_to_cli, + global_args, + pre_action, +) default_cfunc = config_cmds.print_config -def global_args(parser: Parser) -> None: - parser.add_argument( - "--config", - action="append", - help=( - "Specify source configuration" - " (can also be set in the environment by SAMBACC_CONFIG)." - ), - ) - parser.add_argument( - "--identity", - help=( - "A string identifying the local identity" - " (can also be set in the environment by SAMBA_CONTAINER_ID)." - ), - ) - parser.add_argument( - "--etc-passwd-path", - default="/etc/passwd", - help="Specify a path for the passwd file.", - ) - parser.add_argument( - "--etc-group-path", - default="/etc/group", - help="Specify a path for the group file.", - ) - parser.add_argument( - "--username", - default="Administrator", - help="Specify a user name for domain access.", - ) - parser.add_argument( - "--password", default="", help="Specify a password for domain access." - ) - parser.add_argument( - "--debug-delay", - type=int, - help="Delay activity for a specified number of seconds.", - ) - parser.add_argument( - "--join-marker", - default=DEFAULT_JOIN_MARKER, - help="Path to a file used to indicate a join has been peformed.", - ) - parser.add_argument( - "--samba-debug-level", - choices=[str(v) for v in range(0, 11)], - help="Specify samba debug level for commands.", - ) - parser.add_argument( - "--samba-command-prefix", - help="Wrap samba commands within a supplied command prefix", - ) - parser.add_argument( - "--skip-if", - dest="skip_conditions", - action="append", - type=skips.parse, - help=( - "Skip execution based on a condition. Conditions include" - " 'file:[!]', 'env:(==|!=)', and 'always:'." - " (Pass `?` for more details)" - ), - ) - parser.add_argument( - "--skip-if-file", - action="append", - dest="skip_conditions", - type=skips.SkipFile.parse, - help="(DEPRECATED) Perform no action if the specified path exists.", - ) - parser.add_argument( - "--validate-config", - choices=("auto", "required", "true", "false"), - help="Perform schema based validation of configuration.", - ) - parser.add_argument( - "--ceph-id", - type=_ceph_id, - help=( - "Specify a user/client ID to ceph libraries" - "(can also be set in the environment by SAMBACC_CEPH_ID." - " Ignored if Ceph RADOS libraries are not present or unused." - " Pass `?` for more details)." - ), - ) - parser.add_argument( - "--debug", - action="store_true", - help="Enable debug level logging of sambacc.", - ) - - -def _ceph_id( - value: typing.Union[str, dict[str, typing.Any]] -) -> dict[str, typing.Any]: - if not isinstance(value, str): - return value - if value == "?": - # A hack to avoid putting tons of ceph specific info in the normal - # help output. There's probably a better way to do this but it - # gets the job done for now. - raise argparse.ArgumentTypeError( - "requested help:" - " Specify names in the form" - " --ceph-id=[key=value][,key=value][,...]." - ' Valid keys include "name" to set the exact name and "rados_id"' - ' to specify a name that lacks the "client." prefix (that will' - "automatically get added)." - " Alternatively, specify just the name to allow the system to" - " guess if the name is prefixed already or not." - ) - result: dict[str, typing.Any] = {} - # complex mode - if "=" in value: - for part in value.split(","): - if "=" not in part: - raise argparse.ArgumentTypeError( - f"unexpected value for ceph-id: {value!r}" - ) - key, val = part.split("=", 1) - if key == "name": - result["client_name"] = val - result["full_name"] = True - elif key == "rados_id": - result["client_name"] = val - result["full_name"] = False - else: - b = f"unexpected key {key!r} in value for ceph-id: {value!r}" - raise argparse.ArgumentTypeError(b) - else: - # this shorthand is meant mainly for lazy humans (me) when running test - # images manually. The key-value form above is meant for automation. - result["client_name"] = value - # assume that if the name starts with client. it's the full name and - # avoid having the ceph library double up an create client.client.x. - result["full_name"] = value.startswith("client.") - return result - - -def from_env( - ns: typing.Any, - var: str, - ename: str, - default: typing.Any = None, - convert_env: typing.Optional[typing.Callable] = None, - convert_value: typing.Optional[typing.Callable] = str, -) -> None: - value = getattr(ns, var, None) - if not value: - value = os.environ.get(ename, "") - if convert_env is not None: - value = convert_env(value) - if convert_value is not None: - value = convert_value(value) - if value: - setattr(ns, var, value) - - -def split_entries(value): - out = [] - if not isinstance(value, str): - raise ValueError(value) - if not value: - return out - # in order to cleanly allow passing uris as config "paths" we can't - # simply split on colons. Avoid coming up with a hokey custom scheme - # and enter "JSON-mode" if the env var starts and ends with brackets - # hinting it contains a JSON list. - v = value.rstrip(None) # permit trailing whitespace (trailing only!) - if v[0] == "[" and v[-1] == "]": - for item in json.loads(v): - if not isinstance(item, str): - raise ValueError("Variable JSON must be a list of strings") - out.append(item) - else: - # backwards compatibilty mode with `PATH` like syntax - for part in value.split(":"): - out.append(part) - return out - - -def env_to_cli(cli: typing.Any) -> None: - from_env( - cli, - "config", - "SAMBACC_CONFIG", - convert_env=split_entries, - convert_value=None, - default=DEFAULT_CONFIG, - ) - from_env( - cli, - "join_files", - "SAMBACC_JOIN_FILES", - convert_env=split_entries, - convert_value=None, - ) - from_env(cli, "identity", "SAMBA_CONTAINER_ID") - from_env(cli, "username", "JOIN_USERNAME") - from_env(cli, "password", "INSECURE_JOIN_PASSWORD") - from_env(cli, "samba_debug_level", "SAMBA_DEBUG_LEVEL") - from_env(cli, "validate_config", "SAMBACC_VALIDATE_CONFIG") - from_env(cli, "ceph_id", "SAMBACC_CEPH_ID", convert_value=_ceph_id) - - -class CommandContext: - """CLI Context for standard samba-container commands.""" - - def __init__(self, cli_args: argparse.Namespace): - self._cli = cli_args - self._iconfig: typing.Optional[config.InstanceConfig] = None - self.expects_ctdb = False - self._opener: typing.Optional[opener.Opener] = None - - @property - def cli(self) -> argparse.Namespace: - return self._cli - - @property - def instance_config(self) -> config.InstanceConfig: - if self._iconfig is None: - cfgs = self.cli.config or [] - self._iconfig = config.read_config_files( - cfgs, - require_validation=self.require_validation, - opener=self.opener, - ).get(self.cli.identity) - return self._iconfig - - @property - def require_validation(self) -> typing.Optional[bool]: - if self.cli.validate_config in ("required", "true"): - return True - if self.cli.validate_config == "false": - return False - return None - - @property - def opener(self) -> opener.Opener: - if self._opener is None: - self._opener = opener.FallbackOpener([url_opener.URLOpener()]) - return self._opener - - -def pre_action(cli: typing.Any) -> None: - """Handle debugging/diagnostic related options before the target - action of the command is performed. - """ - if cli.debug_delay: - time.sleep(int(cli.debug_delay)) - if cli.samba_debug_level: - samba_cmds.set_global_debug(cli.samba_debug_level) - if cli.samba_command_prefix: - samba_cmds.set_global_prefix([cli.samba_command_prefix]) - - # should there be an option to force {en,dis}able rados? - # Right now we just always try to enable rados when possible. - rados_opener.enable_rados( - url_opener.URLOpener, - client_name=cli.ceph_id.get("client_name", ""), - full_name=cli.ceph_id.get("full_name", False), - ) - - -def enable_logging(cli: typing.Any) -> None: - level = logging.DEBUG if cli.debug else logging.INFO - logger = logging.getLogger() - logger.setLevel(level) - handler = logging.StreamHandler() - handler.setFormatter( - logging.Formatter("{asctime}: {levelname}: {message}", style="{") +def main(args: typing.Optional[typing.Sequence[str]] = None) -> None: + commands.include_multiple( + [".check", ".ctdb", ".dns", ".initialize", ".join", ".run", ".users"] ) - handler.setLevel(level) - logger.addHandler(handler) - -def main(args: typing.Optional[typing.Sequence[str]] = None) -> None: cli = commands.assemble(arg_func=global_args).parse_args(args) env_to_cli(cli) enable_logging(cli) diff --git a/tox.ini b/tox.ini index 4a4eb17e..7f30cb18 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] -envlist = formatting, {py3,py39}-mypy, py3, py39, schemacheck, py3-sys +envlist = flake8, formatting, {py3,py39}-mypy, py3, py39, schemacheck, py3-sys isolated_build = True [testenv] @@ -51,14 +51,19 @@ allowlist_externals = /usr/bin/py.test [testenv:formatting] -description = Check the formatting for the source files +description = Check the style/formatting for the source files deps = - flake8 black>=24, <25 commands = - flake8 sambacc tests black --check -v . +[testenv:flake8] +description = Basic python linting for the source files +deps = + flake8 +commands = + flake8 sambacc tests + [testenv:schemacheck] description = Check the JSON Schema files are valid deps =