Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 259 additions & 1 deletion geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,10 @@ def test_sort_resources(self):
reversed_resource_titles = sorted(resource_titles.copy())
self.assertNotEqual(resource_titles, reversed_resource_titles)

@override_settings(
EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=True,
EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=True,
)
def test_perms_resources(self):
"""
Ensure we can Get & Set Permissions across the Resource Base list.
Expand Down Expand Up @@ -1164,7 +1168,9 @@ def test_perms_resources(self):
resource_perm_spec,
)

# Remove perms to Norman
# Remove perms to Norman by explicitly setting permissions to "none".
# Note: Omitting a user from the request preserves their existing permissions.
# To actually remove permissions, you must explicitly set them to "none".
resource_perm_spec = {
"uuid": resource.uuid,
"users": [
Expand All @@ -1178,6 +1184,16 @@ def test_perms_resources(self):
"is_staff": False,
"is_superuser": False,
},
{
"id": norman.id,
"username": norman.username,
"first_name": norman.first_name,
"last_name": norman.last_name,
"avatar": build_absolute_uri(avatar_url(bobby)),
"permissions": "none",
"is_staff": False,
"is_superuser": False,
},
{
"avatar": build_absolute_uri(avatar_url(bobby)),
"first_name": "admin",
Expand Down Expand Up @@ -2341,6 +2357,248 @@ def test_manager_can_edit_map(self):
resource_perm_spec,
)

@override_settings(
EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=False,
EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=False,
)
def test_resource_service_permissions_with_restricted_settings(self):
"""
Test that when EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS and
EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS are set to False,
only staff/admin users can modify anonymous and registered members permissions.
Non-staff editors should not be able to modify these groups' permissions.
"""
resource = Dataset.objects.filter(owner__username="admin").first()
bobby = get_user_model().objects.get(username="bobby")

# Give bobby edit permissions on the resource including permission to manage permissions
resource.set_permissions(
{
"users": {
bobby: [
"base.change_resourcebase",
"base.change_resourcebase_metadata",
"base.change_resourcebase_permissions",
]
}
}
)

# Login as bobby (non-staff editor)
self.assertTrue(self.client.login(username="bobby", password="bob"))

anonymous_group = Group.objects.get(name="anonymous")
registered_group = Group.objects.get(name="registered-members")

# Try to update permissions including anonymous and registered members groups
set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions")
perm_spec = {
"uuid": resource.uuid,
"groups": [
{
"id": anonymous_group.id,
"name": "anonymous",
"permissions": "view",
},
{
"id": registered_group.id,
"name": "registered-members",
"permissions": "download",
},
],
}

response = self.client.put(set_perms_url, data=perm_spec, format="json")
self.assertEqual(response.status_code, 200)

resp_js = json.loads(response.content.decode("utf-8"))
execution_id = resp_js.get("execution_id", "")
status_url = resp_js.get("status_url", None)
self.assertIsNotNone(status_url)
self.assertIsNotNone(execution_id)

for _cnt in range(0, 10):
response = self.client.get(f"{status_url}")
self.assertEqual(response.status_code, 200)
resp_js = json.loads(response.content.decode("utf-8"))
if resp_js.get("status", "") == "finished":
break
else:
resouce_service_dispatcher.apply((execution_id,))
sleep(3.0)

# Get the current permissions to verify anonymous and registered groups were excluded
get_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions")
response = self.client.get(get_perms_url, format="json")
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data

groups_in_response = {g["name"]: g["permissions"] for g in resource_perm_spec.get("groups", [])}

# The groups should still have their original permissions (not the ones bobby tried to set)
# Bobby tried to set anonymous to "view" and registered-members to "download"
# But since he lacks permissions, these changes should NOT have been applied
# Verify the permissions remain unchanged from their original state
self.assertIn("anonymous", groups_in_response)
self.assertIn("registered-members", groups_in_response)
# The permissions should NOT be what bobby tried to set
self.assertNotEqual(
groups_in_response.get("anonymous"), "view", "Bobby should not have been able to set anonymous to view"
)
self.assertNotEqual(
groups_in_response.get("registered-members"),
"download",
"Bobby should not have been able to set registered-members to download",
)

# login as admin (staff user) and verify admin can modify these permissions
self.assertTrue(self.client.login(username="admin", password="admin"))

perm_spec_admin = {
"uuid": resource.uuid,
"groups": [
{
"id": anonymous_group.id,
"name": "anonymous",
"permissions": "view",
},
{
"id": registered_group.id,
"name": "registered-members",
"permissions": "view",
},
],
}

response = self.client.put(set_perms_url, data=perm_spec_admin, format="json")
self.assertEqual(response.status_code, 200)

# Wait for async task to complete
resp_js = json.loads(response.content.decode("utf-8"))
execution_id = resp_js.get("execution_id", "")
status_url = resp_js.get("status_url", None)

for _cnt in range(0, 10):
response = self.client.get(f"{status_url}")
self.assertEqual(response.status_code, 200)
resp_js = json.loads(response.content.decode("utf-8"))
if resp_js.get("status", "") == "finished":
break
else:
resouce_service_dispatcher.apply((execution_id,))
sleep(3.0)

# Verify admin successfully modified the permissions
response = self.client.get(get_perms_url, format="json")
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data

# Admin should have been able to set permissions for these groups
groups_in_response = {g["name"]: g["permissions"] for g in resource_perm_spec.get("groups", [])}
self.assertIn("anonymous", groups_in_response)
self.assertIn("registered-members", groups_in_response)
# Verify admin successfully changed the permissions to what was requested
self.assertEqual(
groups_in_response.get("anonymous"), "view", "Admin should have been able to set anonymous to view"
)
self.assertEqual(
groups_in_response.get("registered-members"),
"view",
"Admin should have been able to set registered-members to view",
)

@override_settings(
EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=True,
EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=False,
)
def test_resource_service_permissions_with_partial_restriction(self):
"""
Test that when only EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS is False,
editors can modify anonymous permissions but not registered members permissions.
"""
resource = Dataset.objects.filter(owner__username="admin").first()
bobby = get_user_model().objects.get(username="bobby")

# Give bobby edit permissions including permission to manage permissions
resource.set_permissions(
{
"users": {
bobby: [
"base.change_resourcebase",
"base.change_resourcebase_metadata",
"base.change_resourcebase_permissions",
]
}
}
)

# Login as bobby
self.assertTrue(self.client.login(username="bobby", password="bob"))

anonymous_group = Group.objects.get(name="anonymous")
registered_group = Group.objects.get(name="registered-members")

set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions")
perm_spec = {
"uuid": resource.uuid,
"groups": [
{
"id": anonymous_group.id,
"name": "anonymous",
"permissions": "view",
},
{
"id": registered_group.id,
"name": "registered-members",
"permissions": "download",
},
],
}

response = self.client.put(set_perms_url, data=perm_spec, format="json")
self.assertEqual(response.status_code, 200)

resp_js = json.loads(response.content.decode("utf-8"))
execution_id = resp_js.get("execution_id", "")
status_url = resp_js.get("status_url", None)

for _cnt in range(0, 10):
response = self.client.get(f"{status_url}")
self.assertEqual(response.status_code, 200)
resp_js = json.loads(response.content.decode("utf-8"))
if resp_js.get("status", "") == "finished":
break
else:
resouce_service_dispatcher.apply((execution_id,))
sleep(3.0)

# Verify bobby could modify anonymous but not registered members
# The registered-members group should have been filtered out from bobby's request
get_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions")
response = self.client.get(get_perms_url, format="json")
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data

groups_in_response = {g["name"]: g["permissions"] for g in resource_perm_spec.get("groups", [])}

# Bobby tried to set anonymous to "view" and registered-members to "download"
# Since EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=True, anonymous should be changed
# Since EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=False, registered-members should NOT be changed
self.assertIn("anonymous", groups_in_response)
self.assertIn("registered-members", groups_in_response)

# Verify anonymous was successfully changed by bobby
self.assertEqual(
groups_in_response.get("anonymous"), "view", "Bobby should have been able to set anonymous to view"
)

# Verify registered-members was NOT changed by bobby
self.assertNotEqual(
groups_in_response.get("registered-members"),
"download",
"Bobby should not have been able to set registered-members to download",
)

def test_resource_service_copy(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
files_as_dict, _ = get_files(files)
Expand Down
47 changes: 22 additions & 25 deletions geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
FavoriteFilter,
TKeywordsFilter,
)
from geonode.groups.models import GroupProfile
from geonode.security.permissions import get_compact_perms_list, PermSpec, PermSpecCompact
from geonode.groups.models import GroupProfile, Group
from geonode.security.permissions import get_compact_perms_list, PermSpec
from geonode.security.utils import (
get_visible_resources,
get_resources_with_perms,
Expand Down Expand Up @@ -109,6 +109,7 @@
from geonode.assets.utils import create_asset_and_link, unlink_asset
from geonode.assets.handlers import asset_handler_registry
from geonode.utils import get_supported_datasets_file_types
from geonode.base.utils import patch_perms

import logging

Expand Down Expand Up @@ -615,29 +616,25 @@ def resource_service_permissions(self, request, pk, *args, **kwargs):
action="permissions",
input_params={"uuid": request_params.get("uuid", resource.uuid)},
)
elif request.method == "PUT":

perms_spec_compact = PermSpecCompact(request.data, resource)

if resource.dirty_state:
raise Exception("Cannot update if the resource is in dirty state")
resource.set_dirty_state()
_exec_request = ExecutionRequest.objects.create(
user=request.user,
func_name="set_permissions",
geonode_resource=resource,
action="permissions",
input_params={
"uuid": request_params.get("uuid", resource.uuid),
"permissions": perms_spec_compact.extended,
"created": request_params.get("created", False),
},
)
elif request.method == "PATCH":

perms_spec_compact_patch = PermSpecCompact(request.data, resource)
perms_spec_compact_resource = PermSpecCompact(perms_spec.compact, resource)
perms_spec_compact_resource.merge(perms_spec_compact_patch)
elif request.method in ["PUT", "PATCH"]:
user_perms = permissions_registry.get_perms(instance=resource, user=request.user)
if request.data.get("groups"):
excluded_ids = []
if "can_manage_anonymous_permissions" not in user_perms:
anonymous_group = Group.objects.get(name="anonymous")
excluded_ids.append(anonymous_group.id)
logger.info(
f"User {request.user.username} cannot manage anonymous permissions on resource {resource.pk}"
)
if "can_manage_registered_member_permissions" not in user_perms:
registered_group = Group.objects.get(name=groups_settings.REGISTERED_MEMBERS_GROUP_NAME)
excluded_ids.append(registered_group.id)
logger.info(
f"User {request.user.username} cannot manage registered members permissions on resource {resource.pk}"
)
if excluded_ids:
request.data["groups"] = [g for g in request.data["groups"] if g.get("id") not in excluded_ids]
perms_spec_compact_resource = patch_perms(request.data, perms_spec.compact, resource)

if resource.dirty_state:
raise Exception("Cannot update if the resource is in dirty state")
Expand Down
11 changes: 11 additions & 0 deletions geonode/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from geonode.security.utils import AdvancedSecurityWorkflowManager
from geonode.thumbs.utils import get_thumbs, remove_thumb
from geonode.utils import get_legend_url
from geonode.security.permissions import PermSpecCompact

logger = logging.getLogger("geonode.base.utils")

Expand Down Expand Up @@ -213,3 +214,13 @@ def remove_country_from_languagecode(language: str):

lang, _, _ = language.lower().partition("-")
return lang


def patch_perms(updated_perms_compact, current_perms_compact, resource):
"""
Patch updated permission changes with current permissions.
"""
perms_spec_compact_patch = PermSpecCompact(updated_perms_compact, resource)
perms_spec_compact_resource = PermSpecCompact(current_perms_compact, resource)
perms_spec_compact_resource.merge(perms_spec_compact_patch)
return perms_spec_compact_resource
Loading