Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions custom_components/opnsense/pyopnsense/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ async def _get_firewall_rules(self) -> dict[str, Any]:
)
# _LOGGER.debug("[get_firewall_rules] response: %s", response)
rules: list = response.get("rows", [])
_LOGGER.debug("[get_firewall_rules] rules: %s", rules)
# _LOGGER.debug("[get_firewall_rules] rules: %s", rules)
rules_dict: dict[str, Any] = {}
for rule in rules:
if not rule.get("uuid") or "lockout" in rule.get("uuid"):
Expand Down Expand Up @@ -1046,7 +1046,7 @@ async def _get_nat_one_to_one_rules(self) -> dict[str, Any]:
)
# _LOGGER.debug("[get_nat_one_to_one_rules] response: %s", response)
rules: list = response.get("rows", [])
_LOGGER.debug("[get_nat_one_to_one_rules] rules: %s", rules)
# _LOGGER.debug("[get_nat_one_to_one_rules] rules: %s", rules)
rules_dict: dict[str, Any] = {}
for rule in rules:
if not rule.get("uuid") or "lockout" in rule.get("uuid"):
Expand Down
199 changes: 90 additions & 109 deletions custom_components/opnsense/switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,45 +52,41 @@ async def _compile_filter_switches_legacy(
A list of OPNsenseFilterSwitchLegacy entities.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("config", {}).get("filter", {}).get("rule"), list
):
rules = dict_get(state, "firewall.config.filter.rule")
if not isinstance(rules, list):
return []
entities: list = []
# filter rules
rules = dict_get(state, "firewall.config.filter.rule")
if isinstance(rules, list):
for rule in rules:
if not isinstance(rule, dict):
continue
for rule in rules:
if not isinstance(rule, dict):
continue

# do NOT add rules that are NAT rules
if "associated-rule-id" in rule:
continue
# do NOT add rules that are NAT rules
if "associated-rule-id" in rule:
continue

# not possible to disable these rules
if rule.get("description", "") == "Anti-Lockout Rule":
continue
# not possible to disable these rules
if rule.get("description", "") == "Anti-Lockout Rule":
continue

tracker = dict_get(rule, "created.time")
# we use tracker as the unique id
if tracker is None or len(tracker) < 1:
continue
tracker = dict_get(rule, "created.time")
# we use tracker as the unique id
if tracker is None or len(tracker) < 1:
continue

