Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions ansible_base/authentication/utils/claims.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ansible_base.authentication.models import Authenticator, AuthenticatorMap, AuthenticatorUser
from ansible_base.authentication.utils.authenticator_map import check_role_type, expand_syntax
from ansible_base.lib.abstract_models import AbstractOrganization, AbstractTeam, CommonModel
from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.lib.utils.auth import get_organization_model, get_team_model
from ansible_base.lib.utils.string import is_empty

Expand All @@ -30,9 +31,6 @@
User = get_user_model()


is_rbac_installed = 'ansible_base.rbac' in settings.INSTALLED_APPS


class TriggerResult(Enum):
ALLOW = auto()
DENY = auto()
Expand Down Expand Up @@ -723,7 +721,7 @@ def reconcile_user_claims(cls, user: AbstractUser, authenticator_user: Authentic

claims = getattr(user, 'claims', authenticator_user.claims)

if is_rbac_installed:
if is_rbac_installed():
cls(claims, user, authenticator_user).manage_permissions()
else:
logger.info(_("Skipping user claims with RBAC roles, because RBAC app is not installed"))
Expand Down Expand Up @@ -878,7 +876,7 @@ def __init__(self):
self.cache = {}
# NOTE(cutwater): We may probably execute this query once and cache the query results.
self.content_types = {}
if is_rbac_installed:
if is_rbac_installed():
from ansible_base.rbac.models import DABContentType

self.content_types = {content_type.model: content_type for content_type in DABContentType.objects.get_for_models(Organization, Team).values()}
Expand Down Expand Up @@ -962,7 +960,7 @@ def cache_existing(self, role_assignments: Iterable[models.Model]) -> None:
- Role definitions are also cached separately in self.role_definitions
"""
local_resource_prefixes = ["shared"]
if is_rbac_installed:
if is_rbac_installed():
from ansible_base.rbac.remote import get_local_resource_prefix

local_resource_prefixes.append(get_local_resource_prefix())
Expand Down
4 changes: 4 additions & 0 deletions ansible_base/lib/utils/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
def is_rbac_installed() -> bool:
from django.conf import settings

return bool('ansible_base.rbac' in settings.INSTALLED_APPS)
8 changes: 1 addition & 7 deletions ansible_base/resource_registry/models/service_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,5 @@ def save(self, *args, **kwargs):
def service_id():
global _service_id
if not _service_id:
service_obj = ServiceID.objects.first()
if service_obj:
_service_id = str(service_obj.pk)
else:
# Create a ServiceID if none exists
service_obj = ServiceID.objects.create()
_service_id = str(service_obj.pk)
_service_id = str(ServiceID.objects.first().pk)
return _service_id
7 changes: 2 additions & 5 deletions ansible_base/resource_registry/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from django.apps import apps
from django.conf import settings

from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.resource_registry.resource_server import get_resource_server_config, get_service_token


def _check_rbac_installed():
"""Check if ansible_base.rbac is installed and raise RuntimeError if not."""
if 'ansible_base.rbac' not in settings.INSTALLED_APPS:
if not is_rbac_installed():
raise RuntimeError("This operation requires ansible_base.rbac to be installed")


Expand Down Expand Up @@ -174,24 +175,20 @@ def get_resource_type_manifest(self, name, filters: Optional[dict] = None):

# RBAC related methods
def list_role_types(self, filters: Optional[dict] = None):
_check_rbac_installed()
return self._make_request("get", "role-types/", params=filters)

def list_role_permissions(self, filters: Optional[dict] = None):
_check_rbac_installed()
return self._make_request("get", "role-permissions/", params=filters)

def list_user_assignments(self, user_ansible_id: Optional[str] = None, filters: Optional[dict] = None):
"""List user role assignments."""
_check_rbac_installed()
params = (filters or {}).copy()
if user_ansible_id is not None:
params['user_ansible_id'] = user_ansible_id
return self._make_request("get", "role-user-assignments/", params=params)

def list_team_assignments(self, team_ansible_id: Optional[str] = None, filters: Optional[dict] = None):
"""List team role assignments."""
_check_rbac_installed()
params = (filters or {}).copy()
if team_ansible_id is not None:
params['team_ansible_id'] = team_ansible_id
Expand Down
6 changes: 2 additions & 4 deletions ansible_base/resource_registry/shared_types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.conf import settings
from rest_framework import serializers
from rest_framework.exceptions import ValidationError

from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.resource_registry.utils.resource_type_serializers import AnsibleResourceForeignKeyField, SharedResourceTypeSerializer
from ansible_base.resource_registry.utils.sso_provider import get_sso_provider_server

Expand Down Expand Up @@ -84,8 +84,6 @@ class LenientPermissionSlugListField(serializers.ListField):
child = serializers.CharField()

def to_internal_value(self, data):
if 'ansible_base.rbac' not in settings.INSTALLED_APPS:
raise RuntimeError("LenientPermissionSlugListField requires ansible_base.rbac to be installed")
from ansible_base.rbac.models import DABPermission

data = super().to_internal_value(data)
Expand All @@ -105,7 +103,7 @@ class RoleDefinitionType(SharedResourceTypeSerializer):
permissions = LenientPermissionSlugListField()

def __init__(self, *args, **kwargs):
if 'ansible_base.rbac' not in settings.INSTALLED_APPS:
if not is_rbac_installed():
raise RuntimeError("RoleDefinitionType requires ansible_base.rbac to be installed")

super().__init__(*args, **kwargs)
Expand Down
17 changes: 7 additions & 10 deletions ansible_base/resource_registry/tasks/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from django.db.utils import Error, IntegrityError
from requests import HTTPError

from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.resource_registry.models import Resource, ResourceType
from ansible_base.resource_registry.models.service_identifier import service_id
from ansible_base.resource_registry.registry import get_registry
from ansible_base.resource_registry.rest_client import ResourceAPIClient, get_resource_server_client

logger = logging.getLogger('ansible_base.resources_api.tasks.sync')

_is_rbac_installed = 'ansible_base.rbac' in settings.INSTALLED_APPS


class ManifestNotFound(HTTPError):
"""Raise when server returns 404 for a manifest"""
Expand Down Expand Up @@ -141,7 +140,7 @@ def fetch_manifest(


def get_ansible_id_or_pk(assignment) -> str:
if not _is_rbac_installed:
if not is_rbac_installed():
raise RuntimeError("get_ansible_id_or_pk requires ansible_base.rbac to be installed")
# For object-scoped assignments, try to get the object's ansible_id
if assignment.content_type.model in ('organization', 'team'):
Expand All @@ -157,7 +156,7 @@ def get_ansible_id_or_pk(assignment) -> str:


def get_content_object(role_definition, assignment_tuple: AssignmentTuple) -> Any:
if not _is_rbac_installed:
if not is_rbac_installed():
raise RuntimeError("get_content_object requires ansible_base.rbac to be installed")
content_object = None
if role_definition.content_type.model in ('organization', 'team'):
Expand All @@ -172,8 +171,6 @@ def get_content_object(role_definition, assignment_tuple: AssignmentTuple) -> An

def get_remote_assignments(api_client: ResourceAPIClient) -> set[AssignmentTuple]:
"""Fetch remote assignments from the resource server and convert to tuples."""
if not _is_rbac_installed:
raise RuntimeError("get_remote_assignments requires ansible_base.rbac to be installed")
assignments = set()

# Fetch user assignments with pagination
Expand Down Expand Up @@ -245,7 +242,7 @@ def get_remote_assignments(api_client: ResourceAPIClient) -> set[AssignmentTuple

def get_local_assignments() -> set[AssignmentTuple]:
"""Get local assignments and convert to tuples."""
if not _is_rbac_installed:
if not is_rbac_installed():
raise RuntimeError("get_local_assignments requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleTeamAssignment, RoleUserAssignment

Expand Down Expand Up @@ -305,7 +302,7 @@ def get_local_assignments() -> set[AssignmentTuple]:

def delete_local_assignment(assignment_tuple: AssignmentTuple) -> bool:
"""Delete a local assignment based on the tuple."""
if not _is_rbac_installed:
if not is_rbac_installed():
raise RuntimeError("delete_local_assignment requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleDefinition

Expand Down Expand Up @@ -335,7 +332,7 @@ def delete_local_assignment(assignment_tuple: AssignmentTuple) -> bool:

def create_local_assignment(assignment_tuple: AssignmentTuple) -> bool:
"""Create a local assignment based on the tuple."""
if not _is_rbac_installed:
if not is_rbac_installed():
raise RuntimeError("create_local_assignment requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleDefinition

Expand Down Expand Up @@ -713,7 +710,7 @@ def _sync_assignments(self):
if not self.sync_assignments:
return

if not _is_rbac_installed:
if not is_rbac_installed():
self.write(">>> Skipping role assignments sync (rbac not installed)")
return

Expand Down
Loading