|
1 | 1 | import functools |
2 | 2 | import os |
3 | 3 | import subprocess |
4 | | -import unittest |
5 | | - |
6 | | -from qubesadmin import Qubes |
7 | | - |
8 | | - |
9 | | -class SD_Qubes_Rpc_Tests(unittest.TestCase): |
10 | | - @classmethod |
11 | | - def setUpClass(cls): |
12 | | - cls.all_vms = set() |
13 | | - cls.vms_by_tag = {} |
14 | | - |
15 | | - app = Qubes() |
16 | | - cls.all_vms = {vm for vm in app.domains if vm.name != "dom0"} |
17 | | - cls.sdw_tagged_vms = {vm for vm in app.domains if "sd-workstation" in vm.tags} |
18 | | - |
19 | | - @functools.cache |
20 | | - def _qrexec_policy_graph(self, service): |
21 | | - cmd = ["qrexec-policy-graph", "--service", service] |
22 | | - p = subprocess.run(cmd, capture_output=True, text=True, check=False) |
23 | | - return p.stdout |
24 | | - |
25 | | - def _policy_exists(self, source, target, service): |
26 | | - service_policy_graph = self._qrexec_policy_graph(service) |
27 | | - policy_str = f'"{source}" -> "{target}" [label="{service}"' |
28 | | - return policy_str in service_policy_graph |
29 | | - |
30 | | - def test_policy_files_exist(self): |
31 | | - """verify the policies are installed""" |
32 | | - assert os.path.exists("/etc/qubes/policy.d/31-securedrop-workstation.policy") |
33 | | - assert os.path.exists("/etc/qubes/policy.d/32-securedrop-workstation.policy") |
34 | | - |
35 | | - # securedrop.Log from @tag:sd-workstation to sd-log should be allowed |
36 | | - def test_sdlog_from_sdw_to_sdlog_allowed(self): |
37 | | - for vm in self.sdw_tagged_vms: |
38 | | - if vm != "sd-log": |
39 | | - assert self._policy_exists(vm, "sd-log", "securedrop.Log") |
40 | | - |
41 | | - # securedrop.Log from anything else to sd-log should be denied |
42 | | - def test_sdlog_from_other_to_sdlog_denied(self): |
43 | | - non_sd_workstation_vms = self.all_vms.difference(self.sdw_tagged_vms) |
44 | | - for vm in non_sd_workstation_vms: |
45 | | - if vm != "sd-log": |
46 | | - assert not self._policy_exists(vm, "sd-log", "securedrop.Log") |
47 | | - |
48 | | - # securedrop.Proxy from sd-app to sd-proxy should be allowed |
49 | | - def test_sdproxy_from_sdapp_to_sdproxy_allowed(self): |
50 | | - assert self._policy_exists("sd-app", "sd-proxy", "securedrop.Proxy") |
51 | | - |
52 | | - # securedrop.Proxy from anything else to sd-proxy should be denied |
53 | | - def test_sdproxy_from_other_to_sdproxy_denied(self): |
54 | | - assert not self._policy_exists("sys-net", "sd-proxy", "securedrop.Proxy") |
55 | | - assert not self._policy_exists("sys-firewall", "sd-proxy", "securedrop.Proxy") |
56 | | - |
57 | | - # qubes.Gpg, qubes.GpgImportKey, and qubes.Gpg2 from anything else to sd-gpg should be denied |
58 | | - def test_qubesgpg_from_other_to_sdgpg_denied(self): |
59 | | - assert not self._policy_exists("sys-net", "sd-gpg", "qubes.Gpg") |
60 | | - assert not self._policy_exists("sys-firewall", "sd-gpg", "qubes.Gpg") |
61 | | - assert not self._policy_exists("sys-net", "sd-gpg", "qubes.GpgImportKey") |
62 | | - assert not self._policy_exists("sys-firewall", "sd-gpg", "qubes.GpgImportKey") |
63 | | - assert not self._policy_exists("sys-net", "sd-gpg", "qubes.Gpg2") |
64 | | - assert not self._policy_exists("sys-firewall", "sd-gpg", "qubes.Gpg2") |
65 | | - |
66 | | - |
67 | | -def load_tests(loader, tests, pattern): |
68 | | - return unittest.TestLoader().loadTestsFromTestCase(SD_Qubes_Rpc_Tests) |
| 4 | + |
| 5 | + |
| 6 | +@functools.cache |
| 7 | +def qrexec_policy_graph(service): |
| 8 | + cmd = ["qrexec-policy-graph", "--service", service] |
| 9 | + p = subprocess.run(cmd, capture_output=True, text=True, check=False) |
| 10 | + return p.stdout |
| 11 | + |
| 12 | + |
| 13 | +def policy_exists(source, target, service): |
| 14 | + service_policy_graph = qrexec_policy_graph(service) |
| 15 | + policy_str = f'"{source}" -> "{target}" [label="{service}"' |
| 16 | + return policy_str in service_policy_graph |
| 17 | + |
| 18 | + |
| 19 | +def test_policy_files_exist(): |
| 20 | + """verify the policies are installed""" |
| 21 | + assert os.path.exists("/etc/qubes/policy.d/31-securedrop-workstation.policy") |
| 22 | + assert os.path.exists("/etc/qubes/policy.d/32-securedrop-workstation.policy") |
| 23 | + |
| 24 | + |
| 25 | +# securedrop.Log from @tag:sd-workstation to sd-log should be allowed |
| 26 | +def test_sdlog_from_sdw_to_sdlog_allowed(sdw_tagged_vms): |
| 27 | + for vm in sdw_tagged_vms: |
| 28 | + if vm.name != "sd-log": |
| 29 | + assert policy_exists(vm, "sd-log", "securedrop.Log") |
| 30 | + |
| 31 | + |
| 32 | +# securedrop.Log from anything else to sd-log should be denied |
| 33 | +def test_sdlog_from_other_to_sdlog_denied(all_vms, sdw_tagged_vms): |
| 34 | + non_sd_workstation_vms = set(all_vms).difference(set(sdw_tagged_vms)) |
| 35 | + for vm in non_sd_workstation_vms: |
| 36 | + if vm.name != "sd-log": |
| 37 | + assert not policy_exists(vm, "sd-log", "securedrop.Log") |
| 38 | + |
| 39 | + |
| 40 | +# securedrop.Proxy from sd-app to sd-proxy should be allowed |
| 41 | +def test_sdproxy_from_sdapp_to_sdproxy_allowed(): |
| 42 | + assert policy_exists("sd-app", "sd-proxy", "securedrop.Proxy") |
| 43 | + |
| 44 | + |
| 45 | +# securedrop.Proxy from anything else to sd-proxy should be denied |
| 46 | +def test_sdproxy_from_other_to_sdproxy_denied(): |
| 47 | + assert not policy_exists("sys-net", "sd-proxy", "securedrop.Proxy") |
| 48 | + assert not policy_exists("sys-firewall", "sd-proxy", "securedrop.Proxy") |
| 49 | + |
| 50 | + |
| 51 | +# qubes.Gpg, qubes.GpgImportKey, and qubes.Gpg2 from anything else to sd-gpg should be denied |
| 52 | +def test_qubesgpg_from_other_to_sdgpg_denied(): |
| 53 | + assert not policy_exists("sys-net", "sd-gpg", "qubes.Gpg") |
| 54 | + assert not policy_exists("sys-firewall", "sd-gpg", "qubes.Gpg") |
| 55 | + assert not policy_exists("sys-net", "sd-gpg", "qubes.GpgImportKey") |
| 56 | + assert not policy_exists("sys-firewall", "sd-gpg", "qubes.GpgImportKey") |
| 57 | + assert not policy_exists("sys-net", "sd-gpg", "qubes.Gpg2") |
| 58 | + assert not policy_exists("sys-firewall", "sd-gpg", "qubes.Gpg2") |
0 commit comments