Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import ansible_base.rbac.models.content_type
import ansible_base.rbac.remote
from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def create_types_if_needed(apps, schema_editor):
create_DAB_contenttypes(apps=apps)



def migrate_content_type(apps, schema_editor):
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
ct_cls.objects.clear_cache()
Expand Down
45 changes: 45 additions & 0 deletions ansible_base/rbac/migrations/0009_object_created_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Generated by Django 4.2.21 on 2025-09-10 17:11
# Modified by Claude (Sonnet 4) - Added object_created field and data migration

from django.db import migrations, models
import django.utils.timezone


def populate_object_created_field(apps, schema_editor):
"""Populate the object_created field for existing role assignments."""
from ._utils import populate_object_created_field as _populate_object_created_field
return _populate_object_created_field(apps, schema_editor)


class Migration(migrations.Migration):

dependencies = [
('dab_rbac', '0008_remote_permissions_cleanup'),
]

operations = [
# Added field for a checksum like purpose for remote models
migrations.AddField(
model_name='roleuserassignment',
name='object_created',
field=models.DateTimeField(help_text='The created timestamp of related object, if applicable.', null=True),
),
migrations.AddField(
model_name='roleteamassignment',
name='object_created',
field=models.DateTimeField(help_text='The created timestamp of related object, if applicable.', null=True),
),
# Make assignment created timestamp backdateable
migrations.AlterField(
model_name='roleteamassignment',
name='created',
field=models.DateTimeField(default=django.utils.timezone.now, editable=False, help_text='The date/time this resource was created.'),
),
migrations.AlterField(
model_name='roleuserassignment',
name='created',
field=models.DateTimeField(default=django.utils.timezone.now, editable=False, help_text='The date/time this resource was created.'),
),
# Data migration to populate object_created field
migrations.RunPython(populate_object_created_field, migrations.RunPython.noop),
]
68 changes: 68 additions & 0 deletions ansible_base/rbac/migrations/_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging
from datetime import datetime
from django.db import models

logger = logging.getLogger(__name__)

# This method has moved, and this is put here temporarily to make branch management easier
from ansible_base.rbac.management import create_dab_permissions as create_custom_permissions # noqa

Expand Down Expand Up @@ -45,3 +49,67 @@ def give_permissions(apps, rd, users=(), teams=(), object_id=None, content_type_
for team_id in teams
]
RoleTeamAssignment.objects.bulk_create(team_assignments, ignore_conflicts=True)


def get_model_class_from_content_type(apps, content_type):
"""
Get a model class from a content type in a migration-safe way.

This is needed because content_type.model_class() is not available in migrations.
"""
try:
return apps.get_model(content_type.app_label, content_type.model)
except (LookupError, AttributeError):
return None


def populate_object_created_field(apps, schema_editor=None):
"""Populate the object_created field for existing role assignments."""
assignment_models = [
('roleuserassignment', 'RoleUserAssignment'),
('roleteamassignment', 'RoleTeamAssignment'),
]

updated_count = 0

for model_name, model_class_name in assignment_models:
assignment_cls = apps.get_model('dab_rbac', model_name)
assignments_to_update = assignment_cls.objects.filter(object_created__isnull=True)

for assignment in assignments_to_update:
object_created_value = None

# Try to get the actual object to extract its created timestamp
if assignment.object_id and assignment.content_type:
try:
# Get the model class from the old content_type field (before migration to DABContentType)
model_class = get_model_class_from_content_type(apps, assignment.content_type)
if model_class:
try:
# Try to get the actual object
actual_object = model_class.objects.get(pk=assignment.object_id)

# Try to get created timestamp from common field names
for field_name in ('created', 'created_at'):
if hasattr(actual_object, field_name):
val = getattr(actual_object, field_name)
if isinstance(val, datetime):
object_created_value = val
break
except (model_class.DoesNotExist, ValueError, TypeError):
# Object doesn't exist or can't be retrieved, skip
pass
except (AttributeError, LookupError):
# Content type or model class issues, skip
pass

# Update the assignment if we found a created timestamp
if object_created_value:
assignment.object_created = object_created_value
assignment.save(update_fields=['object_created'])
updated_count += 1

if updated_count:
logger.info(f'Populated object_created field for {updated_count} existing role assignments')

return updated_count
63 changes: 55 additions & 8 deletions ansible_base/rbac/models/role.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from collections.abc import Iterable
from datetime import datetime
from typing import Optional, Type, Union
from uuid import UUID

Expand All @@ -9,6 +10,7 @@
from django.db.models.functions import Cast
from django.db.models.query import QuerySet
from django.db.utils import IntegrityError
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

# Django-rest-framework
Expand Down Expand Up @@ -63,6 +65,18 @@ def __getattr__(self, attr):
return rd


