|
1 | 1 | import logging
|
2 |
| -from contextlib import contextmanager |
3 | 2 | from typing import Optional, Union
|
4 | 3 | from uuid import UUID
|
5 | 4 |
|
@@ -285,90 +284,6 @@ def post_migration_rbac_setup(sender, *args, **kwargs):
|
285 | 284 | compute_object_role_permissions()
|
286 | 285 |
|
287 | 286 |
|
288 |
| -class TrackedRelationship: |
289 |
| - def __init__(self, cls, role_name): |
290 |
| - self.cls = cls |
291 |
| - self.role_name = role_name |
292 |
| - self.user_relationship = None |
293 |
| - self.team_relationship = None |
294 |
| - self._active_sync_flag = False |
295 |
| - |
296 |
| - def initialize(self, relationship): |
297 |
| - manager = getattr(self.cls, relationship) |
298 |
| - related_model_name = manager.field.related_model._meta.model_name |
299 |
| - if related_model_name == permission_registry.team_model._meta.model_name: |
300 |
| - self.team_relationship = relationship |
301 |
| - m2m_changed.connect(self.sync_team_to_role, sender=manager.through) |
302 |
| - elif related_model_name == permission_registry.user_model._meta.model_name: |
303 |
| - self.user_relationship = relationship |
304 |
| - m2m_changed.connect(self.sync_user_to_role, sender=manager.through) |
305 |
| - else: |
306 |
| - raise RuntimeError(f'Can only register user or team relationships, obtained {related_model_name}') |
307 |
| - |
308 |
| - @contextmanager |
309 |
| - def sync_active(self): |
310 |
| - try: |
311 |
| - self._active_sync_flag = True |
312 |
| - yield |
313 |
| - finally: |
314 |
| - self._active_sync_flag = False |
315 |
| - |
316 |
| - def sync_relationship(self, actor, content_object, giving=True): |
317 |
| - # Exit if role does not apply for the intended model type, for example |
318 |
| - # if user is given "team-member" role to organization, do not add user to the team members |
319 |
| - if content_object._meta.model_name != self.cls._meta.model_name: |
320 |
| - return |
321 |
| - |
322 |
| - if actor._meta.model_name == permission_registry.team_model._meta.model_name: |
323 |
| - if self.team_relationship is None: |
324 |
| - return |
325 |
| - manager = getattr(content_object, self.team_relationship) |
326 |
| - elif actor._meta.model_name == permission_registry.user_model._meta.model_name: |
327 |
| - if self.user_relationship is None: |
328 |
| - return |
329 |
| - manager = getattr(content_object, self.user_relationship) |
330 |
| - |
331 |
| - if giving: |
332 |
| - manager.add(actor) |
333 |
| - else: |
334 |
| - manager.remove(actor) |
335 |
| - |
336 |
| - def _sync_actor_to_role(self, actor_model: type, instance: Model, action: str, pk_set: Optional[set[int]]): |
337 |
| - if self._active_sync_flag: |
338 |
| - return |
339 |
| - if action.startswith('pre_'): |
340 |
| - return |
341 |
| - rd = RoleDefinition.objects.get(name=self.role_name) |
342 |
| - |
343 |
| - if action in ('post_add', 'post_remove'): |
344 |
| - actor_set = pk_set |
345 |
| - elif action == 'post_clear': |
346 |
| - ct = permission_registry.content_type_model.objects.get_for_model(instance) |
347 |
| - role = ObjectRole.objects.get(object_id=instance.pk, content_type=ct, role_definition=rd) |
348 |
| - if actor_model._meta.model_name == 'team': |
349 |
| - actor_set = set(role.teams.values_list('id', flat=True)) |
350 |
| - else: |
351 |
| - actor_set = set(role.users.values_list('id', flat=True)) |
352 |
| - |
353 |
| - giving = bool(action == 'post_add') |
354 |
| - for actor in actor_model.objects.filter(pk__in=actor_set): |
355 |
| - rd.give_or_remove_permission(actor, instance, giving=giving, sync_action=True) |
356 |
| - |
357 |
| - def sync_team_to_role(self, instance: Model, action: str, model: type, pk_set: Optional[set[int]], reverse: bool, **kwargs): |
358 |
| - if not reverse: |
359 |
| - self._sync_actor_to_role(permission_registry.team_model, instance, action, pk_set) |
360 |
| - else: |
361 |
| - for pk in pk_set: |
362 |
| - self._sync_actor_to_role(permission_registry.team_model, model(pk=pk), action, {instance.pk}) |
363 |
| - |
364 |
| - def sync_user_to_role(self, instance: Model, action: str, model: type, pk_set: Optional[set[int]], reverse: bool, **kwargs): |
365 |
| - if not reverse: |
366 |
| - self._sync_actor_to_role(permission_registry.user_model, instance, action, pk_set) |
367 |
| - else: |
368 |
| - for pk in pk_set: |
369 |
| - self._sync_actor_to_role(permission_registry.user_model, model(pk=pk), action, {instance.pk}) |
370 |
| - |
371 |
| - |
372 | 287 | def connect_rbac_signals(cls):
|
373 | 288 | if cls._meta.model_name == permission_registry.team_model._meta.model_name:
|
374 | 289 | pre_delete.connect(team_pre_delete, sender=cls, dispatch_uid='stash-team-roles-before-delete')
|
|
0 commit comments