diff --git a/docs/user/basic-concepts.rst b/docs/user/basic-concepts.rst index 69ac29f7..2a123427 100644 --- a/docs/user/basic-concepts.rst +++ b/docs/user/basic-concepts.rst @@ -157,6 +157,8 @@ different roles in each. Here's a summary of the default organization roles. +.. _users_organization_manager: + Organization Manager ~~~~~~~~~~~~~~~~~~~~ @@ -241,6 +243,18 @@ objects are defined and managed by super administrators and can include configurations, policies, or other data that need to be consistent across all organizations. +Shared objects can only be created by superusers. Non-superusers (e.g. +:ref:`users_organization_manager`) have view-only access to shared +objects, both through the admin interface and the REST API. However, they +can use these shared objects when creating related organization-specific +resources. For example, an organization manager can use a shared VPN +server to create a configuration template for their organization. + +In some cases, non-superusers may be restricted from viewing sensitive +details of shared objects—particularly if such information could allow +them to gain unauthorized access to infrastructure or data used by other +organizations. + By sharing common resources, global uniformity and consistency can be enforced across the entire system. diff --git a/openwisp_users/api/mixins.py b/openwisp_users/api/mixins.py index 0eb32756..dc194a95 100644 --- a/openwisp_users/api/mixins.py +++ b/openwisp_users/api/mixins.py @@ -31,7 +31,8 @@ def queryset_organization_conditions(self): # If user has access to any organization, then include shared # objects in the queryset. if len(organizations): - conditions |= Q(**{f"{self.org_field}__isnull": True}) + lookup_field = self.organization_lookup.replace("__in", "__isnull") + conditions |= Q(**{lookup_field: True}) return conditions @@ -114,9 +115,16 @@ def assert_parent_exists(self): except (AssertionError, ValidationError): raise NotFound() + @property + def queryset_organization_conditions(self): + return Q( + **{self.organization_lookup: getattr(self.request.user, self._user_attr)} + ) + def get_organization_queryset(self, qs): - lookup = {self.organization_lookup: getattr(self.request.user, self._user_attr)} - return qs.filter(**lookup) + if self.request.user.is_anonymous: + return + return qs.filter(self.queryset_organization_conditions) def get_parent_queryset(self): raise NotImplementedError() @@ -130,7 +138,7 @@ class FilterByParentMembership(FilterByParent): _user_attr = "organizations_dict" -class FilterByParentManaged(FilterByParent): +class FilterByParentManaged(SharedObjectsLookup, FilterByParent): """ Filter queryset based on parent organizations managed by user """ @@ -138,7 +146,7 @@ class FilterByParentManaged(FilterByParent): _user_attr = "organizations_managed" -class FilterByParentOwned(FilterByParent): +class FilterByParentOwned(SharedObjectsLookup, FilterByParent): """ Filter queryset based on parent organizations owned by user """ @@ -146,6 +154,52 @@ class FilterByParentOwned(FilterByParent): _user_attr = "organizations_owned" +class HideSensitiveFieldsMixin: + """ + Mixin to hide sensitive fields in the serializer representation + based on the organization of the user. + """ + + def get_sensitive_fields(self): + """ + Returns a list of sensitive fields that should be hidden. + """ + ModelClass = self.Meta.model + return getattr(ModelClass, "sensitive_fields", []) + + def _is_object_shared(self, instance): + """ + Returns the organization of the instance if it exists. + """ + view = self.context.get("view") + organization_field = getattr(view, "organization_field", "organization_id") + related_field = instance + for field in organization_field.split("__"): + if hasattr(related_field, field): + related_field = getattr(related_field, field) + else: + return False + return related_field is None + + def hide_sensitive_fields(self, obj): + request = self.context.get("request") + if ( + request + and not request.user.is_superuser + and "organization" in obj + and obj["organization"] is None + ): + for field in self.get_sensitive_fields(): + if field in obj: + del obj[field] + return obj + + def to_representation(self, data): + rep = super().to_representation(data) + self.hide_sensitive_fields(rep) + return rep + + class FilterSerializerByOrganization(OrgLookup): """ Filter the options in browsable API for serializers @@ -190,7 +244,9 @@ def __init__(self, *args, **kwargs): self.filter_fields() -class FilterSerializerByOrgMembership(FilterSerializerByOrganization): +class FilterSerializerByOrgMembership( + HideSensitiveFieldsMixin, FilterSerializerByOrganization +): """ Filter serializer by organizations the user is member of """ @@ -198,7 +254,9 @@ class FilterSerializerByOrgMembership(FilterSerializerByOrganization): _user_attr = "organizations_dict" -class FilterSerializerByOrgManaged(FilterSerializerByOrganization): +class FilterSerializerByOrgManaged( + HideSensitiveFieldsMixin, FilterSerializerByOrganization +): """ Filter serializer by organizations managed by user """ @@ -206,7 +264,9 @@ class FilterSerializerByOrgManaged(FilterSerializerByOrganization): _user_attr = "organizations_managed" -class FilterSerializerByOrgOwned(FilterSerializerByOrganization): +class FilterSerializerByOrgOwned( + HideSensitiveFieldsMixin, FilterSerializerByOrganization +): """ Filter serializer by organizations owned by user """ diff --git a/openwisp_users/multitenancy.py b/openwisp_users/multitenancy.py index b6557fb2..e3926660 100644 --- a/openwisp_users/multitenancy.py +++ b/openwisp_users/multitenancy.py @@ -21,6 +21,9 @@ class MultitenantAdminMixin(object): multitenant_shared_relations = None multitenant_parent = None + def get_sensitive_fields(self, request, obj=None): + return getattr(self.model, "sensitive_fields", []) + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) parent = self.multitenant_parent @@ -37,6 +40,29 @@ def get_repr(self, obj): get_repr.short_description = _("name") + def get_fields(self, request, obj=None): + """ + Return the list of fields to be displayed in the admin. + + If the user is not a superuser, it will remove sensitive fields. + """ + fields = super().get_fields(request, obj) + if obj and not request.user.is_superuser: + if self.multitenant_parent: + obj = getattr(obj, self.multitenant_parent) + if getattr(obj, "organization_id", None) is None: + sensitive_fields = self.get_sensitive_fields(request, obj) + return [f for f in fields if f not in sensitive_fields] + return fields + + @property + def org_field(self): + if hasattr(self.model, "organization"): + return "organization" + if self.multitenant_parent: + return f"{self.multitenant_parent}__organization" + return None + def get_queryset(self, request): """ If current user is not superuser, show only the @@ -48,15 +74,62 @@ def get_queryset(self, request): return self.multitenant_behaviour_for_user_admin(request) if user.is_superuser: return qs - if hasattr(self.model, "organization"): - return qs.filter(organization__in=user.organizations_managed) if self.model.__name__ == "Organization": return qs.filter(pk__in=user.organizations_managed) - elif not self.multitenant_parent: + if not self.org_field: + # if there is no organization field, return the queryset as is return qs - else: - qsarg = "{0}__organization__in".format(self.multitenant_parent) - return qs.filter(**{qsarg: user.organizations_managed}) + return qs.filter( + Q(**{f"{self.org_field}__in": user.organizations_managed}) + | Q(**{self.org_field: None}) + ) + + def get_search_results(self, request, queryset, search_term): + """ + Override to ensure that the search results are filtered by the + organization of the current user. + """ + if ( + request.GET.get("field_name") + and not request.user.is_superuser + and not self.multitenant_shared_relations + and self.org_field + ): + queryset = queryset.filter( + **{f"{self.org_field}__in": request.user.organizations_managed} + ) + return super().get_search_results(request, queryset, search_term) + + def _has_org_permission(self, request, obj, perm_func): + """ + Helper method to check object-level permissions for users + associated with specific organizations. + """ + perm = perm_func(request, obj) + if obj and self.multitenant_parent: + # In case of a multitenant parent, we need to check if the + # user has permission on the parent object. + obj = getattr(obj, self.multitenant_parent) + if not request.user.is_superuser and obj and hasattr(obj, "organization_id"): + perm = perm and ( + obj.organization_id + and str(obj.organization_id) in request.user.organizations_managed + ) + return perm + + def has_change_permission(self, request, obj=None): + """ + Returns True if the user has permission to change the object. + Non-superusers cannot change shared objects. + """ + return self._has_org_permission(request, obj, super().has_change_permission) + + def has_delete_permission(self, request, obj=None): + """ + Returns True if the user has permission to delete the object. + Non-superusers cannot change shared objects. + """ + return self._has_org_permission(request, obj, super().has_delete_permission) def _edit_form(self, request, form): """ @@ -149,7 +222,9 @@ class MultitenantOrgFilter(AutocompleteFilter): org_lookup = "id__in" title = _("organization") widget_attrs = AutocompleteFilter.widget_attrs.copy() - widget_attrs.update({"data-empty-label": SHARED_SYSTEMWIDE_LABEL}) + widget_attrs.update( + {"data-empty-label": SHARED_SYSTEMWIDE_LABEL, "data-is-filter": "true"} + ) class MultitenantRelatedOrgFilter(MultitenantOrgFilter): diff --git a/openwisp_users/static/admin/js/autocomplete.js b/openwisp_users/static/admin/js/autocomplete.js new file mode 100644 index 00000000..23ce9c23 --- /dev/null +++ b/openwisp_users/static/admin/js/autocomplete.js @@ -0,0 +1,39 @@ +// Custom override of Django's autocomplete.js to add the "is_filter" parameter +// to AJAX requests. This parameter enables the backend autocomplete view to +// determine if the request is for filtering, allowing it to include the shared +// option when appropriate (e.g., for filtering shared objects in the admin). + +"use strict"; +{ + const $ = django.jQuery; + + $.fn.djangoAdminSelect2 = function () { + $.each(this, function (i, element) { + $(element).select2({ + ajax: { + data: (params) => { + return { + term: params.term, + page: params.page, + app_label: element.dataset.appLabel, + model_name: element.dataset.modelName, + field_name: element.dataset.fieldName, + is_filter: element.dataset.isFilter, + }; + }, + }, + }); + }); + return this; + }; + + $(function () { + // Initialize all autocomplete widgets except the one in the template + // form used when a new formset is added. + $(".admin-autocomplete").not("[name*=__prefix__]").djangoAdminSelect2(); + }); + + document.addEventListener("formset:added", (event) => { + $(event.target).find(".admin-autocomplete").djangoAdminSelect2(); + }); +} diff --git a/openwisp_users/tests/test_api/__init__.py b/openwisp_users/tests/test_api/__init__.py index 2efa2ba5..54f5711a 100644 --- a/openwisp_users/tests/test_api/__init__.py +++ b/openwisp_users/tests/test_api/__init__.py @@ -12,5 +12,223 @@ def _obtain_auth_token(self, username="operator", password="tester"): return response.data["token"] -class APITestCase(TestMultitenantAdminMixin, AuthenticationMixin, TestCase): +class TestMultitenantApiMixin(TestMultitenantAdminMixin): + def _test_access_shared_object( + self, + token, + listview_name=None, + listview_path=None, + detailview_name=None, + detailview_path=None, + create_payload=None, + update_payload=None, + expected_count=0, + expected_status_codes=None, + ): + auth = dict(HTTP_AUTHORIZATION=f"Bearer {token}") + create_payload = create_payload or {} + update_payload = update_payload or {} + expected_status_codes = expected_status_codes or {} + + if listview_name or listview_path: + listview_path = listview_path or reverse(listview_name) + with self.subTest("HEAD and OPTION methods"): + response = self.client.head(listview_path, **auth) + self.assertEqual(response.status_code, expected_status_codes["head"]) + + response = self.client.options(listview_path, **auth) + self.assertEqual(response.status_code, expected_status_codes["option"]) + + with self.subTest("Create shared object"): + response = self.client.post( + listview_path, + data=create_payload, + content_type="application/json", + **auth, + ) + self.assertEqual(response.status_code, expected_status_codes["create"]) + if ( + expected_status_codes["create"] == 400 + and "organization" in create_payload + ): + self.assertEqual( + str(response.data["organization"][0]), + "This field may not be null.", + ) + + with self.subTest("List all shared objects"): + response = self.client.get(listview_path, **auth) + self.assertEqual(response.status_code, expected_status_codes["list"]) + data = response.data + if not isinstance(response.data, list): + data = data.get("results", data) + self.assertEqual(len(data), expected_count) + + if detailview_name or detailview_path: + if not detailview_path and listview_path and expected_count > 0: + detailview_path = reverse(detailview_name, args=[data[0]["id"]]) + + with self.subTest("Retrieve shared object"): + response = self.client.get(detailview_path, **auth) + self.assertEqual( + response.status_code, expected_status_codes["retrieve"] + ) + + with self.subTest("Update shared object"): + response = self.client.put( + detailview_path, + data=update_payload, + content_type="application/json", + **auth, + ) + self.assertEqual(response.status_code, expected_status_codes["update"]) + + with self.subTest("Delete shared object"): + response = self.client.delete(detailview_path, **auth) + self.assertEqual(response.status_code, expected_status_codes["delete"]) + + def _test_org_user_access_shared_object( + self, + listview_name=None, + listview_path=None, + detailview_name=None, + detailview_path=None, + create_payload=None, + update_payload=None, + expected_count=0, + expected_status_codes=None, + token=None, + ): + """ + Non-superusers can only view shared objects. + They cannot create, update, or delete them. + """ + if not token: + user = self._create_administrator(organizations=[self._get_org()]) + token = self._obtain_auth_token(user.username, "tester") + if not expected_status_codes: + expected_status_codes = { + "create": 400, + "list": 200, + "retrieve": 200, + "update": 403, + "delete": 403, + "head": 200, + "option": 200, + } + self._test_access_shared_object( + token=token, + listview_name=listview_name, + listview_path=listview_path, + detailview_name=detailview_name, + detailview_path=detailview_path, + create_payload=create_payload, + update_payload=update_payload, + expected_count=expected_count, + expected_status_codes=expected_status_codes, + ) + + def _test_superuser_access_shared_object( + self, + token, + listview_path=None, + listview_name=None, + detailview_path=None, + detailview_name=None, + create_payload=None, + update_payload=None, + expected_count=1, + expected_status_codes=None, + ): + """ + Superusers can perform all operations on shared objects. + """ + if not token: + user = self._get_admin() + token = self._obtain_auth_token(user.username, "tester") + if not expected_status_codes: + expected_status_codes = { + "create": 201, + "list": 200, + "retrieve": 200, + "update": 200, + "delete": 204, + "head": 200, + "option": 200, + } + self._test_access_shared_object( + token=token, + listview_name=listview_name, + listview_path=listview_path, + detailview_name=detailview_name, + detailview_path=detailview_path, + create_payload=create_payload, + update_payload=update_payload, + expected_count=expected_count, + expected_status_codes=expected_status_codes, + ) + + def _test_sensitive_fields_visibility_on_shared_and_org_objects( + self, + sensitive_fields, + shared_obj, + org_obj, + detailview_name, + listview_name, + organization, + org_admin=None, + super_user=None, + ): + def assert_sensitive_fields_visibility(obj, user, should_be_visible=False): + token = self._obtain_auth_token(user.username, "tester") + auth = {"HTTP_AUTHORIZATION": f"Bearer {token}"} + # List view + listview_path = reverse(listview_name) + response = self.client.get(listview_path, **auth) + self.assertEqual(response.status_code, 200) + results = ( + response.data + if "results" not in response.data + else response.data["results"] + ) + for item in results: + if str(item["id"]) == str(obj.pk): + break + for field in sensitive_fields: + self.assertEqual( + field in item, + should_be_visible, + ) + # Detail view + detailview_path = reverse(detailview_name, args=[obj.pk]) + response = self.client.get(detailview_path, **auth) + self.assertEqual(response.status_code, 200) + for field in sensitive_fields: + if should_be_visible: + self.assertIn(field, response.data) + else: + self.assertNotIn(field, response.data) + + org_admin = org_admin or self._create_administrator( + organizations=[organization] + ) + super_user = super_user or self._get_admin() + + with self.subTest("Org admin should not see sensitive fields in shared object"): + assert_sensitive_fields_visibility( + shared_obj, org_admin, should_be_visible=False + ) + + with self.subTest("Org admin should see sensitive fields in org object"): + assert_sensitive_fields_visibility( + org_obj, org_admin, should_be_visible=True + ) + + with self.subTest("Superuser should see sensitive fields in shared object"): + assert_sensitive_fields_visibility( + shared_obj, super_user, should_be_visible=True + ) + + +class APITestCase(TestMultitenantApiMixin, AuthenticationMixin, TestCase): pass diff --git a/openwisp_users/tests/utils.py b/openwisp_users/tests/utils.py index 6c8bbe27..1168794b 100644 --- a/openwisp_users/tests/utils.py +++ b/openwisp_users/tests/utils.py @@ -1,10 +1,13 @@ from datetime import date +import django from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission from django.urls import reverse from swapper import load_model +from openwisp_users.multitenancy import SHARED_SYSTEMWIDE_LABEL + Organization = load_model("openwisp_users", "Organization") OrganizationOwner = load_model("openwisp_users", "OrganizationOwner") OrganizationUser = load_model("openwisp_users", "OrganizationUser") @@ -236,9 +239,124 @@ def _test_recoverlist_operator_403(self, app_label, model_label): ) self.assertEqual(response.status_code, 403) - def _get_autocomplete_view_path(self, app_label, model_name, field_name): + def _test_org_admin_create_shareable_object( + self, + path, + payload, + model, + expected_count=0, + user=None, + error_message=None, + raises_error=True, + ): + """ + Verifies a non-superuser cannot create a shareable object + """ + if not user: + user = self._create_administrator(organizations=[self._get_org()]) + self.client.force_login(user) + response = self.client.post( + path, + data=payload, + follow=True, + ) + if raises_error: + error_message = error_message or ( + '
\n' + ' " + ).format(' id="id_organization_error"' if django.VERSION >= (5, 2) else "") + self.assertContains(response, error_message) + self.assertEqual(model.objects.count(), expected_count) + + def _test_org_admin_view_shareable_object( + self, path, user=None, expected_element=None + ): + """ + Verifies a non-superuser can view a shareable object + """ + if not user: + user = self._create_administrator(organizations=[self._get_org()]) + self.client.force_login(user) + response = self.client.get(path, follow=True) + self.assertEqual(response.status_code, 200) + if not expected_element: + expected_element = ( + '
\n\n\n
\n\n' + '
\n\n' + "\n\n" + '
-
\n\n\n' + "
\n\n
\n\n\n
" + ) + self.assertContains(response, expected_element, html=True) + + def _test_sensitive_fields_visibility_on_shared_and_org_objects( + self, + sensitive_fields, + shared_obj_path, + org_obj_path, + organization, + org_admin=None, + super_user=None, + ): + org_admin = org_admin or self._create_administrator( + organizations=[organization] + ) + super_user = super_user or self._get_admin() + + self.client.force_login(org_admin) + with self.subTest("Org admin should not see sensitive fields in shared object"): + response = self.client.get(shared_obj_path) + self.assertEqual(response.status_code, 200) + for field in sensitive_fields: + self.assertNotContains(response, field) + + with self.subTest("Org admin should see sensitive fields in org object"): + response = self.client.get(org_obj_path) + self.assertEqual(response.status_code, 200) + for field in sensitive_fields: + self.assertContains(response, field) + + self.client.force_login(super_user) + with self.subTest("Superuser should see sensitive fields in shared object"): + response = self.client.get(shared_obj_path) + self.assertEqual(response.status_code, 200) + for field in sensitive_fields: + self.assertContains(response, field) + + def _test_object_organization_fk_autocomplete_view( + self, + model, + ): + app_label = model._meta.app_label + model_name = model._meta.model_name + path = self._get_autocomplete_view_path(app_label, model_name, "organization") + org = self._get_org() + admin = User.objects.filter(is_superuser=True).first() + if not admin: + admin = self._create_admin() + org_admin = self._create_administrator(organizations=[org]) + + with self.subTest("Org admin should only see their own org"): + self.client.force_login(org_admin) + response = self.client.get(path) + self.assertEqual(response.status_code, 200) + self.assertContains(response, org.name) + self.assertNotContains(response, SHARED_SYSTEMWIDE_LABEL) + + with self.subTest("Superuser should see all orgs and shared label"): + self.client.force_login(admin) + response = self.client.get(path) + self.assertEqual(response.status_code, 200) + self.assertContains(response, org.name) + self.assertContains(response, SHARED_SYSTEMWIDE_LABEL) + + def _get_autocomplete_view_path( + self, app_label, model_name, field_name, is_filter=False + ): path = reverse("admin:ow-auto-filter") return ( f"{path}?app_label={app_label}" f"&model_name={model_name}&field_name={field_name}" + "{}".format("&is_filter=true" if is_filter else "") ) diff --git a/openwisp_users/views.py b/openwisp_users/views.py index 5fbb9c75..e030baae 100644 --- a/openwisp_users/views.py +++ b/openwisp_users/views.py @@ -43,5 +43,5 @@ def get_empty_label(self): def get_allow_null(self): if self.object_list.model == Organization: - return self.request.user.is_superuser + return self.request.user.is_superuser or self.request.GET.get("is_filter") return super().get_allow_null() diff --git a/requirements.txt b/requirements.txt index de7a7b3c..01eb2cae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,6 @@ django-extensions>=3.2,<4.2 django-allauth[socialaccount]~=65.8.1 django-phonenumber-field~=8.1.0 phonenumbers~=9.0.8 -openwisp-utils[rest,celery] @ https://github.com/openwisp/openwisp-utils/tarball/1.2 +openwisp-utils[rest,celery] @ https://github.com/openwisp/openwisp-utils/tarball/bump-drf packaging django-sesame~=3.2.3 diff --git a/tests/testapp/migrations/0007_template_secrets.py b/tests/testapp/migrations/0007_template_secrets.py new file mode 100644 index 00000000..37479182 --- /dev/null +++ b/tests/testapp/migrations/0007_template_secrets.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.1 on 2025-07-04 13:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("testapp", "0006_alter_book_shelf"), + ] + + operations = [ + migrations.AddField( + model_name="template", + name="secrets", + field=models.TextField(blank=True, null=True), + ), + ] diff --git a/tests/testapp/models.py b/tests/testapp/models.py index b43c834f..b1abff38 100644 --- a/tests/testapp/models.py +++ b/tests/testapp/models.py @@ -7,7 +7,12 @@ class Template(ShareableOrgMixin): + sensitive_fields = ["secrets"] name = models.CharField(max_length=16) + secrets = models.TextField( + blank=True, + null=True, + ) def __str__(self): return self.name diff --git a/tests/testapp/tests/mixins.py b/tests/testapp/tests/mixins.py index 4f323cd4..e55a4863 100644 --- a/tests/testapp/tests/mixins.py +++ b/tests/testapp/tests/mixins.py @@ -1,10 +1,7 @@ -from openwisp_users.tests.test_api import AuthenticationMixin -from openwisp_users.tests.utils import TestMultitenantAdminMixin +from openwisp_users.tests.test_api import AuthenticationMixin, TestMultitenantApiMixin from .. import CreateMixin -class TestMultitenancyMixin( - CreateMixin, TestMultitenantAdminMixin, AuthenticationMixin -): +class TestMultitenancyMixin(CreateMixin, TestMultitenantApiMixin, AuthenticationMixin): pass diff --git a/tests/testapp/tests/test_admin.py b/tests/testapp/tests/test_admin.py index b27b3a33..b32bf3d5 100644 --- a/tests/testapp/tests/test_admin.py +++ b/tests/testapp/tests/test_admin.py @@ -1,12 +1,11 @@ import os -import django from django.contrib.auth import get_user_model from django.test import TestCase from django.urls import reverse from swapper import load_model -from openwisp_users.tests.utils import TestOrganizationMixin +from openwisp_users.tests.utils import TestMultitenantAdminMixin, TestOrganizationMixin from ..models import Template @@ -45,30 +44,42 @@ def test_accounts_login(self): ) -class TestTemplateAdmin(TestOrganizationMixin, TestCase): +class TestTemplateAdmin(TestMultitenantAdminMixin, TestCase): + def _create_template(self, **kwargs): + if "organization" not in kwargs: + kwargs["organization"] = self._get_org() + options = { + "name": "test-template", + } + options.update(kwargs) + template = Template(**options) + template.full_clean() + template.save() + return template + def test_org_admin_create_shareable_template(self): - administrator = self._create_administrator() - self.client.force_login(administrator) - response = self.client.post( + self._test_org_admin_create_shareable_object( reverse("admin:testapp_template_add"), - data={ + payload={ "name": "test-template", "organization": "", }, - follow=True, + model=Template, ) - self.assertContains( - response, - ( - '
\n' - ' " - ).format(' id="id_organization_error"' if django.VERSION >= (5, 2) else ""), + + def test_template_organization_autocomplete_view(self): + self._test_object_organization_fk_autocomplete_view( + Template, + ) + + def test_org_admin_view_shared_template(self): + template = self._create_template(organization=None) + self._test_org_admin_view_shareable_object( + reverse("admin:testapp_template_change", args=(template.id,)), ) - self.assertEqual(Template.objects.count(), 0) def test_superuser_create_shareable_template(self): - admin = self._create_admin() + admin = self._get_admin() self.client.force_login(admin) response = self.client.post( reverse("admin:testapp_template_add"), @@ -80,3 +91,23 @@ def test_superuser_create_shareable_template(self): ) self.assertEqual(response.status_code, 200) self.assertEqual(Template.objects.count(), 1) + + def test_hide_sensitive_fields_for_shared_object(self): + """ + Test that sensitive fields are hidden for shared objects for non-superusers. + """ + org = self._get_org() + shared_template = self._create_template( + organization=None, secrets="shared-secret" + ) + org_template = self._create_template(organization=org, secrets="org-secret") + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["secrets"], + shared_obj_path=reverse( + "admin:testapp_template_change", args=(shared_template.id,) + ), + org_obj_path=reverse( + "admin:testapp_template_change", args=(org_template.id,) + ), + organization=org, + ) diff --git a/tests/testapp/tests/test_filter_classes.py b/tests/testapp/tests/test_filter_classes.py index 71d1dcb0..53be3861 100644 --- a/tests/testapp/tests/test_filter_classes.py +++ b/tests/testapp/tests/test_filter_classes.py @@ -1,22 +1,22 @@ from django.contrib.auth import get_user_model -from django.test import TestCase from django.urls import reverse from packaging.version import parse as version_parse from rest_framework import VERSION as REST_FRAMEWORK_VERSION from swapper import load_model from openwisp_users.api.throttling import AuthRateThrottle +from openwisp_users.tests.test_api import APITestCase from openwisp_utils.tests import AssertNumQueriesSubTestMixin +from .. import CreateMixin from ..models import Book, Library, Shelf, Tag -from .mixins import TestMultitenancyMixin OrganizationUser = load_model("openwisp_users", "OrganizationUser") OrganizationOwner = load_model("openwisp_users", "OrganizationOwner") User = get_user_model() -class TestFilterClasses(AssertNumQueriesSubTestMixin, TestMultitenancyMixin, TestCase): +class TestFilterClasses(AssertNumQueriesSubTestMixin, CreateMixin, APITestCase): def setUp(self): AuthRateThrottle.rate = 0 self.shelf_model = Shelf diff --git a/tests/testapp/tests/test_permission_classes.py b/tests/testapp/tests/test_permission_classes.py index 11746c4e..6a1fcd6f 100644 --- a/tests/testapp/tests/test_permission_classes.py +++ b/tests/testapp/tests/test_permission_classes.py @@ -1,20 +1,20 @@ from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission -from django.test import TestCase from django.urls import reverse from swapper import load_model from openwisp_users.api.throttling import AuthRateThrottle +from openwisp_users.tests.test_api import APITestCase +from .. import CreateMixin from ..models import Template -from .mixins import TestMultitenancyMixin User = get_user_model() Group = load_model("openwisp_users", "Group") OrganizationUser = load_model("openwisp_users", "OrganizationUser") -class TestPermissionClasses(TestMultitenancyMixin, TestCase): +class TestPermissionClasses(CreateMixin, APITestCase): def setUp(self): AuthRateThrottle.rate = 0 self.template_model = Template @@ -231,96 +231,24 @@ def test_view_django_model_permission_with_change_perm(self): ) self.assertEqual(response.status_code, 200) - def _test_access_shared_object( - self, token, expected_templates_count=1, expected_status_codes={} - ): - auth = dict(HTTP_AUTHORIZATION=f"Bearer {token}") - template = self._create_template(organization=None) - - with self.subTest("Test listing templates"): - response = self.client.get(reverse("test_template_list"), **auth) - data = response.data.copy() - # Only check "templates" in response. - if isinstance(data, dict): - data.pop("detail", None) - self.assertEqual(response.status_code, expected_status_codes["list"]) - self.assertEqual(len(data), expected_templates_count) - - with self.subTest("Test creating template"): - response = self.client.post( - reverse("test_template_list"), - data={"name": "Test Template", "organization": None}, - content_type="application/json", - **auth, - ) - self.assertEqual(response.status_code, expected_status_codes["create"]) - if expected_status_codes["create"] == 400: - self.assertEqual( - str(response.data["organization"][0]), "This field may not be null." - ) - - with self.subTest("Test retreiving template"): - response = self.client.get( - reverse("test_template_detail", args=[template.id]), **auth - ) - self.assertEqual(response.status_code, expected_status_codes["retrieve"]) - - with self.subTest("Test updating template"): - response = self.client.put( - reverse("test_template_detail", args=[template.id]), - data={"name": "Name changed"}, - content_type="application/json", - **auth, - ) - self.assertEqual(response.status_code, expected_status_codes["update"]) - - with self.subTest("Test deleting template"): - response = self.client.delete( - reverse("test_template_detail", args=[template.id]), **auth - ) - self.assertEqual(response.status_code, expected_status_codes["delete"]) - - with self.subTest("Test HEAD and OPTION methods"): - response = self.client.head(reverse("test_template_list"), **auth) - self.assertEqual(response.status_code, expected_status_codes["head"]) - - response = self.client.options(reverse("test_template_list"), **auth) - self.assertEqual(response.status_code, expected_status_codes["option"]) - def test_superuser_access_shared_object(self): - superuser = self._get_admin() - token = self._obtain_auth_token(username=superuser) - self._test_access_shared_object( - token, - expected_status_codes={ - "create": 201, - "list": 200, - "retrieve": 200, - "update": 200, - "delete": 204, - "head": 200, - "option": 200, - }, + self._test_superuser_access_shared_object( + token=None, + listview_name="test_template_list", + detailview_name="test_template_detail", + create_payload={"name": "test", "organization": ""}, + update_payload={"name": "updated-test"}, + expected_count=1, ) def test_org_manager_access_shared_object(self): - org_manager = self._create_administrator() - token = self._obtain_auth_token(username=org_manager) - # First user is automatically owner, so created dummy - # user to keep operator as manager only. - self._create_org_user(user=self._get_user(), is_admin=True) - self._create_org_user(user=org_manager, is_admin=True) - self._test_access_shared_object( - token, - expected_status_codes={ - "create": 400, - "list": 200, - "retrieve": 200, - "update": 403, - "delete": 403, - "head": 200, - "option": 200, - }, + template = self._create_template(organization=None) + self._test_org_user_access_shared_object( + listview_path=reverse("test_template_list"), + detailview_path=reverse("test_template_detail", args=[template.pk]), + create_payload={"name": "test", "organization": ""}, + update_payload={"name": "updated-test"}, + expected_count=1, ) def test_org_owner_access_shared_object(self): @@ -328,17 +256,14 @@ def test_org_owner_access_shared_object(self): # becomes organization owner. org_owner = self._create_administrator(organizations=[self._get_org()]) token = self._obtain_auth_token(username=org_owner) - self._test_access_shared_object( - token, - expected_status_codes={ - "create": 400, - "list": 200, - "retrieve": 200, - "update": 403, - "delete": 403, - "head": 200, - "option": 200, - }, + template = self._create_template(organization=None) + self._test_org_user_access_shared_object( + listview_path=reverse("test_template_list"), + detailview_path=reverse("test_template_detail", args=[template.pk]), + create_payload={"name": "test", "organization": ""}, + update_payload={"name": "updated-test"}, + expected_count=1, + token=token, ) def test_org_user_access_shared_object(self): @@ -348,9 +273,14 @@ def test_org_user_access_shared_object(self): user = self._create_administrator() token = self._obtain_auth_token(username=user) self._create_org_user(user=user, is_admin=False) - self._test_access_shared_object( - token, - expected_templates_count=0, + template = self._create_template(organization=None) + self._test_org_user_access_shared_object( + listview_path=reverse("test_template_list"), + detailview_path=reverse("test_template_detail", args=[template.pk]), + create_payload={"name": "test", "organization": ""}, + update_payload={"name": "updated-test"}, + expected_count=0, + token=token, expected_status_codes={ "create": 400, "list": 200, @@ -361,3 +291,21 @@ def test_org_user_access_shared_object(self): "option": 200, }, ) + + def test_template_sensitive_fields_visibility(self): + """ + Test that sensitive fields are hidden for shared objects for non-superusers. + """ + org = self._get_org() + shared_template = self._create_template( + organization=None, secrets="shared-secret" + ) + org_template = self._create_template(organization=org, secrets="org-secret") + self._test_sensitive_fields_visibility_on_shared_and_org_objects( + sensitive_fields=["secrets"], + shared_obj=shared_template, + org_obj=org_template, + detailview_name="test_template_detail", + listview_name="test_template_list", + organization=org, + ) diff --git a/tests/testapp/tests/test_selenium.py b/tests/testapp/tests/test_selenium.py index 4d99b71c..4ecb57b1 100644 --- a/tests/testapp/tests/test_selenium.py +++ b/tests/testapp/tests/test_selenium.py @@ -24,13 +24,17 @@ def setUp(self): ) def _test_multitenant_autocomplete_org_field( - self, username, password, path, visible, hidden + self, + username, + password, + path, + visible, + hidden, + select2_selector="#select2-id_organization-container", ): self.login(username=username, password=password) self.open(path) - self.web_driver.find_element( - By.CSS_SELECTOR, "#select2-id_organization-container" - ).click() + self.web_driver.find_element(By.CSS_SELECTOR, select2_selector).click() WebDriverWait(self.web_driver, 2).until( EC.invisibility_of_element_located( (By.CSS_SELECTOR, ".select2-results__option.loading-results") @@ -143,3 +147,30 @@ def test_shelf_add_form_organization_field(self): self.assertEqual(len(org_select.all_selected_options), 1) self.assertEqual(org_select.first_selected_option.text, org1.name) self.logout() + + def test_organization_autocomplete_filter(self): + """ + The autocomplete_filter should show the "Shared systemwide (no organization)" + option to non-superuser users. + """ + path = reverse("admin:testapp_shelf_changelist") + org1 = self._create_org(name="org1") + administrator = self._create_administrator( + organizations=[org1], username="tester", password="tester" + ) + administrator.user_permissions.add( + *Permission.objects.filter( + Q(codename__contains="shelf") | Q(codename="view_organization") + ).values_list("id", flat=True), + ) + self._test_multitenant_autocomplete_org_field( + path=path, + username="tester", + password="tester", + visible=[org1.name, "Shared systemwide (no organization)"], + hidden=list( + Organization.objects.exclude(id=org1.id).values_list("name", flat=True) + ), + select2_selector="#select2-id-organization-dal-filter-container", + ) + self.logout()