def get_created_timestamp(obj: Union[models.Model, RemoteObject]) -> Optional[datetime]:
"""Given some obj from the users app, try to infer the created timestamp"""
if isinstance(obj, RemoteObject):
return obj.created
for field_name in ('created', 'created_at'):
if hasattr(obj, field_name):
val = getattr(obj, field_name)
if isinstance(val, datetime):
return val
return None


class RoleDefinitionManager(models.Manager):
def contribute_to_class(self, cls: Type[models.Model], name: str) -> None:
"""After Django populates the model for the manager, attach the manager role manager"""
Expand Down Expand Up @@ -178,13 +192,15 @@ def __str__(self):
managed_str = ', managed=True'
return f'RoleDefinition(pk={self.id}, name={self.name}{managed_str})'

def give_global_permission(self, actor):
return self.give_or_remove_global_permission(actor, giving=True)
def give_global_permission(self, actor, assignment_created=None, assignment_object_created=None):
return self.give_or_remove_global_permission(
actor, giving=True, assignment_created=assignment_created, assignment_object_created=assignment_object_created
)

def remove_global_permission(self, actor):
return self.give_or_remove_global_permission(actor, giving=False)

def give_or_remove_global_permission(self, actor, giving=True):
def give_or_remove_global_permission(self, actor, giving=True, assignment_created=None, assignment_object_created=None):
if giving and (self.content_type is not None):
raise ValidationError('Role definition content type must be null to assign globally')

Expand All @@ -202,6 +218,10 @@ def give_or_remove_global_permission(self, actor, giving=True):
raise RuntimeError(f'Cannot {giving and "give" or "remove"} permission for {actor}, must be a user or team')

if giving:
if assignment_created:
kwargs['created'] = assignment_created
if assignment_object_created:
kwargs['object_created'] = assignment_object_created
assignment, _ = cls.objects.get_or_create(**kwargs)
else:
assignment = cls.objects.filter(**kwargs).first()
Expand All @@ -221,8 +241,10 @@ def give_or_remove_global_permission(self, actor, giving=True):

return assignment

def give_permission(self, actor, content_object):
return self.give_or_remove_permission(actor, content_object, giving=True)
def give_permission(self, actor, content_object, assignment_created=None, assignment_object_created=None):
return self.give_or_remove_permission(
actor, content_object, giving=True, assignment_created=assignment_created, assignment_object_created=assignment_object_created
)

def remove_permission(self, actor, content_object):
return self.give_or_remove_permission(actor, content_object, giving=False)
Expand All @@ -247,7 +269,7 @@ def get_or_create_object_role(self, kwargs, defaults):
object_role = ObjectRole.objects.create(**kwargs, **defaults)
return (object_role, True)

def give_or_remove_permission(self, actor, content_object, giving=True, sync_action=False):
def give_or_remove_permission(self, actor, content_object, giving=True, sync_action=False, assignment_created=None, assignment_object_created=None):
"Shortcut method to do whatever needed to give user or team these permissions"
validate_assignment(self, actor, content_object)

Expand Down Expand Up @@ -278,15 +300,32 @@ def give_or_remove_permission(self, actor, content_object, giving=True, sync_act

update_teams, to_update = needed_updates_on_assignment(self, actor, object_role, created=created, giving=True)

assignment_defaults = {}
# Use provided object_created if available, otherwise get from content_object
if assignment_object_created:
assignment_defaults['object_created'] = assignment_object_created
else:
object_created = get_created_timestamp(content_object)
if object_created:
assignment_defaults['object_created'] = object_created
if assignment_created:
assignment_defaults['created'] = assignment_created

assignment = None
if actor._meta.model_name == 'user':
if giving:
assignment, created = RoleUserAssignment.objects.get_or_create(user=actor, object_role=object_role)
try:
assignment = RoleUserAssignment.objects.get(user=actor, object_role=object_role)
except RoleUserAssignment.DoesNotExist:
assignment = RoleUserAssignment.objects.create(user=actor, object_role=object_role, **assignment_defaults)
else:
object_role.users.remove(actor)
elif isinstance(actor, permission_registry.team_model):
if giving:
assignment, created = RoleTeamAssignment.objects.get_or_create(team=actor, object_role=object_role)
try:
assignment = RoleTeamAssignment.objects.get(team=actor, object_role=object_role)
except RoleTeamAssignment.DoesNotExist:
assignment = RoleTeamAssignment.objects.create(team=actor, object_role=object_role, **assignment_defaults)
else:
object_role.teams.remove(actor)

Expand Down Expand Up @@ -410,6 +449,14 @@ class AssignmentBase(ImmutableCommonModel, ObjectRoleFields):
null=True, blank=True, help_text=_('The primary key of the object this assignment applies to; null value indicates system-wide assignment.')
)
content_type = models.ForeignKey(DABContentType, on_delete=models.CASCADE, null=True, help_text=_("The content type this applies to."))
# The object_created field can be used for a checksum-like purpose to verify nothing strange happened with the related object
object_created = models.DateTimeField(help_text=_("The created timestamp of related object, if applicable."), null=True)
# Define this with default to make it possible to backdate if necessary, for sync
created = models.DateTimeField(
default=timezone.now,
editable=False,
help_text=_("The date/time this resource was created."),
)

