|
| 1 | +"""Configuration management for RP""" |
| 2 | + |
| 3 | +from typing import Dict |
| 4 | + |
| 5 | +from oidcrp.logging import configure_logging |
| 6 | +from oidcrp.util import get_http_params |
| 7 | +from oidcrp.util import load_yaml_config |
| 8 | +from oidcrp.util import lower_or_upper |
| 9 | + |
| 10 | +try: |
| 11 | + from secrets import token_urlsafe as rnd_token |
| 12 | +except ImportError: |
| 13 | + from oidcendpoint import rndstr as rnd_token |
| 14 | + |
| 15 | + |
| 16 | +class Configuration: |
| 17 | + """RP Configuration""" |
| 18 | + |
| 19 | + def __init__(self, conf: Dict) -> None: |
| 20 | + self.logger = configure_logging(config=conf.get('logging')).getChild(__name__) |
| 21 | + |
| 22 | + # server info |
| 23 | + self.domain = lower_or_upper(conf, "domain") |
| 24 | + self.port = lower_or_upper(conf, "port") |
| 25 | + for param in ["server_name", "base_url"]: |
| 26 | + _pre = lower_or_upper(conf, param) |
| 27 | + if _pre: |
| 28 | + if '{domain}' in _pre: |
| 29 | + setattr(self, param, _pre.format(domain=self.domain, port=self.port)) |
| 30 | + else: |
| 31 | + setattr(self, param, _pre) |
| 32 | + |
| 33 | + # HTTP params |
| 34 | + _params = get_http_params(conf.get("http_params")) |
| 35 | + if _params: |
| 36 | + self.httpc_params = _params |
| 37 | + else: |
| 38 | + _params = {'verify', lower_or_upper(conf, "verify_ssl", True)} |
| 39 | + |
| 40 | + self.web_conf = lower_or_upper(conf, "webserver") |
| 41 | + |
| 42 | + # diverse |
| 43 | + for param in ["html_home", "session_cookie_name", "preferred_url_scheme", |
| 44 | + "services", "federation"]: |
| 45 | + setattr(self, param, lower_or_upper(conf, param)) |
| 46 | + |
| 47 | + rp_keys_conf = lower_or_upper(conf, 'rp_keys') |
| 48 | + if rp_keys_conf is None: |
| 49 | + rp_keys_conf = lower_or_upper(conf, 'oidc_keys') |
| 50 | + setattr(self, "rp_keys", rp_keys_conf) |
| 51 | + |
| 52 | + _clients = lower_or_upper(conf, "clients") |
| 53 | + for key, spec in _clients.items(): |
| 54 | + if key == "": |
| 55 | + continue |
| 56 | + if not spec.get("redirect_uris"): |
| 57 | + continue |
| 58 | + |
| 59 | + _redirects = [] |
| 60 | + for _r in spec["redirect_uris"]: |
| 61 | + if '{domain}' in _r: |
| 62 | + _redirects.append(_r.format(domain=self.domain, port=self.port)) |
| 63 | + else: |
| 64 | + _redirects.append(_r) |
| 65 | + spec["redirect_uris"] = _redirects |
| 66 | + |
| 67 | + setattr(self, "clients", _clients) |
| 68 | + |
| 69 | + hash_seed = lower_or_upper(conf, 'hash_seed') |
| 70 | + if not hash_seed: |
| 71 | + hash_seed = rnd_token(32) |
| 72 | + setattr(self, "hash_seed", hash_seed) |
| 73 | + |
| 74 | + @classmethod |
| 75 | + def create_from_config_file(cls, filename: str): |
| 76 | + """Load configuration as YAML""" |
| 77 | + return cls(load_yaml_config(filename)) |
0 commit comments