diff --git a/supervisor/addons/options.py b/supervisor/addons/options.py index 7ae4b312d01..c11cf0a9a22 100644 --- a/supervisor/addons/options.py +++ b/supervisor/addons/options.py @@ -93,15 +93,7 @@ def __call__(self, struct): typ = self.raw_schema[key] try: - if isinstance(typ, list): - # nested value list - options[key] = self._nested_validate_list(typ[0], value, key) - elif isinstance(typ, dict): - # nested value dict - options[key] = self._nested_validate_dict(typ, value, key) - else: - # normal value - options[key] = self._single_validate(typ, value, key) + options[key] = self._validate_element(typ, value, key) except (IndexError, KeyError): raise vol.Invalid( f"Type error for option '{key}' in {self._name} ({self._slug})" @@ -111,7 +103,20 @@ def __call__(self, struct): return options # pylint: disable=no-value-for-parameter - def _single_validate(self, typ: str, value: Any, key: str): + def _validate_element(self, typ: Any, value: Any, key: str) -> Any: + """Validate a value against a type specification.""" + if isinstance(typ, list): + # nested value list + return self._nested_validate_list(typ[0], value, key) + elif isinstance(typ, dict): + # nested value dict + return self._nested_validate_dict(typ, value, key) + else: + # normal value + return self._single_validate(typ, value, key) + + # pylint: disable=no-value-for-parameter + def _single_validate(self, typ: str, value: Any, key: str) -> Any: """Validate a single element.""" # if required argument if value is None: @@ -188,7 +193,9 @@ def _single_validate(self, typ: str, value: Any, key: str): f"Fatal error for option '{key}' with type '{typ}' in {self._name} ({self._slug})" ) from None - def _nested_validate_list(self, typ: Any, data_list: list[Any], key: str): + def _nested_validate_list( + self, typ: Any, data_list: list[Any], key: str + ) -> list[Any]: """Validate nested items.""" options = [] @@ -201,17 +208,13 @@ def _nested_validate_list(self, typ: Any, data_list: list[Any], key: str): # Process list for element in data_list: # Nested? - if isinstance(typ, dict): - c_options = self._nested_validate_dict(typ, element, key) - options.append(c_options) - else: - options.append(self._single_validate(typ, element, key)) + options.append(self._validate_element(typ, element, key)) return options def _nested_validate_dict( self, typ: dict[Any, Any], data_dict: dict[Any, Any], key: str - ): + ) -> dict[Any, Any]: """Validate nested items.""" options = {} @@ -231,12 +234,7 @@ def _nested_validate_dict( continue # Nested? - if isinstance(typ[c_key], list): - options[c_key] = self._nested_validate_list( - typ[c_key][0], c_value, c_key - ) - else: - options[c_key] = self._single_validate(typ[c_key], c_value, c_key) + options[c_key] = self._validate_element(typ[c_key], c_value, c_key) self._check_missing_options(typ, options, key) return options @@ -274,18 +272,28 @@ def __call__(self, raw_schema: dict[str, Any]) -> list[dict[str, Any]]: # read options for key, value in raw_schema.items(): - if isinstance(value, list): - # nested value list - self._nested_ui_list(ui_schema, value, key) - elif isinstance(value, dict): - # nested value dict - self._nested_ui_dict(ui_schema, value, key) - else: - # normal value - self._single_ui_option(ui_schema, value, key) + self._ui_schema_element(ui_schema, value, key) return ui_schema + def _ui_schema_element( + self, + ui_schema: list[dict[str, Any]], + value: str, + key: str, + multiple: bool = False, + ): + if isinstance(value, list): + # nested value list + assert not multiple + self._nested_ui_list(ui_schema, value, key) + elif isinstance(value, dict): + # nested value dict + self._nested_ui_dict(ui_schema, value, key, multiple) + else: + # normal value + self._single_ui_option(ui_schema, value, key, multiple) + def _single_ui_option( self, ui_schema: list[dict[str, Any]], @@ -377,10 +385,7 @@ def _nested_ui_list( _LOGGER.error("Invalid schema %s", key) return - if isinstance(element, dict): - self._nested_ui_dict(ui_schema, element, key, multiple=True) - else: - self._single_ui_option(ui_schema, element, key, multiple=True) + self._ui_schema_element(ui_schema, element, key, multiple=True) def _nested_ui_dict( self, @@ -399,11 +404,7 @@ def _nested_ui_dict( nested_schema: list[dict[str, Any]] = [] for c_key, c_value in option_dict.items(): - # Nested? - if isinstance(c_value, list): - self._nested_ui_list(nested_schema, c_value, c_key) - else: - self._single_ui_option(nested_schema, c_value, c_key) + self._ui_schema_element(nested_schema, c_value, c_key) ui_node["schema"] = nested_schema ui_schema.append(ui_node) diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index 8090e1d79e4..0dc28e2384d 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -137,7 +137,19 @@ r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$" ) -SCHEMA_ELEMENT = vol.Match(RE_SCHEMA_ELEMENT) +SCHEMA_ELEMENT = vol.Schema( + vol.Any( + vol.Match(RE_SCHEMA_ELEMENT), + [ + # A list may not directly contain another list + vol.Any( + vol.Match(RE_SCHEMA_ELEMENT), + {str: vol.Self}, + ) + ], + {str: vol.Self}, + ) +) RE_MACHINE = re.compile( r"^!?(?:" @@ -406,20 +418,7 @@ def _migrate(config: dict[str, Any]): vol.Optional(ATTR_CODENOTARY): vol.Email(), vol.Optional(ATTR_OPTIONS, default={}): dict, vol.Optional(ATTR_SCHEMA, default={}): vol.Any( - vol.Schema( - { - str: vol.Any( - SCHEMA_ELEMENT, - [ - vol.Any( - SCHEMA_ELEMENT, - {str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}, - ) - ], - vol.Schema({str: vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}), - ) - } - ), + vol.Schema({str: SCHEMA_ELEMENT}), False, ), vol.Optional(ATTR_IMAGE): docker_image, diff --git a/tests/addons/test_config.py b/tests/addons/test_config.py index 6368d679b0f..cc957d76130 100644 --- a/tests/addons/test_config.py +++ b/tests/addons/test_config.py @@ -325,3 +325,97 @@ def test_valid_slug(): config["slug"] = "complemento telefónico" with pytest.raises(vol.Invalid): assert vd.SCHEMA_ADDON_CONFIG(config) + + +def test_valid_schema(): + """Test valid and invalid addon slugs.""" + config = load_json_fixture("basic-addon-config.json") + + # Basic types + config["schema"] = { + "bool_basic": "bool", + "mail_basic": "email", + "url_basic": "url", + "port_basic": "port", + "match_basic": "match(.*@.*)", + "list_basic": "list(option1|option2|option3)", + # device + "device_basic": "device", + "device_filter": "device(subsystem=tty)", + # str + "str_basic": "str", + "str_basic2": "str(,)", + "str_min": "str(5,)", + "str_max": "str(,10)", + "str_minmax": "str(5,10)", + # password + "password_basic": "password", + "password_basic2": "password(,)", + "password_min": "password(5,)", + "password_max": "password(,10)", + "password_minmax": "password(5,10)", + # int + "int_basic": "int", + "int_basic2": "int(,)", + "int_min": "int(5,)", + "int_max": "int(,10)", + "int_minmax": "int(5,10)", + # float + "float_basic": "float", + "float_basic2": "float(,)", + "float_min": "float(5,)", + "float_max": "float(,10)", + "float_minmax": "float(5,10)", + } + assert vd.SCHEMA_ADDON_CONFIG(config) + + # Different valid ways of nesting dicts and lists + config["schema"] = { + "str_list": ["str"], + "dict_in_list": [ + { + "required": "str", + "optional": "str?", + } + ], + "dict": { + "required": "str", + "optional": "str?", + "str_list_in_dict": ["str"], + "dict_in_list_in_dict": [ + { + "required": "str", + "optional": "str?", + "str_list_in_dict_in_list_in_dict": ["str"], + } + ], + "dict_in_dict": { + "str_list_in_dict_in_dict": ["str"], + "dict_in_list_in_dict_in_dict": [ + { + "required": "str", + "optional": "str?", + } + ], + "dict_in_dict_in_dict": { + "required": "str", + "optional": "str", + }, + }, + }, + } + assert vd.SCHEMA_ADDON_CONFIG(config) + + # List nested within dict within list + config["schema"] = {"field": [{"subfield": ["str"]}]} + assert vd.SCHEMA_ADDON_CONFIG(config) + + # No lists directly nested within each other + config["schema"] = {"field": [["str"]]} + with pytest.raises(vol.Invalid): + assert vd.SCHEMA_ADDON_CONFIG(config) + + # Field types must be valid + config["schema"] = {"field": "invalid"} + with pytest.raises(vol.Invalid): + assert vd.SCHEMA_ADDON_CONFIG(config) diff --git a/tests/addons/test_options.py b/tests/addons/test_options.py index 847cc285cc7..7625d917a97 100644 --- a/tests/addons/test_options.py +++ b/tests/addons/test_options.py @@ -129,6 +129,64 @@ def test_complex_schema_dict(coresys): )({"name": "Pascal", "password": "1234", "extend": "test"}) +def test_complex_schema_dict_and_list(coresys): + """Test with complex dict/list nested schema.""" + assert AddonOptions( + coresys, + { + "name": "str", + "packages": [ + { + "name": "str", + "options": {"optional": "bool"}, + "dependencies": [{"name": "str"}], + } + ], + }, + MOCK_ADDON_NAME, + MOCK_ADDON_SLUG, + )( + { + "name": "Pascal", + "packages": [ + { + "name": "core", + "options": {"optional": False}, + "dependencies": [{"name": "supervisor"}, {"name": "audio"}], + } + ], + } + ) + + with pytest.raises(vol.error.Invalid): + assert AddonOptions( + coresys, + { + "name": "str", + "packages": [ + { + "name": "str", + "options": {"optional": "bool"}, + "dependencies": [{"name": "str"}], + } + ], + }, + MOCK_ADDON_NAME, + MOCK_ADDON_SLUG, + )( + { + "name": "Pascal", + "packages": [ + { + "name": "core", + "options": {"optional": False}, + "dependencies": [{"name": "supervisor"}, "wrong"], + } + ], + } + ) + + def test_simple_device_schema(coresys): """Test with simple schema.""" for device in (