# object_role is internal, and not shown in serializer
# content_type does not have a link, and ResourceType will be used in lieu sometime
Expand Down
4 changes: 3 additions & 1 deletion ansible_base/rbac/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ def __init__(self, ct: models.Model, abstract=False):
class RemoteObject:
"""Placeholder for objects that live in another project."""

def __init__(self, content_type: models.Model, object_id: Union[int, str], parent_reference=None):
def __init__(self, content_type: models.Model, object_id: Union[int, str], parent_reference=None, created=None):
self.content_type = content_type
self.object_id = object_id
# Allow tracking details of the object
self.created = created
# Since object is remote, we do not have its properties here, so a pointer to the parent can be specified here
self.parent_reference = parent_reference
if not hasattr(self, '_meta'):
Expand Down
14 changes: 11 additions & 3 deletions ansible_base/rbac/service_api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def to_internal_value(self, value):
return resource.object_id


assignment_common_fields = ('created', 'created_by_ansible_id', 'object_id', 'object_ansible_id', 'content_type', 'role_definition')
assignment_common_fields = ('created', 'object_created', 'created_by_ansible_id', 'object_id', 'object_ansible_id', 'content_type', 'role_definition')


class BaseAssignmentSerializer(serializers.ModelSerializer):
Expand All @@ -57,6 +57,10 @@ class BaseAssignmentSerializer(serializers.ModelSerializer):
object_ansible_id = ObjectIDAnsibleIDField(source='object_id', required=False, allow_null=True)
object_id = serializers.CharField(allow_blank=True, required=False, allow_null=True)
from_service = serializers.CharField(write_only=True)
# Force created field to be writable
created = serializers.DateTimeField(required=False)
# Force object_created field to be writable
object_created = serializers.DateTimeField(required=False, allow_null=True)

def to_representation(self, instance):
# hack to surface content_object for ObjectIDAnsibleIDField
Expand Down Expand Up @@ -131,10 +135,14 @@ def create(self, validated_data):
raise serializers.ValidationError({'object_id': _('Object must be specified for this role assignment')})

with transaction.atomic():
assignment = rd.give_permission(actor, obj)
assignment = rd.give_permission(
actor, obj, assignment_created=validated_data.get('created'), assignment_object_created=validated_data.get('object_created')
)
else:
with transaction.atomic():
assignment = rd.give_global_permission(actor)
assignment = rd.give_global_permission(
actor, assignment_created=validated_data.get('created'), assignment_object_created=validated_data.get('object_created')
)

return assignment

Expand Down
56 changes: 56 additions & 0 deletions test_app/tests/rbac/remote/test_remote_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,59 @@ def test_org_roles_same_type_different_service(rando, organization):
], f'User should have permission to exactly {service_name} resource'

rds[service_name].remove_permission(rando, organization)


@pytest.mark.django_db
def test_object_created_field_local_model(rando, org_inv_rd, organization):
"""
Test that the object_created field is set to the local object's created timestamp
when creating an assignment for a local model like inventory.

This test should FAIL because the object_created field doesn't exist yet.
"""
# Create the assignment
assignment = org_inv_rd.give_permission(rando, organization)

# Verify the assignment was created
assert assignment.user == rando
assert assignment.role_definition == org_inv_rd
assert assignment.object_id == organization.pk

# This should FAIL - the object_created field should be set to inventory.created
assert hasattr(assignment, 'object_created'), "Assignment should have an object_created field"

assert assignment.object_created == organization.created, (
f"object_created should be set to the organization's created timestamp. "
f"Expected: {organization.created}, but object_created field is missing or has wrong value"
)


@pytest.mark.django_db
def test_object_created_field_remote_object(rando, foo_type, foo_rd):
"""
Test that the object_created field is properly handled when creating an assignment
for a remote object (stand-in object pattern).

This test should FAIL because the object_created field doesn't exist yet.
"""
# Create a remote object stand-in
remote_foo = RemoteObject(content_type=foo_type, object_id=42)

# Create the assignment
assignment = foo_rd.give_permission(rando, remote_foo)

# Verify the assignment was created
assert assignment.user == rando
assert assignment.role_definition == foo_rd
assert assignment.object_id == 42
assert isinstance(assignment.content_object, RemoteObject)

# This should FAIL - the object_created field should exist and be handled for remote objects
assert hasattr(assignment, 'object_created'), "Assignment should have an object_created field even for remote objects"

# For remote objects, the object_created field might be None or set to a default value
# since we don't have the actual remote object's creation timestamp
# The exact behavior will depend on implementation, but the field should exist
assert assignment.object_created is not None or assignment.object_created is None, (
f"object_created field should exist for remote objects. " f"Current value: {getattr(assignment, 'object_created', 'FIELD_MISSING')}"
)
Loading
Loading