Skip to content

Commit 65c6b17

Browse files
committed
[feature] Allowed ready only access of shared objects to non-superusers #238
Closes #238
1 parent 1cafaf9 commit 65c6b17

File tree

12 files changed

+437
-140
lines changed

12 files changed

+437
-140
lines changed

docs/user/basic-concepts.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ different roles in each.
157157

158158
Here's a summary of the default organization roles.
159159

160+
.. _users_organization_manager:
161+
160162
Organization Manager
161163
~~~~~~~~~~~~~~~~~~~~
162164

@@ -241,6 +243,18 @@ objects are defined and managed by super administrators and can include
241243
configurations, policies, or other data that need to be consistent across
242244
all organizations.
243245

246+
Shared objects can only be created by superusers. Non-superusers (e.g.
247+
:ref:`users_organization_manager`) have view-only access to shared
248+
objects, both through the admin interface and the REST API. However, they
249+
can use these shared objects when creating related organization-specific
250+
resources. For example, an organization manager can use a shared VPN
251+
server to create a configuration template for their organization.
252+
253+
In some cases, non-superusers may be restricted from viewing sensitive
254+
details of shared objects—particularly if such information could allow
255+
them to gain unauthorized access to infrastructure or data used by other
256+
organizations.
257+
244258
By sharing common resources, global uniformity and consistency can be
245259
enforced across the entire system.
246260

openwisp_users/multitenancy.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,49 @@ def get_queryset(self, request):
4949
if user.is_superuser:
5050
return qs
5151
if hasattr(self.model, "organization"):
52-
return qs.filter(organization__in=user.organizations_managed)
52+
return qs.filter(
53+
Q(organization__in=user.organizations_managed) | Q(organization=None)
54+
)
5355
if self.model.__name__ == "Organization":
5456
return qs.filter(pk__in=user.organizations_managed)
5557
elif not self.multitenant_parent:
5658
return qs
5759
else:
58-
qsarg = "{0}__organization__in".format(self.multitenant_parent)
59-
return qs.filter(**{qsarg: user.organizations_managed})
60+
qsarg = f"{self.multitenant_parent}__organization"
61+
return qs.filter(
62+
Q(**{f"{qsarg}__in": user.organizations_managed}) | Q(**{qsarg: None})
63+
)
64+
65+
def _has_org_permission(self, request, obj, perm_func):
66+
"""
67+
Helper method to check object-level permissions for users
68+
associated with specific organizations.
69+
"""
70+
perm = perm_func(request, obj)
71+
if obj and self.multitenant_parent:
72+
# In case of a multitenant parent, we need to check if the
73+
# user has permission on the parent object.
74+
obj = getattr(obj, self.multitenant_parent)
75+
if not request.user.is_superuser and obj and hasattr(obj, "organization_id"):
76+
perm = perm and (
77+
obj.organization_id
78+
and str(obj.organization_id) in request.user.organizations_managed
79+
)
80+
return perm
81+
82+
def has_change_permission(self, request, obj=None):
83+
"""
84+
Returns True if the user has permission to change the object.
85+
Non-superusers cannot change shared objects.
86+
"""
87+
return self._has_org_permission(request, obj, super().has_change_permission)
88+
89+
def has_delete_permission(self, request, obj=None):
90+
"""
91+
Returns True if the user has permission to delete the object.
92+
Non-superusers cannot change shared objects.
93+
"""
94+
return self._has_org_permission(request, obj, super().has_delete_permission)
6095

