1- from typing import Dict , List , NamedTuple
1+ from typing import Any , Dict , List , NamedTuple , Optional
22
33import requests
4+ from enums .lambda_error import LambdaError
45from enums .repository_role import OrganisationRelationship
6+ from services .base .ssm_service import SSMService
57from services .token_handler_ssm_service import TokenHandlerSSMService
68from utils .audit_logging_setup import LoggingService
9+ from utils .constants .ssm import GP_ORG_ROLE_CODE
710from utils .exceptions import (
811 OdsErrorException ,
912 OrganisationNotFoundException ,
1013 TooManyOrgsException ,
1114)
15+ from utils .lambda_exceptions import LoginException
1216
1317logger = LoggingService (__name__ )
1418
1519token_handler_ssm_service = TokenHandlerSSMService ()
20+ ssm_service = SSMService ()
1621
1722
1823class Organisation (NamedTuple ):
@@ -51,32 +56,53 @@ def fetch_organisation_with_permitted_role(self, ods_code_list: list[str]) -> Di
5156 ods_code = ods_code_list [0 ]
5257 logger .info (f"ods_code selected: { ods_code } " )
5358
59+ itoc_ods_codes = token_handler_ssm_service .get_itoc_ods_codes ()
60+
61+ if ods_code in itoc_ods_codes :
62+ logger .info (f"ODS code { ods_code } is ITOC, returning org data" )
63+ return parse_ods_response ({}, "" , "ITOC" )
64+
5465 org_data = self .fetch_organisation_data (ods_code )
5566
5667 logger .info (f"Org Data: { org_data } " )
5768
58- pcse_ods = find_and_get_pcse_ods (ods_code )
69+ gp_org_role_code = get_user_gp_org_role_code (org_data )
70+
71+ if gp_org_role_code is not None :
72+ logger .info (f"ODS code { ods_code } is a GP, returning org data" )
73+ icb_ods_code = find_icb_for_user (org_data ["Organisation" ])
74+ response = parse_ods_response (org_data , gp_org_role_code , icb_ods_code )
75+ return response
76+
77+ pcse_ods_code = token_handler_ssm_service .get_pcse_ods_code ()
5978
60- if pcse_ods is not None :
79+ if ods_code == pcse_ods_code :
6180 logger .info (f"ODS code { ods_code } is PCSE, returning org data" )
6281 response = parse_ods_response (org_data , "" , "PCSE" )
6382 return response
6483
65- gpp_org = find_and_get_gpp_org_code (org_data )
84+ allowed_ods_code_list = (
85+ token_handler_ssm_service .get_allowed_list_of_ods_codes ()
86+ )
6687
67- if gpp_org is not None :
68- logger .info (f"ODS code { ods_code } is a GPP , returning org data" )
88+ if ods_code in allowed_ods_code_list :
89+ logger .info (f"ODS code { ods_code } is in allowed list , returning org data" )
6990 icb_ods_code = find_icb_for_user (org_data ["Organisation" ])
70- response = parse_ods_response (org_data , gpp_org , icb_ods_code )
91+ primary_org_role_code = get_user_primary_org_role_code (org_data )
92+ response = parse_ods_response (org_data , primary_org_role_code , icb_ods_code )
7193 return response
7294
73- logger .info (f"ODS code { ods_code } is not a GPP or PCSE, returning empty list" )
95+ logger .info (
96+ f"ODS code { ods_code } is not a GP, PCSE, ITOC nor in allowed list, returning empty list"
97+ )
7498 return {}
7599
76100
77101def parse_ods_response (org_data , role_code , icb_ods_code ) -> dict :
78- org_name = org_data ["Organisation" ]["Name" ]
79- org_ods_code = org_data ["Organisation" ]["OrgId" ]["extension" ]
102+ org_name = org_data .get ("Organisation" , {}).get ("Name" , "" )
103+ org_ods_code = (
104+ org_data .get ("Organisation" , {}).get ("OrgId" , {}).get ("extension" , "" )
105+ )
80106
81107 response_dictionary = {
82108 "name" : org_name ,
@@ -89,25 +115,36 @@ def parse_ods_response(org_data, role_code, icb_ods_code) -> dict:
89115 return response_dictionary
90116
91117
92- def find_and_get_gpp_org_code ( org_details ) :
93- logger .info ("Checking GPP Roles " )
94- json_roles : List [ Dict ] = org_details [ "Organisation" ][ "Roles" ][ "Role" ]
118+ def get_user_gp_org_role_code ( org_data : Dict [ str , Any ]) -> Optional [ str ] :
119+ logger .info ("starting ssm request to retrieve GP organisation role code " )
120+ gp_org_role_code = ssm_service . get_ssm_parameter ( GP_ORG_ROLE_CODE )
95121
96- org_role_codes = token_handler_ssm_service .get_org_role_codes ()
97- for json_role in json_roles :
98- if json_role ["id" ] in org_role_codes :
99- return json_role ["id" ]
100- return None
122+ if gp_org_role_code :
123+ logger .info ("Checking if GP organisation role is present" )
124+ json_roles : List [Dict ] = org_data ["Organisation" ]["Roles" ]["Role" ]
125+ for json_role in json_roles :
126+ if json_role ["id" ] == gp_org_role_code :
127+ return json_role ["id" ]
128+ return None
129+
130+ logger .error (
131+ LambdaError .LoginGpOrgRoleCode .to_str (),
132+ {"Result" : "Unsuccessful login" },
133+ )
134+ raise LoginException (500 , LambdaError .LoginGpOrgRoleCode )
101135
102136
103- def find_and_get_pcse_ods (ods_code ):
104- logger .info ("Checking PCSE Roles" )
105- if ods_code == token_handler_ssm_service .get_org_ods_codes ()[0 ]:
106- return ods_code
107- return None
137+ def get_user_primary_org_role_code (org_data : Dict [str , Any ]) -> str :
138+ logger .info ("Checking if a primary organisation role is present" )
139+ json_roles : List [Dict ] = org_data ["Organisation" ]["Roles" ]["Role" ]
140+
141+ for json_role in json_roles :
142+ if "primaryRole" in json_role :
143+ return json_role ["id" ]
144+ return ""
108145
109146
110- def find_icb_for_user (org_data ) :
147+ def find_icb_for_user (org_data : Dict [ str , Any ]) -> str :
111148 logger .info ("Checking relationships" )
112149 try :
113150 relationships : List [Dict ] = org_data ["Rels" ]["Rel" ]
0 commit comments