diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a7e64279..3c48904b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,6 +78,7 @@ jobs: pip install -U pip wheel setuptools pip install -U -r requirements-test.txt pip install -U -e . + pip install -UI "openwisp-users @ https://github.com/openwisp/openwisp-users/tarball/issues/238-view-shared-objects" "cryptography~=43.0.3" pip install ${{ matrix.django-version }} - name: Start postgres and redis diff --git a/openwisp_controller/config/migrations/0061_update_groups_permissions.py b/openwisp_controller/config/migrations/0061_update_groups_permissions.py new file mode 100644 index 000000000..205812c0b --- /dev/null +++ b/openwisp_controller/config/migrations/0061_update_groups_permissions.py @@ -0,0 +1,15 @@ +from django.db import migrations + +from . import assign_permissions_to_groups + + +class Migration(migrations.Migration): + dependencies = [ + ("config", "0060_cleanup_api_task_notification_types"), + ] + + operations = [ + migrations.RunPython( + assign_permissions_to_groups, reverse_code=migrations.RunPython.noop + ) + ] diff --git a/openwisp_controller/config/migrations/__init__.py b/openwisp_controller/config/migrations/__init__.py index 487524cb4..8c7fa50cb 100644 --- a/openwisp_controller/config/migrations/__init__.py +++ b/openwisp_controller/config/migrations/__init__.py @@ -27,7 +27,7 @@ def assign_permissions_to_groups(apps, schema_editor): create_default_permissions(apps, schema_editor) operators_and_admins_can_change = ["device", "config", "template"] operators_read_only_admins_manage = ["vpn"] - manage_operations = ["add", "change", "delete"] + manage_operations = ["add", "change", "view", "delete"] Group = get_swapped_model(apps, "openwisp_users", "Group") try: diff --git a/openwisp_controller/config/tests/test_admin.py b/openwisp_controller/config/tests/test_admin.py index 49152293c..6f63eaa67 100644 --- a/openwisp_controller/config/tests/test_admin.py +++ b/openwisp_controller/config/tests/test_admin.py @@ -506,16 +506,63 @@ def test_template_vpn_fk_autocomplete_view(self): hidden=[data["vpn2"].name, data["vpn_inactive"].name], ) + def test_org_admin_create_template_with_shared_vpn(self): + vpn = self._create_vpn(organization=None) + org = self._create_org() + administrator = self._create_administrator(organizations=[org]) + path = reverse(f"admin:{self.app_label}_template_add") + payload = { + "organization": "", + "name": "Test", + "type": "vpn", + "vpn": str(vpn.id), + "backend": "netjsonconfig.OpenWrt", + "config": "", + "default_values": "", + "tags": "", + } + self.assertEqual(Template.objects.count(), 2) + + with self.subTest("Should not allow creating shared template"): + self._test_org_admin_create_shareable_object( + path=path, + payload=payload, + model=Template, + expected_count=2, + user=administrator, + ) + + with self.subTest("Should allow creating non-shared template"): + payload["organization"] = str(org.pk) + self._test_org_admin_create_shareable_object( + path=path, + payload=payload, + model=Template, + expected_count=3, + user=administrator, + raises_error=False, + ) + + def test_org_admin_view_shared_template(self): + vpn = self._create_vpn(organization=None) + template = self._create_template(type="vpn", vpn=vpn) + self._test_org_admin_view_shareable_object( + path=reverse(f"admin:{self.app_label}_template_change", args=[template.pk]), + ) + def test_vpn_queryset(self): data = self._create_multitenancy_test_env(vpn=True) self._test_multitenant_admin( url=reverse(f"admin:{self.app_label}_vpn_changelist"), - visible=[data["org1"].name, data["vpn1"].name], + visible=[ + data["org1"].name, + data["vpn1"].name, + data["vpn_shared"].name, + ], hidden=[ data["org2"].name, data["inactive"], data["vpn2"].name, - data["vpn_shared"].name, data["vpn_inactive"].name, ], ) @@ -561,6 +608,39 @@ def test_recoverlist_operator_403(self): self._test_recoverlist_operator_403(self.app_label, "template") self._test_recoverlist_operator_403(self.app_label, "vpn") + def test_org_admin_create_shared_vpn(self): + org = self._get_org() + ca = self._create_ca(organization=org) + self._test_org_admin_create_shareable_object( + path=reverse(f"admin:{self.app_label}_vpn_add"), + model=Vpn, + payload={ + "organization": "", + "name": "Test", + "host": "vpn1.test.com", + "key": "vZFUV5FqYt4WW9nerc23BofQH51gHNNy", + "backend": "openwisp_controller.vpn_backends.OpenVPN", + "ca": ca.pk, + "config": { + "openvpn": [ + { + "server_bridge": "10.8.0.0 255.255.255.0", + "name": "tun0", + "mode": "server", + "proto": "udp", + "dev": "tun0", + } + ] + }, + }, + ) + + def test_org_admin_view_shared_vpn(self): + vpn = self._create_vpn(organization=None) + self._test_org_admin_view_shareable_object( + path=reverse(f"admin:{self.app_label}_vpn_change", args=[vpn.pk]), + ) + def test_device_template_filter(self): org = self._get_org(org_name="test-org") t = self._create_template(name="test-template", organization=org) diff --git a/openwisp_controller/config/tests/test_api.py b/openwisp_controller/config/tests/test_api.py index 9ce0d1947..f3ed708e1 100644 --- a/openwisp_controller/config/tests/test_api.py +++ b/openwisp_controller/config/tests/test_api.py @@ -10,7 +10,7 @@ from openwisp_controller.config.api.serializers import BaseConfigSerializer from openwisp_controller.tests.utils import TestAdminMixin -from openwisp_users.tests.test_api import AuthenticationMixin +from openwisp_users.tests.test_api import AuthenticationMixin, TestMultitenantApiMixin from openwisp_utils.tests import capture_any_output, catch_signal from .. import settings as app_settings @@ -33,7 +33,7 @@ OrganizationUser = load_model("openwisp_users", "OrganizationUser") -class ApiTestMixin: +class ApiTestMixin(AuthenticationMixin, TestMultitenantApiMixin): @property def _template_data(self): return { @@ -103,7 +103,6 @@ class TestConfigApi( CreateConfigTemplateMixin, TestVpnX509Mixin, CreateDeviceGroupMixin, - AuthenticationMixin, TestCase, ): def setUp(self): @@ -671,29 +670,50 @@ def test_template_create_of_vpn_type(self): self.assertEqual(Template.objects.count(), 1) self.assertEqual(r.status_code, 201) - def test_template_create_with_shared_vpn(self): - org1 = self._get_org() - test_user = self._create_operator(organizations=[org1]) - self.client.force_login(test_user) - vpn1 = self._create_vpn(name="vpn1", organization=None) - path = reverse("config_api:template_list") - data = self._template_data - data["type"] = "vpn" - data["vpn"] = vpn1.id - data["organization"] = org1.pk - r = self.client.post(path, data, content_type="application/json") - self.assertEqual(r.status_code, 201) - self.assertEqual(Template.objects.count(), 1) - self.assertEqual(r.data["vpn"], vpn1.id) - - def test_template_creation_with_no_org_by_operator(self): - path = reverse("config_api:template_list") - data = self._template_data + def test_operator_access_shared_template(self): test_user = self._create_operator(organizations=[self._get_org()]) - self.client.force_login(test_user) - r = self.client.post(path, data, content_type="application/json") - self.assertEqual(r.status_code, 400) - self.assertIn("This field may not be null.", str(r.content)) + token = self._obtain_auth_token(test_user) + self._create_template(organization=None) + self._test_org_user_access_shared_object( + listview_name="config_api:template_list", + detailview_name="config_api:template_detail", + create_payload={"name": "test", "organization": ""}, + update_payload={"name": "updated-test"}, + expected_count=1, + token=token, + ) + + def test_org_admin_create_template_with_shared_vpn(self): + org = self._get_org() + vpn = self._create_vpn(organization=None) + create_payload = self._template_data + create_payload.update( + { + "organization": org.pk, + "type": "vpn", + "vpn": vpn.pk, + } + ) + update_payload = create_payload.copy() + update_payload["name"] = "updated-test" + test_user = self._create_operator(organizations=[org]) + self._test_org_user_access_shared_object( + listview_name="config_api:template_list", + detailview_name="config_api:template_detail", + create_payload=create_payload, + update_payload=update_payload, + expected_count=1, + token=self._obtain_auth_token(test_user), + expected_status_codes={ + "create": 201, + "list": 200, + "retrieve": 200, + "update": 200, + "delete": 204, + "head": 200, + "option": 200, + }, + ) def test_template_create_with_empty_config(self): path = reverse("config_api:template_list") @@ -855,19 +875,50 @@ def test_vpn_create_api(self): self.assertEqual(r.status_code, 201) self.assertEqual(Vpn.objects.count(), 1) - def test_vpn_create_with_shared_objects(self): - org1 = self._get_org() + def test_org_admin_access_vpn_with_shared_objects(self): + org = self._get_org() + shared_ca = self._create_ca(name="shared_ca", organization=None) + create_payload = self._vpn_data + create_payload.update( + { + "organization": org.pk, + "ca": shared_ca.pk, + } + ) + update_payload = create_payload.copy() + update_payload["name"] = "updated-test-vpn" + administrator = self._create_administrator(organizations=[org]) + self._test_access_shared_object( + listview_name="config_api:vpn_list", + detailview_name="config_api:vpn_detail", + create_payload=create_payload, + update_payload=update_payload, + expected_count=1, + expected_status_codes={ + "create": 201, + "list": 200, + "retrieve": 200, + "update": 200, + "delete": 204, + "head": 200, + "option": 200, + }, + token=self._obtain_auth_token(administrator), + ) + + def test_org_admin_create_shared_vpn(self): shared_ca = self._create_ca(name="shared_ca", organization=None) - test_user = self._create_administrator(organizations=[org1]) - self.client.force_login(test_user) data = self._vpn_data - data["organization"] = org1.pk data["ca"] = shared_ca.pk - path = reverse("config_api:vpn_list") - r = self.client.post(path, data, content_type="application/json") - self.assertEqual(Vpn.objects.count(), 1) - self.assertEqual(r.status_code, 201) - self.assertEqual(r.data["ca"], shared_ca.pk) + # API does not allow creating shared VPN by org admin, + # therefore we create an object to test the detail view. + self._create_vpn(organization=None, ca=shared_ca) + self._test_org_user_access_shared_object( + listview_name="config_api:vpn_list", + detailview_name="config_api:vpn_detail", + create_payload=data, + expected_count=1, + ) def test_vpn_list_api(self): org = self._get_org() diff --git a/openwisp_controller/connection/base/models.py b/openwisp_controller/connection/base/models.py index 55f75c5f7..7f120a988 100644 --- a/openwisp_controller/connection/base/models.py +++ b/openwisp_controller/connection/base/models.py @@ -90,6 +90,7 @@ class AbstractCredentials(ConnectorMixin, ShareableOrgMixinUniqueName, BaseModel # Controls the number of objects which can be stored in memory # before commiting them to database during bulk auto add operation. chunk_size = 1000 + sensitive_fields = ["params"] connector = models.CharField( _("connection type"), diff --git a/openwisp_controller/connection/tests/test_admin.py b/openwisp_controller/connection/tests/test_admin.py index d8557a1ea..d47c44777 100644 --- a/openwisp_controller/connection/tests/test_admin.py +++ b/openwisp_controller/connection/tests/test_admin.py @@ -132,6 +132,49 @@ def test_admin_menu_groups(self): url = reverse(f"admin:{self.app_label}_{model}_changelist") self.assertContains(response, f' class="mg-link" href="{url}"') + def test_org_admin_create_shared_credentials(self): + self._test_org_admin_create_shareable_object( + path=reverse(f"admin:{self.app_label}_credentials_add"), + model=Credentials, + payload={ + "name": "Shared Credentials", + "organization": "", + "connector": "openwisp_controller.connection.connectors.ssh.Ssh", + "params": { + "username": "root", + "password": "password", + "port": 22, + }, + }, + ) + + def test_org_admin_view_shared_credentials(self): + credentials = self._create_credentials(organization=None) + self._test_org_admin_view_shareable_object( + path=reverse( + f"admin:{self.app_label}_credentials_change", args=[credentials.pk] + ), + ) + + def test_credential_admin_sensitive_fields(self): + """ + Sensitive fields for shared credentials should be hidden for non-superusers. + """ + org = self._get_org() + shared_credentials = self._create_credentials(organization=None) + org_credentials = self._create_credentials(organization=org) + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["params"], + shared_obj_path=reverse( + f"admin:{self.app_label}_credentials_change", + args=(shared_credentials.id,), + ), + org_obj_path=reverse( + f"admin:{self.app_label}_credentials_change", args=(org_credentials.id,) + ), + organization=org, + ) + class TestCommandInlines(TestAdminMixin, CreateConnectionsMixin, TestCase): config_app_label = "config" diff --git a/openwisp_controller/connection/tests/test_api.py b/openwisp_controller/connection/tests/test_api.py index 22f2e5713..0288fce8b 100644 --- a/openwisp_controller/connection/tests/test_api.py +++ b/openwisp_controller/connection/tests/test_api.py @@ -12,7 +12,7 @@ from swapper import load_model from openwisp_controller.tests.utils import TestAdminMixin -from openwisp_users.tests.test_api import AuthenticationMixin +from openwisp_users.tests.test_api import APITestCase, AuthenticationMixin from .. import settings as app_settings from ..api.views import ListViewPagination @@ -387,9 +387,7 @@ def test_create_command_without_connection(self): ) -class TestConnectionApi( - TestAdminMixin, AuthenticationMixin, TestCase, CreateConnectionsMixin -): +class TestConnectionApi(TestAdminMixin, CreateConnectionsMixin, APITestCase): def setUp(self): super().setUp() self._login() @@ -494,6 +492,38 @@ def test_delete_credential_detail(self): response = self.client.delete(path) self.assertEqual(response.status_code, 204) + def test_operator_access_shared_credentials(self): + self._create_credentials(organization=None) + self._test_org_user_access_shared_object( + listview_name="connection_api:credential_list", + detailview_name="connection_api:credential_detail", + create_payload={ + "connector": "openwisp_controller.connection.connectors.ssh.Ssh", + "name": "Test credentials", + "organization": "", + "auto_add": False, + "params": {"username": "root", "password": "Pa$$w0Rd", "port": 22}, + }, + update_payload={"name": "updated-test"}, + expected_count=1, + ) + + def test_credentials_sensitive_fields_visibility(self): + """ + Test that sensitive fields are hidden for shared objects for non-superusers. + """ + org = self._get_org() + shared_credentials = self._create_credentials(organization=None) + org_credentials = self._create_credentials(organization=org) + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["params"], + shared_obj=shared_credentials, + org_obj=org_credentials, + listview_name="connection_api:credential_list", + detailview_name="connection_api:credential_detail", + organization=org, + ) + def test_get_deviceconnection_list(self): d1 = self._create_device() path = reverse("connection_api:deviceconnection_list", args=(d1.pk,)) @@ -509,7 +539,6 @@ def test_get_deviceconnection_list(self): for cred in creds: DeviceConnection.objects.create(device=d1, credentials=cred) response = self.client.get(path) - print(response.data) self.assertEqual(response.status_code, 200) created_list = [conn["created"] for conn in response.data["results"]] sorted_created = sorted(created_list, reverse=True) diff --git a/openwisp_controller/geo/migrations/0004_update_groups_permissions.py b/openwisp_controller/geo/migrations/0004_update_groups_permissions.py new file mode 100644 index 000000000..a697dd7d3 --- /dev/null +++ b/openwisp_controller/geo/migrations/0004_update_groups_permissions.py @@ -0,0 +1,14 @@ +from django.db import migrations + +from . import assign_permissions_to_groups + + +class Migration(migrations.Migration): + dependencies = [ + ("geo", "0003_alter_devicelocation_floorplan_location"), + ] + operations = [ + migrations.RunPython( + assign_permissions_to_groups, reverse_code=migrations.RunPython.noop + ) + ] diff --git a/openwisp_controller/geo/migrations/__init__.py b/openwisp_controller/geo/migrations/__init__.py index d2fb75c6a..62da9021d 100644 --- a/openwisp_controller/geo/migrations/__init__.py +++ b/openwisp_controller/geo/migrations/__init__.py @@ -6,7 +6,7 @@ def assign_permissions_to_groups(apps, schema_editor): create_default_permissions(apps, schema_editor) operators_and_admins_can_change = ["location", "floorplan", "devicelocation"] - manage_operations = ["add", "change", "delete"] + manage_operations = ["add", "change", "view", "delete"] Group = get_swapped_model(apps, "openwisp_users", "Group") try: diff --git a/openwisp_controller/pki/api/serializers.py b/openwisp_controller/pki/api/serializers.py index f7ea88914..ff1b793b6 100644 --- a/openwisp_controller/pki/api/serializers.py +++ b/openwisp_controller/pki/api/serializers.py @@ -73,7 +73,12 @@ class Meta: ] read_only_fields = ["created", "modified"] extra_kwargs = { - "organization": {"required": True}, + # In DRF 3.16+, nullable fields that are part of a unique constraint + # automatically get `default: None`, which can cause validation issues. + # Setting the default to `serializers.empty` ensures DRF does not treat + # these fields as both required and having a default value, avoiding + # conflicts. + "organization": {"required": True, "default": serializers.empty}, "common_name": {"default": "", "required": False}, "key_length": {"initial": "2048"}, "digest": {"initial": "sha256"}, diff --git a/openwisp_controller/pki/base/models.py b/openwisp_controller/pki/base/models.py index f93767329..f206f8ea0 100644 --- a/openwisp_controller/pki/base/models.py +++ b/openwisp_controller/pki/base/models.py @@ -10,6 +10,8 @@ class AbstractCa(ShareableOrgMixin, UnqiueCommonNameMixin, BaseCa): + sensitive_fields = ["private_key"] + class Meta(BaseCa.Meta): abstract = True constraints = [ @@ -21,6 +23,8 @@ class Meta(BaseCa.Meta): class AbstractCert(ShareableOrgMixin, UnqiueCommonNameMixin, BaseCert): + sensitive_fields = ["private_key"] + ca = models.ForeignKey( get_model_name("django_x509", "Ca"), verbose_name=_("CA"), diff --git a/openwisp_controller/pki/migrations/0012_update_group_permissions.py b/openwisp_controller/pki/migrations/0012_update_group_permissions.py new file mode 100644 index 000000000..33fd8afc3 --- /dev/null +++ b/openwisp_controller/pki/migrations/0012_update_group_permissions.py @@ -0,0 +1,14 @@ +from django.db import migrations + +from . import assign_permissions_to_groups + + +class Migration(migrations.Migration): + dependencies = [ + ("pki", "0011_disallowed_blank_key_length_or_digest"), + ] + operations = [ + migrations.RunPython( + assign_permissions_to_groups, reverse_code=migrations.RunPython.noop + ) + ] diff --git a/openwisp_controller/pki/migrations/__init__.py b/openwisp_controller/pki/migrations/__init__.py index 0b3736804..81b48fbae 100644 --- a/openwisp_controller/pki/migrations/__init__.py +++ b/openwisp_controller/pki/migrations/__init__.py @@ -6,7 +6,7 @@ def assign_permissions_to_groups(apps, schema_editor): create_default_permissions(apps, schema_editor) operators_read_only_admins_manage = ["ca", "cert"] - manage_operations = ["add", "change", "delete"] + manage_operations = ["add", "change", "view", "delete"] Group = get_swapped_model(apps, "openwisp_users", "Group") try: diff --git a/openwisp_controller/pki/tests/test_admin.py b/openwisp_controller/pki/tests/test_admin.py index a12deb1f1..dc6f6dd01 100644 --- a/openwisp_controller/pki/tests/test_admin.py +++ b/openwisp_controller/pki/tests/test_admin.py @@ -58,12 +58,15 @@ def test_ca_queryset(self): data = self._create_multitenancy_test_env() self._test_multitenant_admin( url=reverse(f"admin:{self.app_label}_ca_changelist"), - visible=[data["ca1"].name, data["org1"].name], + visible=[ + data["ca1"].name, + data["org1"].name, + data["ca_shared"].name, + ], hidden=[ data["ca2"].name, data["org2"].name, data["ca_inactive"].name, - data["ca_shared"].name, ], ) @@ -76,16 +79,57 @@ def test_ca_organization_fk_autocomplete_view(self): administrator=True, ) + def test_org_create_shared_ca(self): + self._test_org_admin_create_shareable_object( + path=reverse(f"admin:{self.app_label}_ca_add"), + model=Ca, + payload={ + "name": "ca-shared", + "organization": "", + "key_length": 2048, + "digest": "sha256", + "operation_type": "new", + "extensions": "", + }, + ) + + def test_org_admin_view_shared_ca(self): + ca = self._create_ca(organization=None) + self._test_org_admin_view_shareable_object( + path=reverse(f"admin:{self.app_label}_ca_change", args=[ca.pk]), + ) + + def test_ca_admin_sensitive_fields(self): + """ + Sensitive fields for shared CA should be hidden for non-superusers. + """ + org = self._get_org() + shared_ca = self._create_ca(organization=None) + org_ca = self._create_ca(organization=org) + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["private_key"], + shared_obj_path=reverse( + f"admin:{self.app_label}_ca_change", args=(shared_ca.id,) + ), + org_obj_path=reverse( + f"admin:{self.app_label}_ca_change", args=(org_ca.id,) + ), + organization=org, + ) + def test_cert_queryset(self): data = self._create_multitenancy_test_env(cert=True) self._test_multitenant_admin( url=reverse(f"admin:{self.app_label}_cert_changelist"), - visible=[data["cert1"].name, data["org1"].name], + visible=[ + data["cert1"].name, + data["org1"].name, + data["cert_shared"].name, + ], hidden=[ data["cert2"].name, data["org2"].name, data["cert_inactive"].name, - data["cert_shared"].name, ], ) @@ -109,6 +153,64 @@ def test_cert_ca_fk_autocomplete_view(self): administrator=True, ) + def test_org_admin_create_cert_with_shared_ca(self): + org = self._get_org() + administrator = self._create_administrator(organizations=[org]) + shared_ca = self._create_ca(organization=None) + payload = { + "name": "Test", + "organization": "", + "ca": str(shared_ca.pk), + "operation_type": "new", + "key_length": 2048, + "digest": "sha256", + "extensions": "[]", + } + with self.subTest("Should not allow creating shared certificate"): + self._test_org_admin_create_shareable_object( + path=reverse(f"admin:{self.app_label}_cert_add"), + model=Cert, + payload=payload, + user=administrator, + ) + + with self.subTest("Should allow creating non-shared certificate"): + payload["organization"] = str(org.pk) + self._test_org_admin_create_shareable_object( + path=reverse(f"admin:{self.app_label}_cert_add"), + model=Cert, + payload=payload, + user=administrator, + raises_error=False, + expected_count=1, + ) + + def test_org_admin_view_shared_cert(self): + shared_ca = self._create_ca(organization=None) + shared_cert = self._create_cert(ca=shared_ca, organization=None) + self._test_org_admin_view_shareable_object( + path=reverse(f"admin:{self.app_label}_cert_change", args=[shared_cert.pk]), + ) + + def test_cert_admin_sensitive_fields(self): + """ + Sensitive fields for shared certificates should be hidden for non-superusers. + """ + org = self._get_org() + shared_ca = self._create_ca(organization=None) + shared_cert = self._create_cert(ca=shared_ca, organization=None) + org_cert = self._create_cert(ca=shared_ca, organization=org) + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["private_key"], + shared_obj_path=reverse( + f"admin:{self.app_label}_cert_change", args=(shared_cert.id,) + ), + org_obj_path=reverse( + f"admin:{self.app_label}_cert_change", args=(org_cert.id,) + ), + organization=org, + ) + def test_cert_changeform_200(self): org = self._create_org(name="test-org") self._create_operator(organizations=[org]) diff --git a/openwisp_controller/pki/tests/test_api.py b/openwisp_controller/pki/tests/test_api.py index d2d36f1d4..03aaf1e67 100644 --- a/openwisp_controller/pki/tests/test_api.py +++ b/openwisp_controller/pki/tests/test_api.py @@ -1,12 +1,10 @@ -from django.test import TestCase from django.urls import reverse from packaging.version import parse as parse_version from rest_framework import VERSION as REST_FRAMEWORK_VERSION from swapper import load_model from openwisp_controller.tests.utils import TestAdminMixin -from openwisp_users.tests.test_api import AuthenticationMixin -from openwisp_users.tests.utils import TestOrganizationMixin +from openwisp_users.tests.test_api import APITestCase from openwisp_utils.tests import AssertNumQueriesSubTestMixin, capture_any_output from .utils import TestPkiMixin @@ -16,12 +14,7 @@ class TestPkiApi( - AssertNumQueriesSubTestMixin, - TestAdminMixin, - TestPkiMixin, - TestOrganizationMixin, - AuthenticationMixin, - TestCase, + AssertNumQueriesSubTestMixin, TestAdminMixin, TestPkiMixin, APITestCase ): def setUp(self): super().setUp() @@ -168,6 +161,38 @@ def test_ca_post_renew_api(self): self.assertNotEqual(ca1.serial_number, old_serial_num) self.assertNotEqual(r.data["serial_number"], old_serial_num) + def test_org_admin_access_shared_ca(self): + # API wouldn't allow creating the object, + # therefore, we create one here to test list view. + self._create_ca() + + create_payload = self._ca_data + update_payload = create_payload.copy() + update_payload["name"] = "updated-name" + self._test_org_user_access_shared_object( + listview_name="pki_api:ca_list", + detailview_name="pki_api:ca_detail", + create_payload=create_payload, + update_payload=update_payload, + expected_count=1, + ) + + def test_ca_sensitive_fields_visibility(self): + """ + Test that sensitive fields are hidden for shared objects for non-superusers. + """ + org = self._get_org() + shared_ca = self._create_ca(organization=None) + org_ca = self._create_ca(organization=org) + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["private_key"], + shared_obj=shared_ca, + org_obj=org_ca, + listview_name="pki_api:ca_list", + detailview_name="pki_api:ca_detail", + organization=org, + ) + def test_cert_post_api(self): path = reverse("pki_api:cert_list") data = self._cert_data @@ -307,6 +332,73 @@ def test_post_cert_revoke_api(self): self.assertTrue(cert1.revoked) self.assertTrue(r.data["revoked"]) + def test_org_admin_access_shared_cert(self): + # API wouldn't allow creating the object, + # therefore, we create one here to test list view. + shared_ca = self._create_ca(organization=None) + self._create_cert(ca=shared_ca) + + create_payload = self._cert_data + create_payload["ca"] = shared_ca.pk + update_payload = create_payload.copy() + update_payload["name"] = "update-name" + self._test_org_user_access_shared_object( + listview_name="pki_api:cert_list", + detailview_name="pki_api:cert_detail", + create_payload=create_payload, + update_payload=update_payload, + expected_count=1, + ) + + def test_org_admin_access_cert_with_shared_ca(self): + org = self._get_org() + shared_ca = self._create_ca(organization=None) + create_payload = self._cert_data + create_payload.update( + { + "organization": org.pk, + "ca": shared_ca.pk, + } + ) + update_payload = { + "name": "updated-name", + "organization": org.pk, + "notes": "new-notes", + } + self._test_org_user_access_shared_object( + listview_name="pki_api:cert_list", + detailview_name="pki_api:cert_detail", + create_payload=create_payload, + update_payload=update_payload, + expected_count=1, + expected_status_codes={ + "create": 201, + "list": 200, + "retrieve": 200, + "update": 200, + "delete": 204, + "head": 200, + "option": 200, + }, + ) + + def test_cert_sensitive_fields_visibility(self): + """ + Test that sensitive fields are hidden for shared objects for non-superusers. + """ + org = self._get_org() + shared_ca = self._create_ca(organization=None) + shared_cert = self._create_cert(ca=shared_ca, organization=None) + org_cert = self._create_cert(ca=shared_ca, organization=org) + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["private_key"], + shared_obj=shared_cert, + org_obj=org_cert, + listview_name="pki_api:cert_list", + detailview_name="pki_api:cert_detail", + organization=org, + ) + @capture_any_output() def test_bearer_authentication(self): self.client.logout()