Skip to content

Commit 4b94f0b

Browse files
phlogistonjohnmergify[bot]
authored andcommitted
sambacc: add configuration types for keybridge servers
Signed-off-by: John Mulligan <[email protected]>
1 parent bd68190 commit 4b94f0b

File tree

1 file changed

+178
-0
lines changed

1 file changed

+178
-0
lines changed

sambacc/config.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,16 @@ def organizational_units(self) -> typing.Iterable[OrganizationalUnitEntry]:
358358
for n, entry in enumerate(o_units):
359359
yield OrganizationalUnitEntry(self, entry, n)
360360

361+
def keybridge_config(
362+
self, name: typing.Optional[str] = None
363+
) -> typing.Optional[KeyBridgeConfig]:
364+
if not name:
365+
name = self.iconfig.get("keybridge_config")
366+
if not name:
367+
return None
368+
config = self.gconfig.data.get("keybridge", {}).get(name, {})
369+
return KeyBridgeConfig(config, name=name)
370+
361371
def __eq__(self, other: typing.Any) -> bool:
362372
if isinstance(other, InstanceConfig) and self.iconfig == other.iconfig:
363373
self_shares = _shares_data(self.gconfig, self.iconfig)
@@ -580,6 +590,174 @@ def options(self) -> dict[str, str]:
580590
return {k: v for k, v in self._pconf.items() if k not in filter_keys}
581591

582592

