|
42 | 42 | from auth_api.services.org import Org as OrgService |
43 | 43 | from auth_api.services.user import User as UserService |
44 | 44 | from auth_api.utils.enums import AccessType, AffiliationInvitationType, InvitationStatus, LoginSource, Status |
45 | | -from auth_api.utils.roles import ADMIN, COORDINATOR, STAFF |
| 45 | +from auth_api.utils.roles import ADMIN, COORDINATOR, STAFF, USER |
46 | 46 | from auth_api.utils.user_context import UserContext, user_context |
47 | 47 |
|
48 | 48 | from ..schemas.affiliation_invitation import AffiliationInvitationSchemaPublic |
@@ -270,7 +270,9 @@ def create_affiliation_invitation( |
270 | 270 | if from_org_id == to_org_id: |
271 | 271 | raise BusinessException(Error.DATA_ALREADY_EXISTS, None) |
272 | 272 |
|
273 | | - check_auth(org_id=from_org_id, one_of_roles=(ADMIN, COORDINATOR, STAFF)) |
| 273 | + AffiliationInvitation.check_auth_for_invitation( |
| 274 | + invitation_type=affiliation_invitation_type, from_org_id=from_org_id |
| 275 | + ) |
274 | 276 |
|
275 | 277 | entity, from_org, business = AffiliationInvitation._validate_prerequisites( |
276 | 278 | business_identifier=business_identifier, |
@@ -365,10 +367,10 @@ def get_business_name_from_alternative_name(business, business_name, business_id |
365 | 367 |
|
366 | 368 | def update_affiliation_invitation(self, user, invitation_origin, affiliation_invitation_info: Dict): |
367 | 369 | """Update the specified affiliation invitation with new data.""" |
368 | | - check_auth(org_id=self._model.from_org_id, one_of_roles=(ADMIN, COORDINATOR, STAFF)) |
369 | | - |
370 | 370 | invitation: AffiliationInvitationModel = self._model |
371 | 371 |
|
| 372 | + AffiliationInvitation.check_auth_for_invitation(invitation=self._model) |
| 373 | + |
372 | 374 | # Don't do any updates if the invitation is not in PENDING state |
373 | 375 | if invitation.invitation_status_code != InvitationStatus.PENDING.value: |
374 | 376 | return AffiliationInvitation(invitation) |
@@ -413,7 +415,7 @@ def delete_affiliation_invitation(invitation_id): |
413 | 415 | if not (invitation := AffiliationInvitationModel.find_invitation_by_id(invitation_id)): |
414 | 416 | raise BusinessException(Error.DATA_NOT_FOUND, None) |
415 | 417 |
|
416 | | - check_auth(org_id=invitation.from_org_id, one_of_roles=(ADMIN, COORDINATOR, STAFF)) |
| 418 | + AffiliationInvitation.check_auth_for_invitation(invitation=invitation) |
417 | 419 |
|
418 | 420 | if invitation.status == InvitationStatus.ACCEPTED.value: |
419 | 421 | invitation.is_deleted = True |
@@ -746,3 +748,20 @@ def refuse_affiliation_invitation(invitation_id: int, user: UserService): |
746 | 748 | ) |
747 | 749 |
|
748 | 750 | return AffiliationInvitation(invitation) |
| 751 | + |
| 752 | + @staticmethod |
| 753 | + def check_auth_for_invitation( |
| 754 | + invitation: AffiliationInvitationModel = None, |
| 755 | + invitation_type: AffiliationInvitationType = None, |
| 756 | + from_org_id: int = None, |
| 757 | + ): |
| 758 | + """Check if the user has the right to view the invitation.""" |
| 759 | + invitation_type = invitation_type or AffiliationInvitationType.from_value(invitation.type) |
| 760 | + from_org_id = from_org_id or invitation.from_org_id |
| 761 | + match invitation_type: |
| 762 | + case AffiliationInvitationType.REQUEST: |
| 763 | + check_auth(org_id=from_org_id, one_of_roles=(ADMIN, COORDINATOR, STAFF)) |
| 764 | + case AffiliationInvitationType.EMAIL: |
| 765 | + check_auth(org_id=from_org_id, one_of_roles=(ADMIN, COORDINATOR, USER, STAFF)) |
| 766 | + case _: |
| 767 | + raise BusinessException(Error.INVALID_AFFILIATION_INVITATION_TYPE, None) |
0 commit comments