|
| 1 | +"""Authoriser class""" |
| 2 | +import json |
| 3 | + |
| 4 | +from authorisation.api_operation_code import ApiOperationCode |
| 5 | +from clients import redis_client, logger |
| 6 | +from constants import SUPPLIER_PERMISSIONS_HASH_KEY |
| 7 | + |
| 8 | + |
| 9 | +class Authoriser: |
| 10 | + """Authoriser class. Used for authorising operations on FHIR vaccinations.""" |
| 11 | + def __init__(self): |
| 12 | + self._cache_client = redis_client |
| 13 | + |
| 14 | + @staticmethod |
| 15 | + def _expand_permissions(permissions: list[str]) -> dict[str, list[ApiOperationCode]]: |
| 16 | + """Parses and expands permissions data into a dictionary mapping vaccination types to a list of permitted |
| 17 | + API operations. The raw string from Redis will be in the form VAC.PERMS e.g. COVID19.CRUDS""" |
| 18 | + expanded_permissions = {} |
| 19 | + |
| 20 | + for permission in permissions: |
| 21 | + vaccine_type, operation_codes_str = permission.split(".", maxsplit=1) |
| 22 | + vaccine_type = vaccine_type.lower() |
| 23 | + operation_codes = [ |
| 24 | + operation_code |
| 25 | + for operation_code in operation_codes_str.lower() |
| 26 | + if operation_code in list(ApiOperationCode) |
| 27 | + ] |
| 28 | + expanded_permissions[vaccine_type] = operation_codes |
| 29 | + |
| 30 | + return expanded_permissions |
| 31 | + |
| 32 | + def _get_supplier_permissions(self, supplier_system: str) -> dict[str, list[ApiOperationCode]]: |
| 33 | + raw_permissions_data = self._cache_client.hget(SUPPLIER_PERMISSIONS_HASH_KEY, supplier_system) |
| 34 | + permissions_data = json.loads(raw_permissions_data) if raw_permissions_data else [] |
| 35 | + |
| 36 | + return self._expand_permissions(permissions_data) |
| 37 | + |
| 38 | + def authorise( |
| 39 | + self, |
| 40 | + supplier_system: str, |
| 41 | + requested_operation: ApiOperationCode, |
| 42 | + vaccination_types: set[str] |
| 43 | + ) -> bool: |
| 44 | + """Checks that the supplier system is permitted to carry out the requested operation on the given vaccination |
| 45 | + type(s)""" |
| 46 | + supplier_permissions = self._get_supplier_permissions(supplier_system) |
| 47 | + |
| 48 | + logger.info( |
| 49 | + f"operation: {requested_operation}, supplier_permissions: {supplier_permissions}, " |
| 50 | + f"vaccine_types: {vaccination_types}" |
| 51 | + ) |
| 52 | + return all( |
| 53 | + requested_operation in supplier_permissions.get(vaccination_type.lower(), []) |
| 54 | + for vaccination_type in vaccination_types |
| 55 | + ) |
| 56 | + |
| 57 | + def filter_permitted_vacc_types( |
| 58 | + self, |
| 59 | + supplier_system: str, |
| 60 | + requested_operation: ApiOperationCode, |
| 61 | + vaccination_types: set[str] |
| 62 | + ) -> set[str]: |
| 63 | + """Returns the set of vaccine types that a given supplier can interact with for a given operation type. |
| 64 | + This is a more permissive form of authorisation e.g. used in search as it will filter out any requested vacc |
| 65 | + types that they cannot interact with without throwing an error""" |
| 66 | + supplier_permissions = self._get_supplier_permissions(supplier_system) |
| 67 | + |
| 68 | + return { |
| 69 | + vaccine_type |
| 70 | + for vaccine_type in vaccination_types |
| 71 | + if requested_operation in supplier_permissions.get(vaccine_type.lower(), []) |
| 72 | + } |
0 commit comments