diff --git a/meta/runtime.yml b/meta/runtime.yml index 719b0017..065cd7f1 100644 --- a/meta/runtime.yml +++ b/meta/runtime.yml @@ -19,6 +19,8 @@ action_groups: - proxmox_cluster_join_info - proxmox_disk - proxmox_domain_info + - proxmox_firewall + - proxmox_firewall_info - proxmox_group - proxmox_group_info - proxmox_kvm diff --git a/plugins/module_utils/proxmox.py b/plugins/module_utils/proxmox.py index 0455e793..fc38907e 100644 --- a/plugins/module_utils/proxmox.py +++ b/plugins/module_utils/proxmox.py @@ -73,6 +73,53 @@ def ansible_to_proxmox_bool(value): return 1 if value else 0 +def compare_list_of_dicts(existing_list, new_list, uid, params_to_ignore=None): + """ Compare 2 list of dicts + Use case - for firewall rules we will be getting a list of rules from user. + We want to filter out which rules needs to be updated and which rules are completely new and needs to be created + + :param existing_list: Existing values example - list of existing rules + :param new_list: New values example - list of rules passed to module + :param uid: unique identifier in dict. It should always be present in both lists - in case of firewall rules it's pos + :param params_to_ignore: list of params we want to ignore which are present in existing_list's dict. + In case of firewall rules we want to ignore ['digest', 'ipversion'] + + :return: returns 2 list items 1st is the list of items which are completely new and needs to be created + 2nd is a list of items which needs to be updated + """ + if params_to_ignore is None: + params_to_ignore = list() + items_to_update = [] + new_list = [{k: v for k, v in item.items() if v is not None and k not in params_to_ignore} for item in new_list] + + if existing_list is None: + items_to_create = new_list + items_to_update = list() + return items_to_create, items_to_update + + existing_list = {x[uid]: x for x in existing_list} + new_list = {x[uid]: x for x in new_list} + + common_uids = set(existing_list.keys()).intersection(set(new_list.keys())) + missing_uids = set(new_list.keys()) - set(existing_list.keys()) + items_to_create = [new_list[uid] for uid in missing_uids] + + for uid in common_uids: + # If new rule has a parameter that is not present in existing rule we need to update + if set(new_list[uid].keys()) - set(existing_list[uid].keys()) != set(): + items_to_update.append(new_list[uid]) + continue + + # If existing rule param value doesn't match new rule param OR + # If existing rule has a param that is not present in new rule except for params in params_to_ignore + for existing_rule_param, existing_parm_value in existing_list[uid].items(): + if (existing_rule_param not in params_to_ignore and + new_list[uid].get(existing_rule_param) != existing_parm_value): + items_to_update.append(new_list[uid]) + + return items_to_create, items_to_update + + class ProxmoxAnsible(object): """Base class for Proxmox modules""" TASK_TIMED_OUT = 'timeout expired' diff --git a/plugins/module_utils/proxmox_sdn.py b/plugins/module_utils/proxmox_sdn.py index 3ed65854..7e764a1f 100644 --- a/plugins/module_utils/proxmox_sdn.py +++ b/plugins/module_utils/proxmox_sdn.py @@ -99,3 +99,47 @@ def get_zones(self, zone_type: str = None) -> List[Dict]: self.module.fail_json( msg=f'Failed to retrieve zone information from cluster: {e}' ) + + def get_aliases(self, firewall_obj): + """Get aliases for IP/CIDR at given firewall endpoint level + + :param firewall_obj: Firewall endpoint as a ProxmoxResource e.g. self.proxmox_api.cluster().firewall + If it is None it'll return an empty list + :return: List of aliases and corresponding IP/CIDR + """ + if firewall_obj is None: + return list() + try: + return firewall_obj().aliases().get() + except Exception as e: + self.module.fail_json( + msg=f'Failed to retrieve aliases - {e}' + ) + + def get_fw_rules(self, rules_obj, pos=None): + """Get firewall rules at given rules endpoint level + + :param rules_obj: Firewall Rules endpoint as a ProxmoxResource e.g. self.proxmox_api.cluster().firewall().rules + :param pos: Rule position if it is None it'll return all rules + :return: Firewall rules as a list of dict + """ + if pos is not None: + pos = str(pos) + try: + return rules_obj(pos).get() + except Exception as e: + self.module.fail_json( + msg=f'Failed to retrieve firewall rules: {e}' + ) + + def get_groups(self): + """Get firewall security groups + + :return: list of groups + """ + try: + return [x['group'] for x in self.proxmox_api.cluster().firewall().groups().get()] + except Exception as e: + self.module.fail_json( + msg=f'Failed to retrieve firewall security groups: {e}' + ) diff --git a/plugins/modules/proxmox_firewall.py b/plugins/modules/proxmox_firewall.py new file mode 100644 index 00000000..c5023dcc --- /dev/null +++ b/plugins/modules/proxmox_firewall.py @@ -0,0 +1,685 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2025, Jana Hoch +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +DOCUMENTATION = r""" +module: proxmox_firewall +short_description: Manage firewall rules in Proxmox +version_added: "1.4.0" +description: + - create/update/delete FW rules at cluster/group/vnet/node/vm level + - Create/delete firewall security groups + - Create/delete aliases +author: 'Jana Hoch (!UNKNOWN)' +attributes: + check_mode: + support: none + diff_mode: + support: none +options: + state: + description: + - Create/update/delete firewall rules or security group. + type: str + choices: + - present + - absent + default: present + update: + description: + - If O(state=present) and if one or more rule/alias already exists it will update them. + type: bool + default: true + level: + description: + - Level at which the firewall rule applies. + type: str + choices: + - cluster + - group + - vnet + - node + - vm + default: cluster + node: + description: + - Name of the node. + - Only needed when O(level=node). + type: str + vmid: + description: + - ID of the VM to which the rule applies. + - Only needed when O(level=vm). + type: int + vnet: + description: + - Name of the virtual network for the rule. + - Only needed when O(level=vnet). + type: str + pos: + description: + - Position of the rule in the list. + - Only needed if O(state=absent). + type: int + group_conf: + description: + - Whether security group should be created or deleted. + type: bool + default: false + group: + description: + - Name of the group to which the rule belongs. + - Only needed when O(level=group) or O(group_conf=true). + type: str + comment: + description: + - Comment for security group. + - Only needed when creating group. + type: str + aliases: + description: + - List of aliases. + - Alias can only be created/updated/deleted at cluster or VM level. + type: list + elements: dict + suboptions: + name: + description: Alias name. + type: str + required: true + cidr: + description: + - CIDR for alias. + - Only needed when O(state=present) or O(state=update). + type: str + required: false + comment: + description: Comment for alias. + type: str + required: false + rules: + description: + - List of individual rules to be applied. + type: list + elements: dict + suboptions: + action: + description: + - Rule action ('ACCEPT', 'DROP', 'REJECT') or security group name. + type: str + required: true + type: + description: + - Rule type. + choices: + - in + - out + - forward + - group + type: str + required: true + comment: + description: + - Optional comment for the specific rule. + type: str + dest: + description: + - Restrict packet destination address. + - This can refer to a single IP address, an IP set ('+ipsetname') or an IP alias definition. + - You can also specify an address range like '20.34.101.207-201.3.9.99', or a list of IP addresses and networks (entries are separated by comma). + - Please do not mix IPv4 and IPv6 addresses inside such lists. + type: str + digest: + description: + - Prevent changes if current configuration file has a different digest. + - This can be used to prevent concurrent modifications. + - If not provided we will calculate at runtime. + type: str + dport: + description: + - Restrict TCP/UDP destination port. + - You can use service names or simple numbers (0-65535), as defined in '/etc/services'. + - Port ranges can be specified with '\d+:\d+', for example '80:85', and you can use comma separated list to match several ports or ranges. + type: str + enable: + description: + - Enable or disable the rule. + type: bool + icmp_type: + description: + - Specify icmp-type. Only valid if proto equals 'icmp' or 'icmpv6'/'ipv6-icmp'. + type: str + iface: + description: + - Network interface name. You have to use network configuration key names for VMs and containers ('net\d+'). + - Host related rules can use arbitrary strings. + type: str + log: + description: + - Logging level for the rule. + choices: + - emerg + - alert + - crit + - err + - warning + - notice + - info + - debug + - nolog + type: str + macro: + description: + - Use predefined standard macro. + type: str + pos: + description: + - Position of the rule in the list. + type: int + required: true + proto: + description: + - IP protocol. You can use protocol names ('tcp'/'udp') or simple numbers, as defined in '/etc/protocols'. + type: str + source: + description: + - Restrict packet source address. + - This can refer to a single IP address, an IP set ('+ipsetname') or an IP alias definition. + - You can also specify an address range like '20.34.101.207-201.3.9.99', or a list of IP addresses and networks (entries are separated by comma). + - Please do not mix IPv4 and IPv6 addresses inside such lists. + type: str + sport: + description: + - Restrict TCP/UDP source port. + - You can use service names or simple numbers (0-65535), as defined in '/etc/services'. + - Port ranges can be specified with '\d+:\d+', for example '80:85', and you can use comma separated list to match several ports or ranges. + type: str +extends_documentation_fragment: + - community.proxmox.proxmox.actiongroup_proxmox + - community.proxmox.proxmox.documentation + - community.proxmox.attributes +""" + +EXAMPLES = r""" +- name: Create firewall rules at cluster level + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + level: cluster + state: present + rules: + - type: out + action: ACCEPT + source: 1.1.1.1 + log: nolog + pos: 9 + enable: true + - type: out + action: ACCEPT + source: 1.0.0.1 + pos: 10 + enable: true + +- name: Update Cluster level firewall rules + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + level: cluster + state: present + update: true + rules: + - type: out + action: ACCEPT + source: 8.8.8.8 + log: nolog + pos: 9 + enable: false + - type: out + action: ACCEPT + source: 8.8.4.4 + pos: 10 + enable: false + +- name: Delete cluster level firewall rule at pos 10 + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + level: cluster + state: absent + pos: 10 + +- name: Create security group + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + group_conf: true + state: present + group: test + +- name: Delete security group + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + group_conf: true + state: absent + group: test + +- name: Create FW aliases + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + state: present + aliases: + - name: test1 + cidr: '10.10.1.0/24' + - name: test2 + cidr: '10.10.2.0/24' + +- name: Update FW aliases + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + state: present + update: true + aliases: + - name: test1 + cidr: '10.10.1.0/28' + - name: test2 + cidr: '10.10.2.0/28' + +- name: Delete FW aliases + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + state: absent + aliases: + - name: test1 + - name: test2 +""" + +RETURN = r""" +group: + description: group name which was created/deleted + returned: on success + type: str + sample: + test +""" + +from ansible.module_utils.basic import AnsibleModule +from ansible_collections.community.proxmox.plugins.module_utils.proxmox_sdn import ProxmoxSdnAnsible +from ansible_collections.community.proxmox.plugins.module_utils.proxmox import ( + proxmox_auth_argument_spec, + ansible_to_proxmox_bool, + compare_list_of_dicts +) + + +def get_proxmox_args(): + return dict( + state=dict(type="str", choices=["present", "absent"], default="present"), + update=dict(type="bool", default=True), + level=dict(type="str", choices=["cluster", "node", "vm", "vnet", "group"], default="cluster", required=False), + node=dict(type="str", required=False), + vmid=dict(type="int", required=False), + vnet=dict(type="str", required=False), + pos=dict(type="int", required=False), + group_conf=dict(type="bool", default=False), + group=dict(type="str", required=False), + comment=dict(type="str", required=False), + aliases=dict( + type="list", + elements="dict", + required=False, + options=dict( + name=dict(type="str", required=True), + cidr=dict(type="str", required=False), + comment=dict(type="str", required=False) + ) + ), + rules=dict( + type="list", + elements="dict", + required=False, + options=dict( + action=dict(type="str", required=True), + type=dict(type="str", choices=["in", "out", "forward", "group"], required=True), + comment=dict(type="str", required=False), + dest=dict(type="str", required=False), + digest=dict(type="str", required=False), + dport=dict(type="str", required=False), + enable=dict(type="bool", required=False), + icmp_type=dict(type="str", required=False), + iface=dict(type="str", required=False), + log=dict(type="str", + choices=["emerg", "alert", "crit", "err", "warning", "notice", "info", "debug", "nolog"], + required=False), + macro=dict(type="str", required=False), + pos=dict(type="int", required=True), + proto=dict(type="str", required=False), + source=dict(type="str", required=False), + sport=dict(type="str", required=False) + ) + ) + ) + + +def get_ansible_module(): + module_args = proxmox_auth_argument_spec() + module_args.update(get_proxmox_args()) + + return AnsibleModule( + argument_spec=module_args, + required_if=[ + ('group_conf', True, ['group']), + ('level', 'vm', ['vmid']), + ('level', 'node', ['node']), + ('level', 'vnet', ['vnet']), + ('level', 'group', ['group']), + ], + mutually_exclusive=[ + ('aliases', 'rules'), + ] + ) + + +class ProxmoxFirewallAnsible(ProxmoxSdnAnsible): + def __init__(self, module): + super(ProxmoxFirewallAnsible, self).__init__(module) + self.params = module.params + + def validate_params(self): + if self.params.get('state') == 'present': + if self.params.get('group_conf') != bool(self.params.get('rules') or self.params.get('aliases')): + return True + else: + self.module.fail_json( + msg="When state is present either group_conf should be true or rules/aliases must be present but not both" + ) + elif self.params.get('state') == 'absent': + if self.params.get('group_conf') != bool( + (self.params.get('pos') is not None) or self.params.get('aliases')): + return True + else: + self.module.fail_json( + msg="When state is absent either group_conf should be true or pos/aliases must be present but not both" + ) + + def run(self): + self.validate_params() + + state = self.params.get("state") + update = self.params.get("update") + level = self.params.get("level") + aliases = self.params.get("aliases") + rules = self.params.get("rules") + group = self.params.get("group") + group_conf = self.params.get("group_conf") + + if rules: + for rule in rules: + rule['icmp-type'] = rule.get('icmp_type') + rule['enable'] = ansible_to_proxmox_bool(rule.get('enable')) + del rule['icmp_type'] + + if level == "vm": + vm = self.get_vm(vmid=self.params.get('vmid')) + node = self.proxmox_api.nodes(vm['node']) + virt = node(vm['type']) + firewall_obj = virt(str(vm['vmid'])).firewall + rules_obj = firewall_obj().rules + + elif level == "node": + firewall_obj = self.proxmox_api.nodes(self.params.get('node')).firewall + rules_obj = firewall_obj().rules + + elif level == "vnet": + firewall_obj = self.proxmox_api.cluster().sdn().vnets(self.params.get('vnet')).firewall + rules_obj = firewall_obj().rules + + elif level == "group": + firewall_obj = None + rules_obj = self.proxmox_api.cluster().firewall().groups(group) + + else: + firewall_obj = self.proxmox_api.cluster().firewall + rules_obj = firewall_obj().rules + + if state == "present": + if group_conf: + self.group_present(group=group, comment=self.params.get('comment')) + if rules: + self.fw_rules_present(rules_obj=rules_obj, rules=rules, update=update) + if aliases: + self.aliases_present(firewall_obj=firewall_obj, level=level, aliases=aliases, update=update) + elif state == "absent": + if self.params.get('pos') is not None: + self.fw_rule_absent(rules_obj=rules_obj, pos=self.params.get('pos')) + if group_conf: + self.group_absent(group_name=group) + if aliases: + self.aliases_absent(firewall_obj=firewall_obj, aliases=aliases) + + def aliases_present(self, firewall_obj, level, aliases, update): + if not firewall_obj or level not in ['cluster', 'vm']: + self.module.fail_json( + msg='Aliases can only be created at cluster or VM level' + ) + + aliases_to_create, aliases_to_update = compare_list_of_dicts( + existing_list=self.get_aliases(firewall_obj=firewall_obj), + new_list=aliases, + uid='name', + params_to_ignore=['digest', 'ipversion'] + ) + + if len(aliases_to_create) == 0 and len(aliases_to_update) == 0: + self.module.exit_json(changed=False, msg='No need to create/update any aliases') + elif len(aliases_to_update) > 0 and not update: + self.module.fail_json( + msg=f"Need to update aliases - {[x['name'] for x in aliases_to_update]} but update is false" + ) + + for alias in aliases_to_create: + try: + firewall_obj().aliases().post(**alias) + except Exception as e: + self.module.fail_json( + msg=f"Failed to create Alias {alias['name']} - {e}" + ) + for alias in aliases_to_update: + try: + firewall_obj().aliases(alias['name']).put(**alias) + except Exception as e: + self.module.fail_json( + msg=f"Failed to update Alias {alias['name']} - {e}" + ) + + self.module.exit_json(changed=True, msg="Aliases created/updated") + + def aliases_absent(self, firewall_obj, aliases): + existing_aliases = set([x.get('name') for x in self.get_aliases(firewall_obj=firewall_obj)]) + aliases = set([x.get('name') for x in aliases]) + aliases_to_delete = list(existing_aliases.intersection(aliases)) + + if len(aliases_to_delete) == 0: + self.module.exit_json( + changed=False, + msg="No need to delete any alias" + ) + for alias_name in aliases_to_delete: + try: + alias_obj = getattr(firewall_obj().aliases(), alias_name) + alias_obj().delete() + except Exception as e: + self.module.fail_json( + msg=f"Failed to delete alias {alias_name} - {e}" + ) + self.module.exit_json( + changed=True, + msg="Successfully deleted aliases" + ) + + def group_present(self, group, comment=None): + if group in self.get_groups(): + self.module.exit_json( + changed=False, group=group, msg=f"security group {group} already exists" + ) + try: + self.proxmox_api.cluster().firewall().groups.post(group=group, comment=comment) + self.module.exit_json( + changed=True, group=group, msg=f'successfully created security group {group}' + ) + except Exception as e: + self.module.fail_json( + msg=f'Failed to create security group: {e}' + ) + + def group_absent(self, group_name): + if group_name not in self.get_groups(): + self.module.exit_json( + changed=False, group=group_name, msg=f"security group {group_name} already doesn't exists" + ) + try: + group = getattr(self.proxmox_api.cluster().firewall().groups(), group_name) + group.delete() + self.module.exit_json( + changed=True, group=group_name, msg=f'successfully deleted security group {group_name}' + ) + except Exception as e: + self.module.fail_json( + msg=f'Failed to delete security group {group_name}: {e}' + ) + + def fw_rule_absent(self, rules_obj, pos): + try: + for item in self.get_fw_rules(rules_obj): + if item.get('pos') == pos: + break + else: + self.module.exit_json( + changed=False, msg="Firewall rule already doesn't exist" + ) + rule_obj = getattr(rules_obj(), str(pos)) + digest = rule_obj.get().get('digest') + rule_obj.delete(pos=pos, digest=digest) + + self.module.exit_json( + changed=True, msg='successfully deleted firewall rules' + ) + except Exception as e: + self.module.fail_json( + msg=f'Failed to delete firewall rule at pos {pos}: {e}' + ) + + def fw_rules_present(self, rules_obj, rules, update): + existing_rules = self.get_fw_rules(rules_obj=rules_obj) + rules_to_create, rules_to_update = compare_list_of_dicts( + existing_list=existing_rules, + new_list=rules, + uid='pos', + params_to_ignore=['digest', 'ipversion'] + ) + + if len(rules_to_create) == 0 and len(rules_to_update) == 0: + self.module.exit_json(changed=False, msg='No need to create/update any rule') + elif len(rules_to_update) > 0 and not update: + self.module.fail_json( + msg=f"Need to update rules at pos - {[x['pos'] for x in rules_to_update]} but update is false" + ) + + for rule in rules_to_update: + try: + rule_obj = getattr(rules_obj(), str(rule['pos'])) + rule['digest'] = rule_obj.get().get('digest') # Avoids concurrent changes + rule_obj.put(**rule) + + except Exception as e: + self.module.fail_json( + msg=f'Failed to update firewall rule at pos {rule["pos"]}: {e}' + ) + for rule in rules_to_create: + try: + rules_obj().post(**rule) + self.move_rule_to_correct_pos(rules_obj, rule) + + except Exception as e: + self.module.fail_json( + msg=f'Failed to create firewall rule {rule}: {e}' + ) + self.module.exit_json( + changed=True, msg='successfully created/updated firewall rules' + ) + + def move_rule_to_correct_pos(self, rules_obj, rule): + ################################################################################################## + # TODO: Once below mentioned issue is fixed. Remove this workaround. # + # Currently Proxmox API doesn't honor pos. All new rules are created at pos 0 # + # https://forum.proxmox.com/threads/issue-when-creating-a-firewall-rule.135878/ # + # Not able to find it in BUGZILLA. So maybe this is expected behaviour. # + # To workaround this issue we will check rule at pos 0 and if needed move it to correct position # + ################################################################################################## + + pos = rule.get('pos') + rule = {k: v for k, v in rule.items() if v is not None} + if pos is not None and pos != 0: + try: + fw_rule_at0 = getattr(rules_obj(), str(0)) + for param, value, in fw_rule_at0.get().items(): + if param in rule.keys() and param != 'pos' and value != rule.get(param): + self.module.warn( + msg=f'Skipping workaround for rule placement. ' + f'Verify rule is at correct pos ' + f'provided - {rule} rule_at0 - {fw_rule_at0.get()}') + break # No need to move this. Potentially the issue is resolved. + else: + fw_rule_at0.put(moveto=(pos + 1)) # moveto moves rule to one position before the value + except Exception as e: + self.module.fail_json( + msg=f'Rule created but failed to move it to correct pos. {e}' + ) + + +def main(): + module = get_ansible_module() + proxmox = ProxmoxFirewallAnsible(module) + + try: + proxmox.run() + except Exception as e: + module.fail_json(msg=f'An error occurred: {e}') + + +if __name__ == "__main__": + main() diff --git a/plugins/modules/proxmox_firewall_info.py b/plugins/modules/proxmox_firewall_info.py new file mode 100644 index 00000000..814f6572 --- /dev/null +++ b/plugins/modules/proxmox_firewall_info.py @@ -0,0 +1,306 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2025, Jana Hoch +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +DOCUMENTATION = r""" +module: proxmox_firewall_info +short_description: Manage firewall rules in Proxmox +version_added: "1.4.0" +description: + - Get firewall rules at cluster/group/vnet/node/vm level. + - Get firewall security groups at cluster level. + - Get aliases at cluster/VM level. +author: 'Jana Hoch (!UNKNOWN)' +options: + level: + description: + - Level at which the firewall rule applies. + type: str + choices: + - cluster + - group + - vnet + - node + - vm + default: cluster + node: + description: + - Name of the node. + - Only needed when O(level=node). + type: str + vmid: + description: + - ID of the VM to which the rule applies. + - Only needed when O(level=vm). + type: int + vnet: + description: + - Name of the virtual network for the rule. + - Only needed when O(level=vnet). + type: str + pos: + description: + - Position of the rule in the list. + type: int + group: + description: + - Name of the group to which the rule belongs. + - Only needed when O(level=group). + type: str +extends_documentation_fragment: + - community.proxmox.proxmox.actiongroup_proxmox + - community.proxmox.proxmox.documentation + - community.proxmox.attributes + - community.proxmox.attributes.info_module +""" + +EXAMPLES = r""" +- name: Get Cluster level firewall rules, aliases, and security groups + community.proxmox.proxmox_firewall: + api_user: "{{ pc.proxmox.api_user }}" + api_token_id: "{{ pc.proxmox.api_token_id }}" + api_token_secret: "{{ vault.proxmox.api_token_secret }}" + api_host: "{{ pc.proxmox.api_host }}" + validate_certs: false + level: cluster +""" + +RETURN = r""" +groups: + description: + - List of firewall security groups. + - This will always be given for cluster level regardless of the level passed. + - Because only at cluster level we can have firewall security groups. + returned: on success + type: list + elements: str + sample: + [ "test" ] + +aliases: + description: + - List of alias present at given level. + - Aliases are only available for cluster and VM level so if any other level it'll be empty list. + returned: on success + type: list + elements: dict + sample: + [ + { + "cidr": "10.10.1.0/24", + "digest": "978391f460484e8d4fb3ca785cfe5a9d16fe8b1f", + "ipversion": 4, + "name": "test1" + }, + { + "cidr": "10.10.2.0/24", + "digest": "978391f460484e8d4fb3ca785cfe5a9d16fe8b1f", + "ipversion": 4, + "name": "test2" + }, + { + "cidr": "10.10.3.0/24", + "digest": "978391f460484e8d4fb3ca785cfe5a9d16fe8b1f", + "ipversion": 4, + "name": "test3" + } + ] + +firewall_rules: + description: List of firewall rules at given level. + returned: on success + type: list + elements: dict + sample: + [ + { + "action": "ACCEPT", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "dport": "53", + "enable": 1, + "ipversion": 4, + "log": "nolog", + "pos": 0, + "proto": "udp", + "source": "192.168.1.0/24", + "type": "in" + }, + { + "action": "ACCEPT", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "dport": "53", + "enable": 1, + "ipversion": 4, + "log": "nolog", + "pos": 1, + "proto": "tcp", + "source": "192.168.1.0/24", + "type": "in" + }, + { + "action": "ACCEPT", + "dest": "192.168.1.0/24", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "enable": 1, + "ipversion": 4, + "log": "nolog", + "pos": 2, + "type": "out" + }, + { + "action": "ACCEPT", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "enable": 1, + "ipversion": 4, + "log": "nolog", + "pos": 3, + "source": "192.168.1.0/24", + "type": "in" + }, + { + "action": "ACCEPT", + "dest": "+sdn/test2-gateway", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "enable": 1, + "iface": "test2", + "log": "nolog", + "macro": "DNS", + "pos": 4, + "type": "in" + }, + { + "action": "ACCEPT", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "enable": 1, + "iface": "test2", + "log": "nolog", + "macro": "DHCPfwd", + "pos": 5, + "type": "in" + }, + { + "action": "ACCEPT", + "dest": "+sdn/test2-all", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "dport": "68", + "enable": 1, + "log": "nolog", + "pos": 6, + "proto": "udp", + "source": "+sdn/test2-gateway", + "sport": "67", + "type": "out" + }, + { + "action": "DROP", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "enable": 1, + "log": "nolog", + "pos": 7, + "type": "in" + }, + { + "action": "DROP", + "digest": "b5ddaed23b415b9368706fc9edc83d037526aae9", + "enable": 1, + "log": "nolog", + "pos": 8, + "type": "out" + } + ] +""" + +from ansible.module_utils.basic import AnsibleModule +from ansible_collections.community.proxmox.plugins.module_utils.proxmox_sdn import ProxmoxSdnAnsible +from ansible_collections.community.proxmox.plugins.module_utils.proxmox import proxmox_auth_argument_spec + + +def get_proxmox_args(): + return dict( + level=dict(type="str", choices=["cluster", "node", "vm", "vnet", "group"], default="cluster", required=False), + node=dict(type="str", required=False), + vmid=dict(type="int", required=False), + vnet=dict(type="str", required=False), + group=dict(type="str", required=False), + pos=dict(type="int", required=False), + ) + + +def get_ansible_module(): + module_args = proxmox_auth_argument_spec() + module_args.update(get_proxmox_args()) + + return AnsibleModule( + argument_spec=module_args, + supports_check_mode=True, + required_if=[ + ('level', 'vm', ['vmid']), + ('level', 'node', ['node']), + ('level', 'vnet', ['vnet']), + ('level', 'group', ['group']), + ] + ) + + +class ProxmoxFirewallInfoAnsible(ProxmoxSdnAnsible): + def __init__(self, module): + super(ProxmoxFirewallInfoAnsible, self).__init__(module) + self.params = module.params + + def run(self): + level = self.params.get("level") + + if level == "vm": + vm = self.get_vm(vmid=self.params.get('vmid')) + node = self.proxmox_api.nodes(vm['node']) + virt = node(vm['type']) + firewall_obj = virt(str(vm['vmid'])).firewall + rules_obj = firewall_obj().rules + + elif level == "node": + firewall_obj = self.proxmox_api.nodes(self.params.get('node')).firewall + rules_obj = firewall_obj().rules + + elif level == "vnet": + firewall_obj = self.proxmox_api.cluster().sdn().vnets(self.params.get('vnet')).firewall + rules_obj = firewall_obj().rules + + elif level == "group": + firewall_obj = None + rules_obj = self.proxmox_api.cluster().firewall().groups(self.params.get("group")) + + else: + firewall_obj = self.proxmox_api.cluster().firewall + rules_obj = firewall_obj().rules + + rules = self.get_fw_rules(rules_obj, pos=self.params.get('pos')) + groups = self.get_groups() + aliases = self.get_aliases(firewall_obj=firewall_obj) + self.module.exit_json( + changed=False, + firewall_rules=rules, + groups=groups, + aliases=aliases, + msg='successfully retrieved firewall rules and groups' + ) + + +def main(): + module = get_ansible_module() + proxmox = ProxmoxFirewallInfoAnsible(module) + + try: + proxmox.run() + except Exception as e: + module.fail_json(msg=f'An error occurred: {e}') + + +if __name__ == "__main__": + main() diff --git a/tests/unit/plugins/modules/test_proxmox_firewall.py b/tests/unit/plugins/modules/test_proxmox_firewall.py new file mode 100644 index 00000000..d9975f11 --- /dev/null +++ b/tests/unit/plugins/modules/test_proxmox_firewall.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2025, Jana Hoch +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from unittest.mock import patch + +import pytest + +proxmoxer = pytest.importorskip("proxmoxer") + +from ansible.module_utils import basic +from ansible_collections.community.proxmox.plugins.modules import proxmox_firewall +from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import ( + ModuleTestCase, + set_module_args, +) +import ansible_collections.community.proxmox.plugins.module_utils.proxmox as proxmox_utils + +RAW_FIREWALL_RULES = [ + { + "ipversion": 4, + "digest": "245f9fb31d5f59543dedc5a84ba7cd6afa4dbcc0", + "log": "nolog", + "action": "ACCEPT", + "enable": 1, + "type": "out", + "source": "1.1.1.1", + "pos": 0 + }, + { + "enable": 1, + "pos": 1, + "source": "1.0.0.1", + "type": "out", + "action": "ACCEPT", + "digest": "245f9fb31d5f59543dedc5a84ba7cd6afa4dbcc0", + "ipversion": 4 + } +] + +RAW_GROUPS = [ + { + "digest": "fdb62dec01018d4f35c83ecc2ae3f110a8b3bd62", + "group": "test1" + }, + { + "group": "test2", + "digest": "fdb62dec01018d4f35c83ecc2ae3f110a8b3bd62" + } +] + + +def exit_json(*args, **kwargs): + """function to patch over exit_json; package return data into an exception""" + if 'changed' not in kwargs: + kwargs['changed'] = False + raise SystemExit(kwargs) + + +def fail_json(*args, **kwargs): + """function to patch over fail_json; package return data into an exception""" + kwargs['failed'] = True + raise SystemExit(kwargs) + + +def get_module_args_group_conf(group, level="cluster", state="present"): + return { + "api_host": "host", + "api_user": "user", + "api_password": "password", + "level": level, + "group": group, + "group_conf": True, + "state": state + } + + +def get_module_args_rules(state, pos=1, level='cluster', source_ip='1.1.1.1'): + return { + "api_host": "host", + "api_user": "user", + "api_password": "password", + "level": level, + "state": state, + 'rules': [ + { + 'type': 'out', + 'action': 'ACCEPT', + 'source': source_ip, + 'pos': pos, + 'enable': True + } + ] + } + + +def get_module_args_fw_delete(pos, level='cluster', state='absent'): + return { + "api_host": "host", + "api_user": "user", + "api_password": "password", + "level": level, + "state": state, + "pos": pos + } + + +class TestProxmoxFirewallModule(ModuleTestCase): + def setUp(self): + super(TestProxmoxFirewallModule, self).setUp() + proxmox_utils.HAS_PROXMOXER = True + self.module = proxmox_firewall + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.connect_mock = patch( + "ansible_collections.community.proxmox.plugins.module_utils.proxmox.ProxmoxAnsible._connect", + ).start() + self.connect_mock.return_value.cluster.return_value.firewall.return_value.rules.return_value.get.return_value = RAW_FIREWALL_RULES + self.connect_mock.return_value.cluster.return_value.firewall.return_value.groups.return_value.get.return_value = RAW_GROUPS + + def tearDown(self): + self.connect_mock.stop() + self.mock_module_helper.stop() + super(TestProxmoxFirewallModule, self).tearDown() + + def test_create_group(self): + with pytest.raises(SystemExit) as exc_info: + with set_module_args(get_module_args_group_conf(group='test')): + self.module.main() + result = exc_info.value.args[0] + assert result['changed'] is True + assert result["msg"] == 'successfully created security group test' + assert result['group'] == 'test' + + def test_delete_group(self): + with pytest.raises(SystemExit) as exc_info: + with set_module_args(get_module_args_group_conf(group='test1', state="absent")): + self.module.main() + result = exc_info.value.args[0] + assert result['changed'] is True + assert result["msg"] == 'successfully deleted security group test1' + assert result['group'] == 'test1' + + def test_create_fw_rules(self): + with pytest.raises(SystemExit) as exc_info: + with set_module_args(get_module_args_rules(state='present', pos=2)): + self.module.main() + result = exc_info.value.args[0] + assert result['changed'] is True + assert result["msg"] == 'successfully created/updated firewall rules' + + def test_delete_fw_rule(self): + with pytest.raises(SystemExit) as exc_info: + with set_module_args(get_module_args_fw_delete(state='absent', pos=0)): + self.module.main() + result = exc_info.value.args[0] + assert result['changed'] is True + assert result["msg"] == 'successfully deleted firewall rules' diff --git a/tests/unit/plugins/modules/test_proxmox_firewall_info.py b/tests/unit/plugins/modules/test_proxmox_firewall_info.py new file mode 100644 index 00000000..938af769 --- /dev/null +++ b/tests/unit/plugins/modules/test_proxmox_firewall_info.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2025, Jana Hoch +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +from unittest.mock import patch + +import pytest + +proxmoxer = pytest.importorskip("proxmoxer") + +from ansible.module_utils import basic +from ansible_collections.community.proxmox.plugins.modules import proxmox_firewall_info +from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import ( + ModuleTestCase, + set_module_args, +) +import ansible_collections.community.proxmox.plugins.module_utils.proxmox as proxmox_utils + +RAW_FIREWALL_RULES = [ + { + "ipversion": 4, + "digest": "245f9fb31d5f59543dedc5a84ba7cd6afa4dbcc0", + "log": "nolog", + "action": "ACCEPT", + "enable": 1, + "type": "out", + "source": "1.1.1.1", + "pos": 0 + }, + { + "enable": 1, + "pos": 1, + "source": "1.0.0.1", + "type": "out", + "action": "ACCEPT", + "digest": "245f9fb31d5f59543dedc5a84ba7cd6afa4dbcc0", + "ipversion": 4 + } +] + +RAW_GROUPS = [ + { + "digest": "fdb62dec01018d4f35c83ecc2ae3f110a8b3bd62", + "group": "test1" + }, + { + "group": "test2", + "digest": "fdb62dec01018d4f35c83ecc2ae3f110a8b3bd62" + } +] + +RAW_ALIASES = [ + { + "name": "test1", + "cidr": "10.10.1.0/24", + "digest": "978391f460484e8d4fb3ca785cfe5a9d16fe8b1f", + "ipversion": 4 + }, + { + "name": "test2", + "cidr": "10.10.2.0/24", + "digest": "978391f460484e8d4fb3ca785cfe5a9d16fe8b1f", + "ipversion": 4 + }, + { + "name": "test3", + "cidr": "10.10.3.0/24", + "digest": "978391f460484e8d4fb3ca785cfe5a9d16fe8b1f", + "ipversion": 4 + } +] + +RAW_CLUSTER_RESOURCES = [ + { + "vmid": 100, + "maxcpu": 8, + "memhost": 860138496, + "type": "qemu", + "id": "qemu/100", + "diskread": 127452302, + "netin": 42, + "netout": 0, + "cpu": 0.0046731498237984, + "uptime": 119787, + "template": 0, + "disk": 0, + "name": "nextcloud", + "maxdisk": 644245094400, + "mem": 445415424, + "status": "running", + "diskwrite": 1024, + "maxmem": 8589934592, + "node": "pve" + } +] + + +def exit_json(*args, **kwargs): + """function to patch over exit_json; package return data into an exception""" + if 'changed' not in kwargs: + kwargs['changed'] = False + raise SystemExit(kwargs) + + +def fail_json(*args, **kwargs): + """function to patch over fail_json; package return data into an exception""" + kwargs['failed'] = True + raise SystemExit(kwargs) + + +def get_module_args(level="cluster", vmid=None, node=None, vnet=None, group=None): + return { + "api_host": "host", + "api_user": "user", + "api_password": "password", + "level": level, + "vmid": vmid, + "node": node, + "vnet": vnet, + "group": group + } + + +class TestProxmoxFirewallModule(ModuleTestCase): + def setUp(self): + super(TestProxmoxFirewallModule, self).setUp() + proxmox_utils.HAS_PROXMOXER = True + self.module = proxmox_firewall_info + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.connect_mock = patch( + "ansible_collections.community.proxmox.plugins.module_utils.proxmox.ProxmoxAnsible._connect", + ).start() + + self.connect_mock.return_value.cluster.resources.get.return_value = ( + RAW_CLUSTER_RESOURCES + ) + + mock_cluster_fw = self.connect_mock.return_value.cluster.return_value.firewall.return_value + mock_vm100_fw = self.connect_mock.return_value.nodes.return_value.return_value.return_value.firewall.return_value + + mock_cluster_fw.rules.return_value.get.return_value = RAW_FIREWALL_RULES + mock_cluster_fw.groups.return_value.get.return_value = RAW_GROUPS + mock_cluster_fw.aliases.return_value.get.return_value = RAW_ALIASES + + mock_vm100_fw.rules.return_value.get.return_value = RAW_FIREWALL_RULES + mock_vm100_fw.aliases.return_value.get.return_value = RAW_ALIASES + + def tearDown(self): + self.connect_mock.stop() + self.mock_module_helper.stop() + super(TestProxmoxFirewallModule, self).tearDown() + + def test_cluster_level_info(self): + with pytest.raises(SystemExit) as exc_info: + with set_module_args(get_module_args()): + self.module.main() + result = exc_info.value.args[0] + assert result["changed"] is False + assert result["msg"] == "successfully retrieved firewall rules and groups" + assert result["firewall_rules"] == RAW_FIREWALL_RULES + assert result["groups"] == ['test1', 'test2'] + assert result["aliases"] == RAW_ALIASES + + def test_vm_level_info(self): + with pytest.raises(SystemExit) as exc_info: + with set_module_args(get_module_args(level='vm', vmid=100)): + self.module.main() + result = exc_info.value.args[0] + assert result["changed"] is False + assert result["msg"] == "successfully retrieved firewall rules and groups" + assert result["firewall_rules"] == RAW_FIREWALL_RULES + assert result["groups"] == ['test1', 'test2'] + assert result["aliases"] == RAW_ALIASES