Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions ansible_base/authentication/utils/claims.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from ansible_base.lib.abstract_models import AbstractOrganization, AbstractTeam, CommonModel
from ansible_base.lib.utils.auth import get_organization_model, get_team_model
from ansible_base.lib.utils.string import is_empty
from ansible_base.rbac.models import DABContentType
from ansible_base.rbac.remote import get_local_resource_prefix

from .trigger_definition import TRIGGER_DEFINITION

Expand All @@ -32,6 +30,9 @@
User = get_user_model()


is_rbac_installed = 'ansible_base.rbac' in settings.INSTALLED_APPS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really want to stop doing this referencing of settings. on import. It's an import circularity problem, and here you don't need it anyway. You can make this into a method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we do this all over. We might want to just have a generic method like is_dab_app_installed('rbac')



class TriggerResult(Enum):
ALLOW = auto()
DENY = auto()
Expand Down Expand Up @@ -722,7 +723,7 @@ def reconcile_user_claims(cls, user: AbstractUser, authenticator_user: Authentic

claims = getattr(user, 'claims', authenticator_user.claims)

if 'ansible_base.rbac' in settings.INSTALLED_APPS:
if is_rbac_installed:
cls(claims, user, authenticator_user).manage_permissions()
else:
logger.info(_("Skipping user claims with RBAC roles, because RBAC app is not installed"))
Expand Down Expand Up @@ -876,7 +877,11 @@ class RoleUserAssignmentsCache:
def __init__(self):
self.cache = {}
# NOTE(cutwater): We may probably execute this query once and cache the query results.
self.content_types = {content_type.model: content_type for content_type in DABContentType.objects.get_for_models(Organization, Team).values()}
self.content_types = {}
if is_rbac_installed:
from ansible_base.rbac.models import DABContentType

self.content_types = {content_type.model: content_type for content_type in DABContentType.objects.get_for_models(Organization, Team).values()}
self.role_definitions = {}

def items(self):
Expand Down Expand Up @@ -956,6 +961,12 @@ def cache_existing(self, role_assignments: Iterable[models.Model]) -> None:
- All cached assignments are marked with STATUS_EXISTING status
- Role definitions are also cached separately in self.role_definitions
"""
local_resource_prefixes = ["shared"]
if is_rbac_installed:
from ansible_base.rbac.remote import get_local_resource_prefix

local_resource_prefixes.append(get_local_resource_prefix())

for role_assignment in role_assignments:
# Cache role definition
if (role_definition := self._rd_by_id(role_assignment)) is None:
Expand All @@ -965,7 +976,7 @@ def cache_existing(self, role_assignments: Iterable[models.Model]) -> None:
# Skip role assignments that should not be cached
if not (
role_assignment.content_type is None # Global/system roles (e.g., System Auditor)
or role_assignment.content_type.service in [get_local_resource_prefix(), "shared"]
or role_assignment.content_type.service in local_resource_prefixes
): # Local object roles
continue

Expand Down
11 changes: 6 additions & 5 deletions ansible_base/lib/routers/association_resource_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from rest_framework.response import Response
from rest_framework.viewsets import ViewSetMixin

from ansible_base.rbac.permission_registry import permission_registry

logger = logging.getLogger('ansible_base.lib.routers.association_resource_router')


Expand Down Expand Up @@ -119,10 +117,13 @@ def check_parent_object_permissions(self, request, parent_obj: Model) -> None:
will not check "change" permissions to the parent object on POST
this method checks parent change permission, view permission should be handled by filter_queryset
"""
if (request.method not in SAFE_METHODS) and 'ansible_base.rbac' in settings.INSTALLED_APPS and permission_registry.is_registered(parent_obj):
from ansible_base.rbac.policies import check_content_obj_permission
if (request.method not in SAFE_METHODS) and 'ansible_base.rbac' in settings.INSTALLED_APPS:
from ansible_base.rbac.permission_registry import permission_registry

if permission_registry.is_registered(parent_obj):
from ansible_base.rbac.policies import check_content_obj_permission

check_content_obj_permission(request.user, parent_obj)
check_content_obj_permission(request.user, parent_obj)

def get_parent_object(self) -> Model:
"""Modeled mostly after DRF get_object, but for the parent model
Expand Down
8 changes: 7 additions & 1 deletion ansible_base/resource_registry/models/service_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,11 @@ def save(self, *args, **kwargs):
def service_id():
global _service_id
if not _service_id:
_service_id = str(ServiceID.objects.first().pk)
service_obj = ServiceID.objects.first()
if service_obj:
_service_id = str(service_obj.pk)
else:
# Create a ServiceID if none exists
service_obj = ServiceID.objects.create()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is a major thing I had on my agenda.

https://issues.redhat.com/browse/AAP-51352

I had assumed we couldn't do it like this. But I'm not saying no.

_service_id = str(service_obj.pk)
return _service_id
19 changes: 12 additions & 7 deletions ansible_base/resource_registry/registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import namedtuple
from typing import List, Optional

from django.conf import settings
from django.contrib.auth import authenticate
from django.utils.translation import gettext_lazy as _

Expand All @@ -23,12 +24,16 @@ class ServiceAPIConfig:
This will be the interface for configuring the resource registry for each service.
"""

_default_resource_processors = {
"shared.team": ResourceTypeProcessor,
"shared.organization": ResourceTypeProcessor,
"shared.user": ResourceTypeProcessor,
"shared.roledefinition": RoleDefinitionProcessor,
}
@classmethod
def _get_default_resource_processors(cls):
processors = {
"shared.team": ResourceTypeProcessor,
"shared.organization": ResourceTypeProcessor,
"shared.user": ResourceTypeProcessor,
}
if 'ansible_base.rbac' in settings.INSTALLED_APPS:
processors["shared.roledefinition"] = RoleDefinitionProcessor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 agreed.

This should cause no ill-effects. For the major services, we simply don't care about the case that the RBAC app is not installed.

return processors

custom_resource_processors = {}

Expand All @@ -43,7 +48,7 @@ def authenticate_local_user(username: str, password: str):

@classmethod
def get_processor(cls, resource_type):
combined_processors = {**cls._default_resource_processors, **cls.custom_resource_processors}
combined_processors = {**cls._get_default_resource_processors(), **cls.custom_resource_processors}
return combined_processors[resource_type]


Expand Down
15 changes: 15 additions & 0 deletions ansible_base/resource_registry/rest_client.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I somewhat disagree with this. There is no problem with making requests to the RBAC related endpoints when RBAC is not installed locally.

Some methods are still invalid to call, but not all of these.

Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
import requests
import urllib3
from django.apps import apps
from django.conf import settings

from ansible_base.resource_registry.resource_server import get_resource_server_config, get_service_token


def _check_rbac_installed():
"""Check if ansible_base.rbac is installed and raise RuntimeError if not."""
if 'ansible_base.rbac' not in settings.INSTALLED_APPS:
raise RuntimeError("This operation requires ansible_base.rbac to be installed")


ResourceRequestBody = namedtuple(
"ResourceRequestBody",
["ansible_id", "service_id", "is_partially_migrated", "resource_type", "resource_data"],
Expand Down Expand Up @@ -166,26 +174,31 @@ def get_resource_type_manifest(self, name, filters: Optional[dict] = None):

# RBAC related methods
def list_role_types(self, filters: Optional[dict] = None):
_check_rbac_installed()
return self._make_request("get", "role-types/", params=filters)

def list_role_permissions(self, filters: Optional[dict] = None):
_check_rbac_installed()
return self._make_request("get", "role-permissions/", params=filters)

def list_user_assignments(self, user_ansible_id: Optional[str] = None, filters: Optional[dict] = None):
"""List user role assignments."""
_check_rbac_installed()
params = (filters or {}).copy()
if user_ansible_id is not None:
params['user_ansible_id'] = user_ansible_id
return self._make_request("get", "role-user-assignments/", params=params)

def list_team_assignments(self, team_ansible_id: Optional[str] = None, filters: Optional[dict] = None):
"""List team role assignments."""
_check_rbac_installed()
params = (filters or {}).copy()
if team_ansible_id is not None:
params['team_ansible_id'] = team_ansible_id
return self._make_request("get", "role-team-assignments/", params=params)

def sync_assignment(self, assignment):
_check_rbac_installed()
from ansible_base.rbac.service_api.serializers import ServiceRoleTeamAssignmentSerializer, ServiceRoleUserAssignmentSerializer

if assignment._meta.model_name == 'roleuserassignment':
Expand All @@ -196,6 +209,7 @@ def sync_assignment(self, assignment):
return self._sync_assignment(serializer.data)

def sync_unassignment(self, role_definition, actor, content_object):
_check_rbac_installed()
data = {'role_definition': role_definition.name}
data[f'{actor._meta.model_name}_ansible_id'] = str(actor.resource.ansible_id)

Expand All @@ -214,6 +228,7 @@ def sync_unassignment(self, role_definition, actor, content_object):

def sync_object_deletion(self, content_object):
"""Sync object deletion to Gateway for cleanup of all related role assignments"""
_check_rbac_installed()
from ansible_base.rbac.models import DABContentType

# Get the content type information
Expand Down
28 changes: 21 additions & 7 deletions ansible_base/resource_registry/shared_types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.conf import settings
from rest_framework import serializers
from rest_framework.exceptions import ValidationError

from ansible_base.rbac.models import DABContentType, DABPermission
from ansible_base.resource_registry.utils.resource_type_serializers import AnsibleResourceForeignKeyField, SharedResourceTypeSerializer
from ansible_base.resource_registry.utils.sso_provider import get_sso_provider_server

Expand Down Expand Up @@ -84,6 +84,10 @@ class LenientPermissionSlugListField(serializers.ListField):
child = serializers.CharField()

def to_internal_value(self, data):
if 'ansible_base.rbac' not in settings.INSTALLED_APPS:
raise RuntimeError("LenientPermissionSlugListField requires ansible_base.rbac to be installed")
from ansible_base.rbac.models import DABPermission
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems entirely unnecessary. The rest of changes in this file, yes.


data = super().to_internal_value(data)
return list(DABPermission.objects.filter(api_slug__in=data))

Expand All @@ -98,14 +102,24 @@ class RoleDefinitionType(SharedResourceTypeSerializer):
name = serializers.CharField()
description = serializers.CharField(default="", allow_blank=True)
managed = serializers.BooleanField()
content_type = serializers.SlugRelatedField(
slug_field='api_slug',
queryset=DABContentType.objects.all(),
allow_null=True,
default=None,
)
permissions = LenientPermissionSlugListField()

def __init__(self, *args, **kwargs):
if 'ansible_base.rbac' not in settings.INSTALLED_APPS:
raise RuntimeError("RoleDefinitionType requires ansible_base.rbac to be installed")

super().__init__(*args, **kwargs)

# Set up content_type field only when rbac is available
from ansible_base.rbac.models import DABContentType

self.fields['content_type'] = serializers.SlugRelatedField(
slug_field='api_slug',
queryset=DABContentType.objects.all(),
allow_null=True,
default=None,
)

def is_valid(self, raise_exception=False):
try:
return super().is_valid(raise_exception=raise_exception)
Expand Down
29 changes: 26 additions & 3 deletions ansible_base/resource_registry/tasks/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
from django.db.utils import Error, IntegrityError
from requests import HTTPError

from ansible_base.rbac.models.role import AssignmentBase, RoleDefinition, RoleTeamAssignment, RoleUserAssignment
from ansible_base.resource_registry.models import Resource, ResourceType
from ansible_base.resource_registry.models.service_identifier import service_id
from ansible_base.resource_registry.registry import get_registry
from ansible_base.resource_registry.rest_client import ResourceAPIClient, get_resource_server_client

logger = logging.getLogger('ansible_base.resources_api.tasks.sync')

_is_rbac_installed = 'ansible_base.rbac' in settings.INSTALLED_APPS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, I want to bring this in-line.



class ManifestNotFound(HTTPError):
"""Raise when server returns 404 for a manifest"""
Expand Down Expand Up @@ -139,7 +140,9 @@ def fetch_manifest(
return [ManifestItem(**row) for row in csv_reader]


def get_ansible_id_or_pk(assignment: AssignmentBase) -> str:
def get_ansible_id_or_pk(assignment) -> str:
if not _is_rbac_installed:
raise RuntimeError("get_ansible_id_or_pk requires ansible_base.rbac to be installed")
# For object-scoped assignments, try to get the object's ansible_id
if assignment.content_type.model in ('organization', 'team'):
object_resource = Resource.objects.filter(object_id=assignment.object_id, content_type__model=assignment.content_type.model).first()
Expand All @@ -153,7 +156,9 @@ def get_ansible_id_or_pk(assignment: AssignmentBase) -> str:
return str(ansible_id_or_pk)


def get_content_object(role_definition: RoleDefinition, assignment_tuple: AssignmentTuple) -> Any:
def get_content_object(role_definition, assignment_tuple: AssignmentTuple) -> Any:
if not _is_rbac_installed:
raise RuntimeError("get_content_object requires ansible_base.rbac to be installed")
content_object = None
if role_definition.content_type.model in ('organization', 'team'):
object_resource = Resource.objects.get(ansible_id=assignment_tuple.ansible_id_or_pk)
Expand All @@ -167,6 +172,8 @@ def get_content_object(role_definition: RoleDefinition, assignment_tuple: Assign

def get_remote_assignments(api_client: ResourceAPIClient) -> set[AssignmentTuple]:
"""Fetch remote assignments from the resource server and convert to tuples."""
if not _is_rbac_installed:
raise RuntimeError("get_remote_assignments requires ansible_base.rbac to be installed")
assignments = set()

# Fetch user assignments with pagination
Expand Down Expand Up @@ -238,6 +245,10 @@ def get_remote_assignments(api_client: ResourceAPIClient) -> set[AssignmentTuple

def get_local_assignments() -> set[AssignmentTuple]:
"""Get local assignments and convert to tuples."""
if not _is_rbac_installed:
raise RuntimeError("get_local_assignments requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleTeamAssignment, RoleUserAssignment

assignments = set()

# Get user assignments
Expand Down Expand Up @@ -294,6 +305,10 @@ def get_local_assignments() -> set[AssignmentTuple]:

def delete_local_assignment(assignment_tuple: AssignmentTuple) -> bool:
"""Delete a local assignment based on the tuple."""
if not _is_rbac_installed:
raise RuntimeError("delete_local_assignment requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleDefinition

try:
role_definition = RoleDefinition.objects.get(name=assignment_tuple.role_definition_name)

Expand All @@ -320,6 +335,10 @@ def delete_local_assignment(assignment_tuple: AssignmentTuple) -> bool:

def create_local_assignment(assignment_tuple: AssignmentTuple) -> bool:
"""Create a local assignment based on the tuple."""
if not _is_rbac_installed:
raise RuntimeError("create_local_assignment requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleDefinition

try:
role_definition = RoleDefinition.objects.get(name=assignment_tuple.role_definition_name)

Expand Down Expand Up @@ -694,6 +713,10 @@ def _sync_assignments(self):
if not self.sync_assignments:
return

if not _is_rbac_installed:
self.write(">>> Skipping role assignments sync (rbac not installed)")
return

self.write(">>> Syncing role assignments")

try:
Expand Down
20 changes: 14 additions & 6 deletions test_app/resource_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from django.conf import settings
from django.contrib.auth import get_user_model

from ansible_base.authentication.models import Authenticator
from ansible_base.rbac.models import RoleDefinition
from ansible_base.resource_registry.registry import ResourceConfig, ServiceAPIConfig, SharedResource
from ansible_base.resource_registry.shared_types import OrganizationType, RoleDefinitionType, TeamType, UserType
from ansible_base.resource_registry.shared_types import OrganizationType, TeamType, UserType
from ansible_base.resource_registry.utils.resource_type_processor import ResourceTypeProcessor
from test_app.models import Organization, Original1, Proxy2, ResourceMigrationTestModel, Team

Expand Down Expand Up @@ -38,13 +38,21 @@ class APIConfig(ServiceAPIConfig):
Organization,
shared_resource=SharedResource(serializer=OrganizationType, is_provider=False),
),
ResourceConfig(
RoleDefinition,
shared_resource=SharedResource(serializer=RoleDefinitionType, is_provider=False),
),
# Authenticators won't be a shared resource in production, but it's a convenient model to use for testing.
ResourceConfig(Authenticator),
ResourceConfig(ResourceMigrationTestModel),
ResourceConfig(Original1),
ResourceConfig(Proxy2),
]

# Conditionally add RoleDefinition if RBAC is installed
if 'ansible_base.rbac' in settings.INSTALLED_APPS:
from ansible_base.rbac.models import RoleDefinition
from ansible_base.resource_registry.shared_types import RoleDefinitionType

RESOURCE_LIST.append(
ResourceConfig(
RoleDefinition,
shared_resource=SharedResource(serializer=RoleDefinitionType, is_provider=False),
)
)
Loading
Loading