|
| 1 | +# Python |
1 | 2 | import logging |
2 | 3 |
|
3 | | -from django.contrib.contenttypes.models import ContentType |
4 | | - |
5 | 4 | from ansible_base.jwt_consumer.common.auth import JWTAuthentication |
6 | | -from ansible_base.jwt_consumer.common.exceptions import InvalidService |
7 | | -from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment |
8 | | -from ansible_base.resource_registry.models import Resource |
9 | | - |
10 | | -logger = logging.getLogger('ansible_base.jwt_consumer.hub.auth') |
11 | 5 |
|
12 | 6 |
|
13 | 7 | class HubJWTAuth(JWTAuthentication): |
14 | | - |
15 | | - def get_galaxy_models(self): |
16 | | - '''This is separate from process_permissions purely for testability.''' |
17 | | - try: |
18 | | - from galaxy_ng.app.models import Organization, Team |
19 | | - except ImportError: |
20 | | - raise InvalidService("automation-hub") |
21 | | - |
22 | | - return Organization, Team |
23 | | - |
24 | | - def process_permissions(self): |
25 | | - # Map teams in the JWT to Automation Hub groups. |
26 | | - Organization, Team = self.get_galaxy_models() |
27 | | - self.team_content_type = ContentType.objects.get_for_model(Team) |
28 | | - self.org_content_type = ContentType.objects.get_for_model(Organization) |
29 | | - |
30 | | - # TODO - galaxy does not have an org admin roledef yet |
31 | | - # admin_orgs = [] |
32 | | - |
33 | | - # TODO - galaxy does not have an org member roledef yet |
34 | | - # member_orgs = [] |
35 | | - |
36 | | - # The "shared" [!local] teams this user admins |
37 | | - admin_teams = [] |
38 | | - |
39 | | - # the teams this user should have a "shared" [!local] assignment to |
40 | | - member_teams = [] |
41 | | - |
42 | | - for role_name in self.common_auth.token.get('object_roles', {}).keys(): |
43 | | - if role_name.startswith('Team'): |
44 | | - for object_index in self.common_auth.token['object_roles'][role_name]['objects']: |
45 | | - team_data = self.common_auth.token['objects']['team'][object_index] |
46 | | - ansible_id = team_data['ansible_id'] |
47 | | - try: |
48 | | - team = Resource.objects.get(ansible_id=ansible_id).content_object |
49 | | - except Resource.DoesNotExist: |
50 | | - team = self.common_auth.get_or_create_resource('team', team_data)[1] |
51 | | - |
52 | | - if role_name == 'Team Admin': |
53 | | - admin_teams.append(team) |
54 | | - elif role_name == 'Team Member': |
55 | | - member_teams.append(team) |
56 | | - |
57 | | - for roledef_name, teams in [('Team Admin', admin_teams), ('Team Member', member_teams)]: |
58 | | - |
59 | | - # the "shared" "non-local" definition ... |
60 | | - roledef = RoleDefinition.objects.get(name=roledef_name) |
61 | | - |
62 | | - # pks for filtering ... |
63 | | - team_pks = [team.pk for team in teams] |
64 | | - |
65 | | - # delete all assignments not defined by this jwt ... |
66 | | - for assignment in RoleUserAssignment.objects.filter(user=self.common_auth.user, role_definition=roledef).exclude(object_id__in=team_pks): |
67 | | - team = Team.objects.get(pk=assignment.object_id) |
68 | | - roledef.remove_permission(self.common_auth.user, team) |
69 | | - |
70 | | - # assign "non-local" for each team ... |
71 | | - for team in teams: |
72 | | - roledef.give_permission(self.common_auth.user, team) |
73 | | - |
74 | | - auditor_roledef = RoleDefinition.objects.get(name='Platform Auditor') |
75 | | - if "Platform Auditor" in self.common_auth.token.get('global_roles', []): |
76 | | - auditor_roledef.give_global_permission(self.common_auth.user) |
77 | | - else: |
78 | | - auditor_roledef.remove_global_permission(self.common_auth.user) |
| 8 | + use_rbac_permissions = True |
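Review bot: The new class body replaces the removed `process_permissions` with only `use_rbac_permissions = True`, and the removed code did two things the replacement must still cover: it deleted `RoleUserAssignment` rows for teams not named in the JWT, and it revoked the Platform Auditor global permission when `"Platform Auditor"` was absent from `global_roles`; whether the base `JWTAuthentication` path revokes as well as grants is not visible here and should be confirmed, since a consumer that only grants leaves a user with team-admin or auditor rights after the roles in their token shrink. On the new side `import logging` is now unused, because its only consumer `logger = logging.getLogger(...)` was deleted, and the first line `# Python` reads like a pasted code-fence label rather than an intended comment; both should be dropped. A one-line docstring on the now-empty class would also help, e.g. `"""JWT authentication for Automation Hub; permission sync is delegated to the base class's RBAC handling via use_rbac_permissions."""`.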