593+
class KeyBridgeScopeConfig:
594+
"""Configuration of a single keybridge scope. Fields may or may not
595+
be populated depending on the type of scope.
596+
"""
597+
598+
def __init__(self, scope_cfg: dict) -> None:
599+
self._cfg = scope_cfg
600+
601+
@property
602+
def name(self) -> str:
603+
return str(self._cfg.get("name", ""))
604+
605+
@property
606+
def type_name(self) -> str:
607+
return self.name.split(".")[0]
608+
609+
@property
610+
def subname(self) -> str:
611+
return self.name.split(".", 1)[-1]
612+
613+
@property
614+
def hostnames(self) -> list[str]:
615+
return list(self._cfg.get("hostnames", []))
616+
617+
@property
618+
def port(self) -> int:
619+
return int(self._cfg.get("port", -1))
620+
621+
@property
622+
def tls_paths(self) -> dict[str, str]:
623+
_tls_paths = self._cfg.get("tls_paths", {})
624+
if not _tls_paths:
625+
return {}
626+
# ensure dict keys are consistent and nothing extra
627+
return {
628+
"cert": str(_tls_paths.get("cert", "")),
629+
"key": str(_tls_paths.get("key", "")),
630+
"ca_cert": str(_tls_paths.get("ca_cert", "")),
631+
}
632+
633+
634+
class KeyBridgeVerifyConfig:
635+
"""Configuration for keybridge peer verification. Each check_*
636+
value can be missing - and not checked - or a range of ints or
637+
a list ints.
638+
"""
639+
640+
def __init__(self, verify_cfg: dict) -> None:
641+
self._cfg = verify_cfg
642+
643+
def __repr__(self) -> str:
644+
return f"{self.__class__.__name__}({self._cfg!r})"
645+
646+
@property
647+
def check_pid(self) -> typing.Union[None, range, typing.Collection[int]]:
648+
return self.parameter(self._cfg.get("check_pid"))
649+
650+
@property
651+
def check_uid(self) -> typing.Union[None, range, typing.Collection[int]]:
652+
return self.parameter(self._cfg.get("check_uid"))
653+
654+
@property
655+
def check_gid(self) -> typing.Union[None, range, typing.Collection[int]]:
656+
return self.parameter(self._cfg.get("check_gid"))
657+
658+
@classmethod
659+
def parameter(
660+
cls,
661+
value: typing.Optional[str],
662+
) -> typing.Union[None, range, typing.Collection[int]]:
663+
"""Convert a value into a PID/UID/GID range or collection.
664+
Formats are:
665+
<num>: a single int to match
666+
<num>-<num>: a range of values
667+
<num>+: a range of values ending at a 32 bit max
668+
<num>[,<num>][,<num>...]: a list of allowed ints
669+
"""
670+
if value is None:
671+
return None
672+
673+
if value.endswith("+"):
674+
value = value[:-1]
675+
if value.isdigit():
676+
# allow any 32 bit int over value
677+
return range(int(value), 1 << 32)
678+
679+
if "-" in value:
680+
parts = value.split("-", 1)
681+
if len(parts) != 2:
682+
raise ValueError("ranges must take the form <int>-<int>")
683+
if parts[0].isdigit() and parts[1].isdigit():
684+
return range(int(parts[0]), int(parts[1]))
685+
raise ValueError("ranges must start and end with integers")
686+
687+
if "," in value:
688+
parts = value.split(",")
689+
if not all(s.isdigit() for s in parts):
690+
ValueError("lists must contain only integers")
691+
return set(int(s) for s in parts)
692+
693+
if value.isdigit():
694+
return {int(value)}
695+
696+
raise ValueError("not a valid check value")
697+
698+
699+
class KeyBridgeConfig:
700+
"""Return a keybridge server configuration object.
701+
A configuration consists of zero or more scopes and an optional
702+
peer verification config.
703+
"""
704+
705+
def __init__(self, kbconf: dict, name: str = "") -> None:
706+
self._kbconf = kbconf
707+
self._name = name
708+
709+
def scopes(self) -> list[KeyBridgeScopeConfig]:
710+
"""Return a list of scope configuration objects."""
711+
_scopes = self._kbconf.get("scopes", [])
712+
return [KeyBridgeScopeConfig(s) for s in _scopes]
713+
714+
def verify(self) -> typing.Optional[KeyBridgeVerifyConfig]:
715+
"""Return a configuration for a general peer verification wrapper."""
716+
_verify = self._kbconf.get("verify_peer")
717+
if not _verify:
718+
return None
719+
return KeyBridgeVerifyConfig(_verify)
720+
721+
def update_mem_scope(self) -> None:
722+
"""Create or update a mem scope in this configuration."""
723+
if "mem" in {s.type_name for s in self.scopes()}:
724+
return
725+
self._kbconf.setdefault("scopes", []).append({"name": "mem"})
726+
727+
def update_kmip_scope(
728+
self,
729+
hostnames: typing.Optional[list[str]] = None,
730+
port: typing.Optional[int] = None,
731+
tls_cert: typing.Optional[str] = None,
732+
tls_key: typing.Optional[str] = None,
733+
tls_ca_cert: typing.Optional[str] = None,
734+
subname: str = "1",
735+
) -> None:
736+
"""Create or update a kmip scope in this configuration. If the kmip
737+
scope already exists any non-None argument to this function will update
738+
the corresponding configuration value in the matching scope.
739+
"""
740+
_scopes = {s.type_name: s._cfg for s in self.scopes()}
741+
if "kmip" in _scopes:
742+
scope = _scopes["kmip"]
743+
else:
744+
scope = {"name": f"kmip.{subname}"}
745+
self._kbconf.setdefault("scopes", []).append(scope)
746+
if hostnames:
747+
scope["hostnames"] = hostnames
748+
if port and port > 0:
749+
scope["port"] = port
750+
_tls = (tls_cert, tls_key, tls_ca_cert)
751+
if any(_tls) and not all(_tls):
752+
raise ValueError("specify all TLS files or none")
753+
elif all(_tls):
754+
scope["tls_paths"] = {
755+
"cert": tls_cert,
756+
"key": tls_key,
757+
"ca_cert": tls_ca_cert,
758+
}
759+
760+
583761
def _shares_data(gconfig: GlobalConfig, iconfig: dict) -> list:
584762
try:
585763
shares = iconfig["shares"]

0 commit comments

Comments
 (0)