|
13 | 13 | OktaGroup, |
14 | 14 | OktaGroupTagMap, |
15 | 15 | OktaUser, |
| 16 | + OktaUserGroupMember, |
16 | 17 | RoleGroup, |
17 | 18 | RoleRequest, |
| 19 | + Tag, |
18 | 20 | ) |
19 | 21 | from api.models.app_group import get_access_owners, get_app_managers |
20 | 22 | from api.models.okta_group import get_group_managers |
| 23 | +from api.models.tag import coalesce_constraints |
21 | 24 | from api.operations.approve_role_request import ApproveRoleRequest |
22 | 25 | from api.operations.reject_role_request import RejectRoleRequest |
23 | 26 | from api.plugins import get_conditional_access_hook, get_notification_hook |
@@ -46,12 +49,23 @@ def __init__( |
46 | 49 | self.requester_role = ( |
47 | 50 | RoleGroup.query.filter(RoleGroup.deleted_at.is_(None)).filter(RoleGroup.id == requester_role).first() |
48 | 51 | ) |
| 52 | + # self.requester_role = ( |
| 53 | + # db.session.query(RoleGroup) |
| 54 | + # .options(joinedload(OktaUserGroupMember.user)) |
| 55 | + # .filter(RoleGroup.deleted_at.is_(None)) |
| 56 | + # .filter(RoleGroup.id == requester_role) |
| 57 | + # .first() |
| 58 | + # ) |
49 | 59 | else: |
50 | 60 | self.requester_role = requester_role |
51 | 61 |
|
52 | 62 | self.requested_group = ( |
53 | 63 | db.session.query(OktaGroup) |
54 | | - .options(selectin_polymorphic(OktaGroup, [AppGroup]), joinedload(AppGroup.app)) |
| 64 | + .options( |
| 65 | + selectin_polymorphic(OktaGroup, [AppGroup]), |
| 66 | + joinedload(AppGroup.app), |
| 67 | + selectinload(OktaGroup.active_group_tags).options(joinedload(OktaGroupTagMap.active_tag)), |
| 68 | + ) |
55 | 69 | .filter(OktaGroup.deleted_at.is_(None)) |
56 | 70 | .filter(OktaGroup.id == (requested_group if isinstance(requested_group, str) else requested_group.id)) |
57 | 71 | .first() |
@@ -90,6 +104,39 @@ def execute(self) -> Optional[RoleRequest]: |
90 | 104 | # Fetch the users to notify |
91 | 105 | approvers = get_group_managers(self.requested_group.id) |
92 | 106 |
|
| 107 | + requested_group_tags = [tm.active_tag for tm in self.requested_group.active_group_tags] |
| 108 | + |
| 109 | + role_memberships = [ |
| 110 | + u.user_id |
| 111 | + for u in ( |
| 112 | + OktaUserGroupMember.query.filter(OktaUserGroupMember.group_id == self.requester_role.id) |
| 113 | + .filter(OktaUserGroupMember.is_owner.is_(False)) |
| 114 | + .filter( |
| 115 | + db.or_( |
| 116 | + OktaUserGroupMember.ended_at.is_(None), |
| 117 | + OktaUserGroupMember.ended_at > db.func.now(), |
| 118 | + ) |
| 119 | + ) |
| 120 | + .all() |
| 121 | + ) |
| 122 | + ] |
| 123 | + |
| 124 | + # If group tagged with disallow self add constraint, filter out approvers who are also members of the role |
| 125 | + if self.request_ownership: |
| 126 | + disallow_self_add_owner = coalesce_constraints( |
| 127 | + constraint_key=Tag.DISALLOW_SELF_ADD_OWNERSHIP_CONSTRAINT_KEY, |
| 128 | + tags=requested_group_tags, |
| 129 | + ) |
| 130 | + if disallow_self_add_owner: |
| 131 | + approvers = [a for a in approvers if a.id not in role_memberships] |
| 132 | + else: |
| 133 | + disallow_self_add_member = coalesce_constraints( |
| 134 | + constraint_key=Tag.DISALLOW_SELF_ADD_MEMBERSHIP_CONSTRAINT_KEY, |
| 135 | + tags=requested_group_tags, |
| 136 | + ) |
| 137 | + if disallow_self_add_member: |
| 138 | + approvers = [a for a in approvers if a.id not in role_memberships] |
| 139 | + |
93 | 140 | # If there are no approvers, try to get the app managers |
94 | 141 | # or if the only approver is the requester, try to get the app managers |
95 | 142 | if ( |
|
0 commit comments