6196
def _edit_form(self, request, form):
6297
"""
@@ -149,7 +184,9 @@ class MultitenantOrgFilter(AutocompleteFilter):
149184
org_lookup = "id__in"
150185
title = _("organization")
151186
widget_attrs = AutocompleteFilter.widget_attrs.copy()
152-
widget_attrs.update({"data-empty-label": SHARED_SYSTEMWIDE_LABEL})
187+
widget_attrs.update(
188+
{"data-empty-label": SHARED_SYSTEMWIDE_LABEL, "data-is-filter": "true"}
189+
)
153190

154191

155192
class MultitenantRelatedOrgFilter(MultitenantOrgFilter):
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Custom override of Django's autocomplete.js to add the "is_filter" parameter
2+
// to AJAX requests. This parameter enables the backend autocomplete view to
3+
// determine if the request is for filtering, allowing it to include the shared
4+
// option when appropriate (e.g., for filtering shared objects in the admin).
5+
6+
"use strict";
7+
{
8+
const $ = django.jQuery;
9+
10+
$.fn.djangoAdminSelect2 = function () {
11+
$.each(this, function (i, element) {
12+
$(element).select2({
13+
ajax: {
14+
data: (params) => {
15+
return {
16+
term: params.term,
17+
page: params.page,
18+
app_label: element.dataset.appLabel,
19+
model_name: element.dataset.modelName,
20+
field_name: element.dataset.fieldName,
21+
is_filter: element.dataset.isFilter,
22+
};
23+
},
24+
},
25+
});
26+
});
27+
return this;
28+
};
29+
30+
$(function () {
31+
// Initialize all autocomplete widgets except the one in the template
32+
// form used when a new formset is added.
33+
$(".admin-autocomplete").not("[name*=__prefix__]").djangoAdminSelect2();
34+
});
35+
36+
document.addEventListener("formset:added", (event) => {
37+
$(event.target).find(".admin-autocomplete").djangoAdminSelect2();
38+
});
39+
}

openwisp_users/tests/test_api/__init__.py

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,159 @@ def _obtain_auth_token(self, username="operator", password="tester"):
1212
return response.data["token"]
1313

1414

15-
class APITestCase(TestMultitenantAdminMixin, AuthenticationMixin, TestCase):
15+
class TestMultitenantApiMixin(TestMultitenantAdminMixin):
16+
def _test_access_shared_object(
17+
self,
18+
token,
19+
listview_name=None,
20+
listview_path=None,
21+
detailview_name=None,
22+
detailview_path=None,
23+
create_payload=None,
24+
update_payload=None,
25+
expected_count=0,
26+
expected_status_codes=None,
27+
):
28+
auth = dict(HTTP_AUTHORIZATION=f"Bearer {token}")
29+
create_payload = create_payload or {}
30+
update_payload = update_payload or {}
31+
expected_status_codes = expected_status_codes or {}
32+
33+
if listview_name or listview_path:
34+
listview_path = listview_path or reverse(listview_name)
35+
with self.subTest("HEAD and OPTION methods"):
36+
response = self.client.head(listview_path, **auth)
37+
self.assertEqual(response.status_code, expected_status_codes["head"])
38+
39+
response = self.client.options(listview_path, **auth)
40+
self.assertEqual(response.status_code, expected_status_codes["option"])
41+
42+
with self.subTest("Create shared object"):
43+
response = self.client.post(
44+
listview_path,
45+
data=create_payload,
46+
content_type="application/json",
47+
**auth,
48+
)
49+
self.assertEqual(response.status_code, expected_status_codes["create"])
50+
if expected_status_codes["create"] == 400:
51+
self.assertEqual(
52+
str(response.data["organization"][0]),
53+
"This field may not be null.",
54+
)
55+
56+
with self.subTest("List all shared objects"):
57+
response = self.client.get(listview_path, **auth)
58+
self.assertEqual(response.status_code, expected_status_codes["list"])
59+
data = response.data
60+
if not isinstance(response.data, list):
61+
data = data.get("results", data)
62+
self.assertEqual(len(data), expected_count)
63+
64+
if detailview_name or detailview_path:
65+
if not detailview_path and listview_name and expected_count > 0:
66+
detailview_path = reverse(detailview_name, args=[data[0]["id"]])
67+
68+
with self.subTest("Retrieve shared object"):
69+
response = self.client.get(detailview_path, **auth)
70+
self.assertEqual(
71+
response.status_code, expected_status_codes["retrieve"]
72+
)
73+
74+
with self.subTest("Update shared object"):
75+
response = self.client.put(
76+
detailview_path,
77+
data=update_payload,
78+
content_type="application/json",
79+
**auth,
80+
)
81+
self.assertEqual(response.status_code, expected_status_codes["update"])
82+
83+
with self.subTest("Delete shared object"):
84+
response = self.client.delete(detailview_path, **auth)
85+
self.assertEqual(response.status_code, expected_status_codes["delete"])
86+
87+
def _test_org_user_access_shared_object(
88+
self,
89+
listview_name=None,
90+
listview_path=None,
91+
detailview_name=None,
92+
detailview_path=None,
93+
create_payload=None,
94+
update_payload=None,
95+
expected_count=0,
96+
expected_status_codes=None,
97+
token=None,
98+
):
99+
"""
100+
Non-superusers can only view shared objects.
101+
They cannot create, update, or delete them.
102+
"""
103+
if not token:
104+
user = self._create_administrator(organizations=[self._get_org()])
105+
token = self._obtain_auth_token(user.username, "tester")
106+
if not expected_status_codes:
107+
expected_status_codes = {
108+
"create": 400,
109+
"list": 200,
110+
"retrieve": 200,
111+
"update": 403,
112+
"delete": 403,
113+
"head": 200,
114+
"option": 200,
115+
}
116+
self._test_access_shared_object(
117+
token=token,
118+
listview_name=listview_name,
119+
listview_path=listview_path,
120+
detailview_name=detailview_name,
121+
detailview_path=detailview_path,
122+
create_payload=create_payload,
123+
update_payload=update_payload,
124+
expected_count=expected_count,
125+
expected_status_codes=expected_status_codes,
126+
)
127+
128+
def _test_superuser_access_shared_object(
129+
self,
130+
token,
131+
listview_path=None,
132+
listview_name=None,
133+
detailview_path=None,
134+
detailview_name=None,
135+
create_payload=None,
136+
update_payload=None,
137+
expected_count=1,
138+
expected_status_codes=None,
139+
):
140+
"""
141+
Superusers can perform all operations on shared objects.
142+
"""
143+
if not token:
144+
user = self._create_admin()
145+
token = self._obtain_auth_token(user.username, "tester")
146+
if not expected_status_codes:
147+
expected_status_codes = {
148+
"create": 201,
149+
"list": 200,
150+
"retrieve": 200,
151+
"update": 200,
152+
"delete": 204,
153+
"head": 200,
154+
"option": 200,
155+
}
156+
self._test_access_shared_object(
157+
token=token,
158+
listview_name=listview_name,
159+
listview_path=listview_path,
160+
detailview_name=detailview_name,
161+
detailview_path=detailview_path,
162+
create_payload=create_payload,
163+
update_payload=update_payload,
164+
expected_count=expected_count,
165+
expected_status_codes=expected_status_codes,
166+
)
167+
168+
169+
class APITestCase(TestMultitenantApiMixin, AuthenticationMixin, TestCase):
16170
pass

openwisp_users/tests/utils.py

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from datetime import date
22

3+
import django
34
from django.contrib.auth import get_user_model
45
from django.contrib.auth.models import Permission
56
from django.urls import reverse
67
from swapper import load_model
78

9+
from openwisp_users.multitenancy import SHARED_SYSTEMWIDE_LABEL
10+
811
Organization = load_model("openwisp_users", "Organization")
912
OrganizationOwner = load_model("openwisp_users", "OrganizationOwner")
1013
OrganizationUser = load_model("openwisp_users", "OrganizationUser")
@@ -236,9 +239,90 @@ def _test_recoverlist_operator_403(self, app_label, model_label):
236239
)
237240
self.assertEqual(response.status_code, 403)
238241

239-
def _get_autocomplete_view_path(self, app_label, model_name, field_name):
242+
def _test_org_admin_create_shareable_object(
243+
self,
244+
path,
245+
payload,
246+
model,
247+
expected_count=0,
248+
user=None,
249+
error_message=None,
250+
raises_error=True,
251+
):
252+
"""
253+
Verifies a non-superuser cannot create a shareable object
254+
"""
255+
if not user:
256+
user = self._create_administrator(organizations=[self._get_org()])
257+
self.client.force_login(user)
258+
response = self.client.post(
259+
path,
260+
data=payload,
261+
follow=True,
262+
)
263+
if raises_error:
264+
error_message = error_message or (
265+
'<div class="form-row errors field-organization">\n'
266+
' <ul class="errorlist"{}>'
267+
"<li>This field is required.</li></ul>"
268+
).format(' id="id_organization_error"' if django.VERSION >= (5, 2) else "")
269+
self.assertContains(response, error_message)
270+
self.assertEqual(model.objects.count(), expected_count)
271+
272+
def _test_org_admin_view_shareable_object(
273+
self, path, user=None, expected_element=None
274+
):
275+
"""
276+
Verifies a non-superuser can view a shareable object
277+
"""
278+
if not user:
279+
user = self._create_administrator(organizations=[self._get_org()])
280+
self.client.force_login(user)
281+
response = self.client.get(path, follow=True)
282+
self.assertEqual(response.status_code, 200)
283+
if not expected_element:
284+
expected_element = (
285+
'<div class="form-row field-organization">\n\n\n<div>\n\n'
286+
'<div class="flex-container">\n\n'
287+
"<label>Organization:</label>\n\n"
288+
'<div class="readonly">-</div>\n\n\n'
289+
"</div>\n\n</div>\n\n\n</div>"
290+
)
291+
self.assertContains(response, expected_element, html=True)
292+
293+
def _test_object_organization_fk_autocomplete_view(
294+
self,
295+
model,
296+
):
297+
app_label = model._meta.app_label
298+
model_name = model._meta.model_name
299+
path = self._get_autocomplete_view_path(app_label, model_name, "organization")
300+
org = self._get_org()
301+
admin = User.objects.filter(is_superuser=True).first()
302+
if not admin:
303+
admin = self._create_admin()
304+
org_admin = self._create_administrator(organizations=[org])
305+
306+
with self.subTest("Org admin should only see their own org"):
307+
self.client.force_login(org_admin)
308+
response = self.client.get(path)
309+
self.assertEqual(response.status_code, 200)
310+
self.assertContains(response, org.name)
311+
self.assertNotContains(response, SHARED_SYSTEMWIDE_LABEL)
312+
313+
with self.subTest("Superuser should see all orgs and shared label"):
314+
self.client.force_login(admin)
315+
response = self.client.get(path)
316+
self.assertEqual(response.status_code, 200)
317+
self.assertContains(response, org.name)
318+
self.assertContains(response, SHARED_SYSTEMWIDE_LABEL)
319+
320+
def _get_autocomplete_view_path(
321+
self, app_label, model_name, field_name, is_filter=False
322+
):
240323
path = reverse("admin:ow-auto-filter")
241324
return (
242325
f"{path}?app_label={app_label}"
243326
f"&model_name={model_name}&field_name={field_name}"
327+
"{}".format("&is_filter=true" if is_filter else "")
244328
)

0 commit comments

Comments
 (0)