entities.append(
OPNsenseFilterSwitchLegacy(
config_entry=config_entry,
coordinator=coordinator,
entity_description=SwitchEntityDescription(
key=f"filter.{tracker}",
name=f"Filter Rule {tracker} ({rule.get('descr', '')})",
icon="mdi:play-network-outline",
# entity_category=entity_category,
device_class=SwitchDeviceClass.SWITCH,
entity_registry_enabled_default=False,
),
)
entities.append(
OPNsenseFilterSwitchLegacy(
config_entry=config_entry,
coordinator=coordinator,
entity_description=SwitchEntityDescription(
key=f"filter.{tracker}",
name=f"Filter Rule {tracker} ({rule.get('descr', '')})",
icon="mdi:play-network-outline",
# entity_category=entity_category,
device_class=SwitchDeviceClass.SWITCH,
entity_registry_enabled_default=False,
),
)
)
_LOGGER.debug("[compile_filter_switches_legacy] entities: %s", len(entities))
return entities

Expand All @@ -117,37 +113,32 @@ async def _compile_port_forward_switches_legacy(
A list of OPNsenseNatSwitchLegacy entities for port forward rules.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("config", {}).get("nat", {}).get("rule"), list
):
rules = dict_get(state, "firewall.config.nat.rule")
if not isinstance(rules, list):
return []

entities: list = []
# nat port forward rules
rules = dict_get(state, "firewall.config.nat.rule")
if isinstance(rules, list):
for rule in rules:
if not isinstance(rule, dict):
continue
for rule in rules:
if not isinstance(rule, dict):
continue

tracker = dict_get(rule, "created.time")
# we use tracker as the unique id
if tracker is None or len(tracker) < 1:
continue
tracker = dict_get(rule, "created.time")
# we use tracker as the unique id
if tracker is None or len(tracker) < 1:
continue

entity = OPNsenseNatSwitchLegacy(
config_entry=config_entry,
coordinator=coordinator,
entity_description=SwitchEntityDescription(
key=f"nat_port_forward.{tracker}",
name=f"NAT Port Forward Rule {tracker} ({rule.get('descr', '')})",
icon="mdi:network-outline",
# entity_category=ENTITY_CATEGORY_CONFIG,
device_class=SwitchDeviceClass.SWITCH,
entity_registry_enabled_default=False,
),
)
entities.append(entity)
entity = OPNsenseNatSwitchLegacy(
config_entry=config_entry,
coordinator=coordinator,
entity_description=SwitchEntityDescription(
key=f"nat_port_forward.{tracker}",
name=f"NAT Port Forward Rule {tracker} ({rule.get('descr', '')})",
icon="mdi:network-outline",
# entity_category=ENTITY_CATEGORY_CONFIG,
device_class=SwitchDeviceClass.SWITCH,
entity_registry_enabled_default=False,
),
)
entities.append(entity)
_LOGGER.debug("[compile_port_forward_switches_legacy] entities: %s", len(entities))
return entities

Expand All @@ -174,41 +165,36 @@ async def _compile_nat_outbound_switches_legacy(
A list of OPNsenseNatSwitchLegacy entities for outbound rules.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("config", {}).get("nat", {}).get("outbound", {}).get("rule"),
list,
):
rules = dict_get(state, "firewall.config.nat.outbound.rule")
if not isinstance(rules, list):
return []
entities: list = []
# nat outbound rules
# to actually be applicable, mode must by "hybrid" or "advanced"
rules = dict_get(state, "firewall.config.nat.outbound.rule")
if isinstance(rules, list):
for rule in rules:
if not isinstance(rule, dict):
continue

tracker = dict_get(rule, "created.time")
# we use tracker as the unique id
if tracker is None or len(tracker) < 1:
continue
for rule in rules:
if not isinstance(rule, dict):
continue

if "Auto created rule" in rule.get("description", ""):
continue
tracker = dict_get(rule, "created.time")
# we use tracker as the unique id
if tracker is None or len(tracker) < 1:
continue

entity = OPNsenseNatSwitchLegacy(
config_entry=config_entry,
coordinator=coordinator,
entity_description=SwitchEntityDescription(
key=f"nat_outbound.{tracker}",
name=f"NAT Outbound Rule {tracker} ({rule.get('descr', '')})",
icon="mdi:network-outline",
# entity_category=ENTITY_CATEGORY_CONFIG,
device_class=SwitchDeviceClass.SWITCH,
entity_registry_enabled_default=False,
),
)
entities.append(entity)
if "Auto created rule" in rule.get("descr", ""):
continue

entity = OPNsenseNatSwitchLegacy(
config_entry=config_entry,
coordinator=coordinator,
entity_description=SwitchEntityDescription(
key=f"nat_outbound.{tracker}",
name=f"NAT Outbound Rule {tracker} ({rule.get('descr', '')})",
icon="mdi:network-outline",
# entity_category=ENTITY_CATEGORY_CONFIG,
device_class=SwitchDeviceClass.SWITCH,
entity_registry_enabled_default=False,
),
)
entities.append(entity)
_LOGGER.debug("[compile_nat_outbound_switches_legacy] entities: %s", len(entities))
return entities

Expand Down Expand Up @@ -418,13 +404,12 @@ async def _compile_firewall_rules_switches(
A list of OPNsenseFirewallRuleSwitch entities.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("rules"), dict
):
rules = dict_get(state, "firewall.rules")
if not isinstance(rules, dict):
return []

entities: list = []
for rule in state.get("firewall", {}).get("rules", {}).values():
for rule in rules.values():
if not isinstance(rule, dict):
continue
interface = rule.get("%interface", "")
Expand Down Expand Up @@ -467,13 +452,12 @@ async def _compile_nat_source_rules_switches(
A list of OPNsenseNATRuleSwitch entities for source NAT rules.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("nat", {}).get("source_nat"), dict
):
rules = dict_get(state, "firewall.nat.source_nat")
if not isinstance(rules, dict):
return []

entities: list = []
for rule in state.get("firewall", {}).get("nat", {}).get("source_nat", {}).values():
for rule in rules.values():
if not isinstance(rule, dict):
continue
entity = OPNsenseNATRuleSwitch(
Expand Down Expand Up @@ -513,13 +497,12 @@ async def _compile_nat_destination_rules_switches(
A list of OPNsenseNATRuleSwitch entities for destination NAT rules.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("nat", {}).get("d_nat"), dict
):
rules = dict_get(state, "firewall.nat.d_nat")
if not isinstance(rules, dict):
return []

entities: list = []
for rule in state.get("firewall", {}).get("nat", {}).get("d_nat", {}).values():
for rule in rules.values():
if not isinstance(rule, dict):
continue
entity = OPNsenseNATRuleSwitch(
Expand Down Expand Up @@ -559,13 +542,12 @@ async def _compile_nat_one_to_one_rules_switches(
A list of OPNsenseNATRuleSwitch entities for one-to-one NAT rules.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("nat", {}).get("one_to_one"), dict
):
rules = dict_get(state, "firewall.nat.one_to_one")
if not isinstance(rules, dict):
return []

entities: list = []
for rule in state.get("firewall", {}).get("nat", {}).get("one_to_one", {}).values():
for rule in rules.values():
if not isinstance(rule, dict):
continue
entity = OPNsenseNATRuleSwitch(
Expand Down Expand Up @@ -605,13 +587,12 @@ async def _compile_nat_npt_rules_switches(
A list of OPNsenseNATRuleSwitch entities for NPTv6 NAT rules.

"""
if not isinstance(state, MutableMapping) or not isinstance(
state.get("firewall", {}).get("nat", {}).get("npt"), dict
):
rules = dict_get(state, "firewall.nat.npt")
if not isinstance(rules, dict):
return []

entities: list = []
for rule in state.get("firewall", {}).get("nat", {}).get("npt", {}).values():
for rule in rules.values():
if not isinstance(rule, dict):
continue
entity = OPNsenseNATRuleSwitch(
Expand Down
4 changes: 2 additions & 2 deletions tests/test_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,8 +1384,8 @@ async def test_compile_nat_outbound_skips_auto_created(coordinator, make_config_
"nat": {
"outbound": {
"rule": [
{"description": "Auto created rule", "created": {"time": "x1"}},
{"description": "Manual", "created": {"time": "x2"}},
{"descr": "Auto created rule", "created": {"time": "x1"}},
{"descr": "Manual", "created": {"time": "x2"}},
]
}
}
Expand Down
Loading