|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +# |
| 3 | +# Copyright (c) 2025, Jana Hoch <[email protected]> |
| 4 | +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) |
| 5 | +# SPDX-License-Identifier: GPL-3.0-or-later |
| 6 | + |
| 7 | +from __future__ import absolute_import, division, print_function |
| 8 | + |
| 9 | +__metaclass__ = type |
| 10 | + |
| 11 | +from unittest.mock import patch |
| 12 | + |
| 13 | +import pytest |
| 14 | + |
| 15 | +proxmoxer = pytest.importorskip("proxmoxer") |
| 16 | + |
| 17 | +from ansible.module_utils import basic |
| 18 | +from ansible_collections.community.proxmox.plugins.modules import proxmox_firewall |
| 19 | +from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import ( |
| 20 | + ModuleTestCase, |
| 21 | + set_module_args, |
| 22 | +) |
| 23 | +import ansible_collections.community.proxmox.plugins.module_utils.proxmox as proxmox_utils |
| 24 | + |
| 25 | +RAW_FIREWALL_RULES = [ |
| 26 | + { |
| 27 | + "ipversion": 4, |
| 28 | + "digest": "245f9fb31d5f59543dedc5a84ba7cd6afa4dbcc0", |
| 29 | + "log": "nolog", |
| 30 | + "action": "ACCEPT", |
| 31 | + "enable": 1, |
| 32 | + "type": "out", |
| 33 | + "source": "1.1.1.1", |
| 34 | + "pos": 0 |
| 35 | + }, |
| 36 | + { |
| 37 | + "enable": 1, |
| 38 | + "pos": 1, |
| 39 | + "source": "1.0.0.1", |
| 40 | + "type": "out", |
| 41 | + "action": "ACCEPT", |
| 42 | + "digest": "245f9fb31d5f59543dedc5a84ba7cd6afa4dbcc0", |
| 43 | + "ipversion": 4 |
| 44 | + } |
| 45 | +] |
| 46 | + |
| 47 | +RAW_GROUPS = [ |
| 48 | + { |
| 49 | + "digest": "fdb62dec01018d4f35c83ecc2ae3f110a8b3bd62", |
| 50 | + "group": "test1" |
| 51 | + }, |
| 52 | + { |
| 53 | + "group": "test2", |
| 54 | + "digest": "fdb62dec01018d4f35c83ecc2ae3f110a8b3bd62" |
| 55 | + } |
| 56 | +] |
| 57 | + |
| 58 | + |
| 59 | +def exit_json(*args, **kwargs): |
| 60 | + """function to patch over exit_json; package return data into an exception""" |
| 61 | + if 'changed' not in kwargs: |
| 62 | + kwargs['changed'] = False |
| 63 | + raise SystemExit(kwargs) |
| 64 | + |
| 65 | + |
| 66 | +def fail_json(*args, **kwargs): |
| 67 | + """function to patch over fail_json; package return data into an exception""" |
| 68 | + kwargs['failed'] = True |
| 69 | + raise SystemExit(kwargs) |
| 70 | + |
| 71 | + |
| 72 | +def get_module_args_state_none(level="cluster", vmid=None, node=None, vnet=None, group=None): |
| 73 | + return { |
| 74 | + "api_host": "host", |
| 75 | + "api_user": "user", |
| 76 | + "api_password": "password", |
| 77 | + "level": level, |
| 78 | + "vmid": vmid, |
| 79 | + "node": node, |
| 80 | + "vnet": vnet, |
| 81 | + "group": group |
| 82 | + } |
| 83 | + |
| 84 | + |
| 85 | +def get_module_args_group_conf(group, level="cluster", state="present"): |
| 86 | + return { |
| 87 | + "api_host": "host", |
| 88 | + "api_user": "user", |
| 89 | + "api_password": "password", |
| 90 | + "level": level, |
| 91 | + "group": group, |
| 92 | + "group_conf": True, |
| 93 | + "state": state |
| 94 | + } |
| 95 | + |
| 96 | + |
| 97 | +def get_module_args_rules(state, pos=1, level='cluster', source_ip='1.1.1.1'): |
| 98 | + return { |
| 99 | + "api_host": "host", |
| 100 | + "api_user": "user", |
| 101 | + "api_password": "password", |
| 102 | + "level": level, |
| 103 | + "state": state, |
| 104 | + 'rules': [ |
| 105 | + { |
| 106 | + 'type': 'out', |
| 107 | + 'action': 'ACCEPT', |
| 108 | + 'source': source_ip, |
| 109 | + 'pos': pos, |
| 110 | + 'enable': True |
| 111 | + } |
| 112 | + ] |
| 113 | + } |
| 114 | + |
| 115 | + |
| 116 | +class TestProxmoxFirewallModule(ModuleTestCase): |
| 117 | + def setUp(self): |
| 118 | + super(TestProxmoxFirewallModule, self).setUp() |
| 119 | + proxmox_utils.HAS_PROXMOXER = True |
| 120 | + self.module = proxmox_firewall |
| 121 | + self.mock_module_helper = patch.multiple(basic.AnsibleModule, |
| 122 | + exit_json=exit_json, |
| 123 | + fail_json=fail_json) |
| 124 | + self.mock_module_helper.start() |
| 125 | + self.connect_mock = patch( |
| 126 | + "ansible_collections.community.proxmox.plugins.module_utils.proxmox.ProxmoxAnsible._connect", |
| 127 | + ).start() |
| 128 | + self.connect_mock.return_value.cluster.return_value.firewall.return_value.rules.get.return_value = RAW_FIREWALL_RULES |
| 129 | + self.connect_mock.return_value.cluster.return_value.firewall.return_value.groups.return_value.get.return_value = RAW_GROUPS |
| 130 | + |
| 131 | + def tearDown(self): |
| 132 | + self.connect_mock.stop() |
| 133 | + self.mock_module_helper.stop() |
| 134 | + super(TestProxmoxFirewallModule, self).tearDown() |
| 135 | + |
| 136 | + def test_get_fw_state_none(self): |
| 137 | + with pytest.raises(SystemExit) as exc_info: |
| 138 | + with set_module_args(get_module_args_state_none()): |
| 139 | + self.module.main() |
| 140 | + result = exc_info.value.args[0] |
| 141 | + assert result["changed"] is False |
| 142 | + assert result["msg"] == "successfully retrieved firewall rules and groups" |
| 143 | + assert result["firewall_rules"] == RAW_FIREWALL_RULES |
| 144 | + assert result["groups"] == ['test1', 'test2'] |
| 145 | + |
| 146 | + def test_create_group(self): |
| 147 | + with pytest.raises(SystemExit) as exc_info: |
| 148 | + with set_module_args(get_module_args_group_conf(group='test')): |
| 149 | + self.module.main() |
| 150 | + result = exc_info.value.args[0] |
| 151 | + assert result['changed'] is True |
| 152 | + assert result["msg"] == 'successfully created security group test' |
| 153 | + assert result['group'] == 'test' |
| 154 | + |
| 155 | + def test_delete_group(self): |
| 156 | + with pytest.raises(SystemExit) as exc_info: |
| 157 | + with set_module_args(get_module_args_group_conf(group='test1', state="absent")): |
| 158 | + self.module.main() |
| 159 | + result = exc_info.value.args[0] |
| 160 | + assert result['changed'] is True |
| 161 | + assert result["msg"] == 'successfully deleted security group test1' |
| 162 | + assert result['group'] == 'test1' |
| 163 | + |
| 164 | + def test_update_fw_rules(self): |
| 165 | + with pytest.raises(SystemExit) as exc_info: |
| 166 | + with set_module_args(get_module_args_rules(state='update')): |
| 167 | + self.module.main() |
| 168 | + result = exc_info.value.args[0] |
| 169 | + assert result['changed'] is True |
| 170 | + assert result["msg"] == 'successfully updated firewall rules' |
| 171 | + |
| 172 | + def test_create_fw_rules(self): |
| 173 | + with pytest.raises(SystemExit) as exc_info: |
| 174 | + with set_module_args(get_module_args_rules(state='present', pos=2)): |
| 175 | + self.module.main() |
| 176 | + result = exc_info.value.args[0] |
| 177 | + assert result['changed'] is True |
| 178 | + assert result["msg"] == 'successfully created firewall rules' |
0 commit comments