diff --git a/custom_components/opnsense/pyopnsense/__init__.py b/custom_components/opnsense/pyopnsense/__init__.py index a0c9147b..32b9fb28 100644 --- a/custom_components/opnsense/pyopnsense/__init__.py +++ b/custom_components/opnsense/pyopnsense/__init__.py @@ -991,7 +991,7 @@ async def _get_firewall_rules(self) -> dict[str, Any]: ) # _LOGGER.debug("[get_firewall_rules] response: %s", response) rules: list = response.get("rows", []) - _LOGGER.debug("[get_firewall_rules] rules: %s", rules) + # _LOGGER.debug("[get_firewall_rules] rules: %s", rules) rules_dict: dict[str, Any] = {} for rule in rules: if not rule.get("uuid") or "lockout" in rule.get("uuid"): @@ -1046,7 +1046,7 @@ async def _get_nat_one_to_one_rules(self) -> dict[str, Any]: ) # _LOGGER.debug("[get_nat_one_to_one_rules] response: %s", response) rules: list = response.get("rows", []) - _LOGGER.debug("[get_nat_one_to_one_rules] rules: %s", rules) + # _LOGGER.debug("[get_nat_one_to_one_rules] rules: %s", rules) rules_dict: dict[str, Any] = {} for rule in rules: if not rule.get("uuid") or "lockout" in rule.get("uuid"): diff --git a/custom_components/opnsense/switch.py b/custom_components/opnsense/switch.py index bf9d4742..a44fe935 100644 --- a/custom_components/opnsense/switch.py +++ b/custom_components/opnsense/switch.py @@ -52,45 +52,41 @@ async def _compile_filter_switches_legacy( A list of OPNsenseFilterSwitchLegacy entities. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("config", {}).get("filter", {}).get("rule"), list - ): + rules = dict_get(state, "firewall.config.filter.rule") + if not isinstance(rules, list): return [] entities: list = [] - # filter rules - rules = dict_get(state, "firewall.config.filter.rule") - if isinstance(rules, list): - for rule in rules: - if not isinstance(rule, dict): - continue + for rule in rules: + if not isinstance(rule, dict): + continue - # do NOT add rules that are NAT rules - if "associated-rule-id" in rule: - continue + # do NOT add rules that are NAT rules + if "associated-rule-id" in rule: + continue - # not possible to disable these rules - if rule.get("description", "") == "Anti-Lockout Rule": - continue + # not possible to disable these rules + if rule.get("description", "") == "Anti-Lockout Rule": + continue - tracker = dict_get(rule, "created.time") - # we use tracker as the unique id - if tracker is None or len(tracker) < 1: - continue + tracker = dict_get(rule, "created.time") + # we use tracker as the unique id + if tracker is None or len(tracker) < 1: + continue - entities.append( - OPNsenseFilterSwitchLegacy( - config_entry=config_entry, - coordinator=coordinator, - entity_description=SwitchEntityDescription( - key=f"filter.{tracker}", - name=f"Filter Rule {tracker} ({rule.get('descr', '')})", - icon="mdi:play-network-outline", - # entity_category=entity_category, - device_class=SwitchDeviceClass.SWITCH, - entity_registry_enabled_default=False, - ), - ) + entities.append( + OPNsenseFilterSwitchLegacy( + config_entry=config_entry, + coordinator=coordinator, + entity_description=SwitchEntityDescription( + key=f"filter.{tracker}", + name=f"Filter Rule {tracker} ({rule.get('descr', '')})", + icon="mdi:play-network-outline", + # entity_category=entity_category, + device_class=SwitchDeviceClass.SWITCH, + entity_registry_enabled_default=False, + ), ) + ) _LOGGER.debug("[compile_filter_switches_legacy] entities: %s", len(entities)) return entities @@ -117,37 +113,32 @@ async def _compile_port_forward_switches_legacy( A list of OPNsenseNatSwitchLegacy entities for port forward rules. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("config", {}).get("nat", {}).get("rule"), list - ): + rules = dict_get(state, "firewall.config.nat.rule") + if not isinstance(rules, list): return [] - entities: list = [] - # nat port forward rules - rules = dict_get(state, "firewall.config.nat.rule") - if isinstance(rules, list): - for rule in rules: - if not isinstance(rule, dict): - continue + for rule in rules: + if not isinstance(rule, dict): + continue - tracker = dict_get(rule, "created.time") - # we use tracker as the unique id - if tracker is None or len(tracker) < 1: - continue + tracker = dict_get(rule, "created.time") + # we use tracker as the unique id + if tracker is None or len(tracker) < 1: + continue - entity = OPNsenseNatSwitchLegacy( - config_entry=config_entry, - coordinator=coordinator, - entity_description=SwitchEntityDescription( - key=f"nat_port_forward.{tracker}", - name=f"NAT Port Forward Rule {tracker} ({rule.get('descr', '')})", - icon="mdi:network-outline", - # entity_category=ENTITY_CATEGORY_CONFIG, - device_class=SwitchDeviceClass.SWITCH, - entity_registry_enabled_default=False, - ), - ) - entities.append(entity) + entity = OPNsenseNatSwitchLegacy( + config_entry=config_entry, + coordinator=coordinator, + entity_description=SwitchEntityDescription( + key=f"nat_port_forward.{tracker}", + name=f"NAT Port Forward Rule {tracker} ({rule.get('descr', '')})", + icon="mdi:network-outline", + # entity_category=ENTITY_CATEGORY_CONFIG, + device_class=SwitchDeviceClass.SWITCH, + entity_registry_enabled_default=False, + ), + ) + entities.append(entity) _LOGGER.debug("[compile_port_forward_switches_legacy] entities: %s", len(entities)) return entities @@ -174,41 +165,36 @@ async def _compile_nat_outbound_switches_legacy( A list of OPNsenseNatSwitchLegacy entities for outbound rules. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("config", {}).get("nat", {}).get("outbound", {}).get("rule"), - list, - ): + rules = dict_get(state, "firewall.config.nat.outbound.rule") + if not isinstance(rules, list): return [] entities: list = [] - # nat outbound rules - # to actually be applicable, mode must by "hybrid" or "advanced" - rules = dict_get(state, "firewall.config.nat.outbound.rule") - if isinstance(rules, list): - for rule in rules: - if not isinstance(rule, dict): - continue - tracker = dict_get(rule, "created.time") - # we use tracker as the unique id - if tracker is None or len(tracker) < 1: - continue + for rule in rules: + if not isinstance(rule, dict): + continue - if "Auto created rule" in rule.get("description", ""): - continue + tracker = dict_get(rule, "created.time") + # we use tracker as the unique id + if tracker is None or len(tracker) < 1: + continue - entity = OPNsenseNatSwitchLegacy( - config_entry=config_entry, - coordinator=coordinator, - entity_description=SwitchEntityDescription( - key=f"nat_outbound.{tracker}", - name=f"NAT Outbound Rule {tracker} ({rule.get('descr', '')})", - icon="mdi:network-outline", - # entity_category=ENTITY_CATEGORY_CONFIG, - device_class=SwitchDeviceClass.SWITCH, - entity_registry_enabled_default=False, - ), - ) - entities.append(entity) + if "Auto created rule" in rule.get("descr", ""): + continue + + entity = OPNsenseNatSwitchLegacy( + config_entry=config_entry, + coordinator=coordinator, + entity_description=SwitchEntityDescription( + key=f"nat_outbound.{tracker}", + name=f"NAT Outbound Rule {tracker} ({rule.get('descr', '')})", + icon="mdi:network-outline", + # entity_category=ENTITY_CATEGORY_CONFIG, + device_class=SwitchDeviceClass.SWITCH, + entity_registry_enabled_default=False, + ), + ) + entities.append(entity) _LOGGER.debug("[compile_nat_outbound_switches_legacy] entities: %s", len(entities)) return entities @@ -418,13 +404,12 @@ async def _compile_firewall_rules_switches( A list of OPNsenseFirewallRuleSwitch entities. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("rules"), dict - ): + rules = dict_get(state, "firewall.rules") + if not isinstance(rules, dict): return [] entities: list = [] - for rule in state.get("firewall", {}).get("rules", {}).values(): + for rule in rules.values(): if not isinstance(rule, dict): continue interface = rule.get("%interface", "") @@ -467,13 +452,12 @@ async def _compile_nat_source_rules_switches( A list of OPNsenseNATRuleSwitch entities for source NAT rules. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("nat", {}).get("source_nat"), dict - ): + rules = dict_get(state, "firewall.nat.source_nat") + if not isinstance(rules, dict): return [] entities: list = [] - for rule in state.get("firewall", {}).get("nat", {}).get("source_nat", {}).values(): + for rule in rules.values(): if not isinstance(rule, dict): continue entity = OPNsenseNATRuleSwitch( @@ -513,13 +497,12 @@ async def _compile_nat_destination_rules_switches( A list of OPNsenseNATRuleSwitch entities for destination NAT rules. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("nat", {}).get("d_nat"), dict - ): + rules = dict_get(state, "firewall.nat.d_nat") + if not isinstance(rules, dict): return [] entities: list = [] - for rule in state.get("firewall", {}).get("nat", {}).get("d_nat", {}).values(): + for rule in rules.values(): if not isinstance(rule, dict): continue entity = OPNsenseNATRuleSwitch( @@ -559,13 +542,12 @@ async def _compile_nat_one_to_one_rules_switches( A list of OPNsenseNATRuleSwitch entities for one-to-one NAT rules. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("nat", {}).get("one_to_one"), dict - ): + rules = dict_get(state, "firewall.nat.one_to_one") + if not isinstance(rules, dict): return [] entities: list = [] - for rule in state.get("firewall", {}).get("nat", {}).get("one_to_one", {}).values(): + for rule in rules.values(): if not isinstance(rule, dict): continue entity = OPNsenseNATRuleSwitch( @@ -605,13 +587,12 @@ async def _compile_nat_npt_rules_switches( A list of OPNsenseNATRuleSwitch entities for NPTv6 NAT rules. """ - if not isinstance(state, MutableMapping) or not isinstance( - state.get("firewall", {}).get("nat", {}).get("npt"), dict - ): + rules = dict_get(state, "firewall.nat.npt") + if not isinstance(rules, dict): return [] entities: list = [] - for rule in state.get("firewall", {}).get("nat", {}).get("npt", {}).values(): + for rule in rules.values(): if not isinstance(rule, dict): continue entity = OPNsenseNATRuleSwitch( diff --git a/tests/test_switch.py b/tests/test_switch.py index 84638937..73e63bd7 100644 --- a/tests/test_switch.py +++ b/tests/test_switch.py @@ -1384,8 +1384,8 @@ async def test_compile_nat_outbound_skips_auto_created(coordinator, make_config_ "nat": { "outbound": { "rule": [ - {"description": "Auto created rule", "created": {"time": "x1"}}, - {"description": "Manual", "created": {"time": "x2"}}, + {"descr": "Auto created rule", "created": {"time": "x1"}}, + {"descr": "Manual", "created": {"time": "x2"}}, ] } }