Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions sambacc/commands/addc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sambacc import smbconf_api
from sambacc import smbconf_samba

from .cli import best_waiter, CommandBuilder, Context, Fail
from .cli import Context, Fail, best_waiter, commands

try:
import dns
Expand All @@ -43,10 +43,8 @@
_populated: str = "/var/lib/samba/POPULATED"
_provisioned: str = "/etc/samba/smb.conf"

dccommands = CommandBuilder()


@dccommands.command(name="summary")
@commands.command(name="summary")
def summary(ctx: Context) -> None:
print("Hello", ctx)

Expand Down Expand Up @@ -192,7 +190,7 @@ def _prep_krb5_conf(ctx: Context) -> None:
shutil.copy("/var/lib/samba/private/krb5.conf", "/etc/krb5.conf")


@dccommands.command(name="run", arg_func=_run_container_args)
@commands.command(name="run", arg_func=_run_container_args)
def run(ctx: Context) -> None:
_logger.info("Running AD DC container")
if _dosetup(ctx, "wait-domain"):
Expand Down
84 changes: 78 additions & 6 deletions sambacc/commands/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from collections import namedtuple
import argparse
import importlib
import inspect
import logging
import typing

Expand Down Expand Up @@ -45,13 +47,11 @@ class Parser(typing.Protocol):

def set_defaults(self, **kwargs: typing.Any) -> None:
"""Set a default value for an argument parser."""
... # pragma: no cover

def add_argument(
self, *args: typing.Any, **kwargs: typing.Any
) -> typing.Any:
"""Add an argument to be parsed."""
... # pragma: no cover


Command = namedtuple("Command", "name cmd_func arg_func cmd_help")
Expand All @@ -74,6 +74,58 @@ def toggle_option(parser: Parser, arg: str, dest: str, helpfmt: str) -> Parser:
return parser


def ceph_id(
value: typing.Union[str, dict[str, typing.Any]]
) -> dict[str, typing.Any]:
"""Parse a string value into a dict containing ceph id values.
The input should contain name= or rados_id= to identify the kind
of name being provided. As a shortcut a bare name can be provided
and the code will guess at the kind.
"""
if not isinstance(value, str):
return value
if value == "?":
# A hack to avoid putting tons of ceph specific info in the normal
# help output. There's probably a better way to do this but it
# gets the job done for now.
raise argparse.ArgumentTypeError(
"requested help:"
" Specify names in the form"
" --ceph-id=[key=value][,key=value][,...]."
' Valid keys include "name" to set the exact name and "rados_id"'
' to specify a name that lacks the "client." prefix (that will'
"automatically get added)."
" Alternatively, specify just the name to allow the system to"
" guess if the name is prefixed already or not."
)
result: dict[str, typing.Any] = {}
# complex mode
if "=" in value:
for part in value.split(","):
if "=" not in part:
raise argparse.ArgumentTypeError(
f"unexpected value for ceph-id: {value!r}"
)
key, val = part.split("=", 1)
if key == "name":
result["client_name"] = val
result["full_name"] = True
elif key == "rados_id":
result["client_name"] = val
result["full_name"] = False
else:
b = f"unexpected key {key!r} in value for ceph-id: {value!r}"
raise argparse.ArgumentTypeError(b)
else:
# this shorthand is meant mainly for lazy humans (me) when running test
# images manually. The key-value form above is meant for automation.
result["client_name"] = value
# assume that if the name starts with client. it's the full name and
# avoid having the ceph library double up an create client.client.x.
result["full_name"] = value.startswith("client.")
return result


def get_help(cmd: Command) -> str:
if cmd.cmd_help is not None:
return cmd.cmd_help
Expand Down Expand Up @@ -124,6 +176,30 @@ def dict(self) -> dict[str, Command]:
"""Return a dict mapping command names to Command object."""
return {c.name: c for c in self._commands}

def include(
self, modname: str, *, package: str = "", check: bool = True
) -> None:
"""Import a python module to add commands to this command builder.
If check is true and no new commands are added by the import, raise an
error.
"""
if modname.startswith(".") and not package:
package = "sambacc.commands"
mod = importlib.import_module(modname, package=package)
if not check:
return
loaded_fns = {c.cmd_func for c in self._commands}
mod_fns = {fn for _, fn in inspect.getmembers(mod, inspect.isfunction)}
if not mod_fns.intersection(loaded_fns):
raise Fail(f"import from {modname} did not add any new commands")

def include_multiple(
self, modnames: typing.Iterable[str], *, package: str = ""
) -> None:
"""Run the include function on multiple module names."""
for modname in modnames:
self.include(modname, package=package)


class Context(typing.Protocol):
"""Protocol type for CLI Context.
Expand All @@ -138,22 +214,18 @@ class Context(typing.Protocol):
@property
def cli(self) -> argparse.Namespace:
"""Return a parsed command line namespace object."""
... # pragma: no cover

@property
def instance_config(self) -> config.InstanceConfig:
"""Return an instance config based on cli params and env."""
... # pragma: no cover

@property
def require_validation(self) -> typing.Optional[bool]:
"""Return true if configuration needs validation."""
... # pragma: no cover

@property
def opener(self) -> opener.Opener:
"""Return an appropriate opener object for this instance."""
... # pragma: no cover


def best_waiter(
Expand Down
Loading
Loading