|
15 | 15 | from pydantic import ValidationError |
16 | 16 |
|
17 | 17 | import parameter_parser |
| 18 | +from authorisation.ApiOperationCode import ApiOperationCode |
| 19 | +from authorisation.Authoriser import Authoriser |
18 | 20 | from fhir_repository import ImmunizationRepository |
19 | | -from base_utils.base_utils import obtain_field_value |
20 | | -from models.field_names import FieldNames |
21 | | -from models.errors import InvalidPatientId, CustomValidationError, UnhandledResponseError |
| 21 | +from models.errors import InvalidPatientId, CustomValidationError, UnauthorizedVaxError |
22 | 22 | from models.fhir_immunization import ImmunizationValidator |
23 | 23 | from models.utils.generic_utils import nhs_number_mod11_check, get_occurrence_datetime, create_diagnostics, form_json, get_contained_patient |
24 | | -from models.constants import Constants |
25 | 24 | from models.errors import MandatoryError |
26 | 25 | from timer import timed |
27 | 26 | from filter import Filter |
@@ -54,27 +53,27 @@ def __init__( |
54 | 53 | imms_repo: ImmunizationRepository, |
55 | 54 | validator: ImmunizationValidator = ImmunizationValidator(), |
56 | 55 | ): |
| 56 | + self.authoriser = Authoriser() |
57 | 57 | self.immunization_repo = imms_repo |
58 | 58 | self.validator = validator |
59 | 59 |
|
60 | 60 | def get_immunization_by_identifier( |
61 | | - self, identifier_pk: str, imms_vax_type_perms: list[str], identifier: str, element: str |
| 61 | + self, identifier_pk: str, supplier_name: str, identifier: str, element: str |
62 | 62 | ) -> Optional[dict]: |
63 | 63 | """ |
64 | 64 | Get an Immunization by its ID. Return None if not found. If the patient doesn't have an NHS number, |
65 | 65 | return the Immunization without calling PDS or checking S flag. |
66 | 66 | """ |
67 | | - imms_resp = self.immunization_repo.get_immunization_by_identifier( |
68 | | - identifier_pk, imms_vax_type_perms |
69 | | - ) |
| 67 | + base_url = f"{get_service_url()}/Immunization" |
| 68 | + imms_resp, vaccination_type = self.immunization_repo.get_immunization_by_identifier(identifier_pk) |
| 69 | + |
70 | 70 | if not imms_resp: |
71 | | - base_url = f"{get_service_url()}/Immunization" |
72 | | - response = form_json(imms_resp, None, None, base_url) |
73 | | - return response |
74 | | - else: |
75 | | - base_url = f"{get_service_url()}/Immunization" |
76 | | - response = form_json(imms_resp, element, identifier, base_url) |
77 | | - return response |
| 71 | + return form_json(imms_resp, None, None, base_url) |
| 72 | + |
| 73 | + if not self.authoriser.authorise(supplier_name, ApiOperationCode.SEARCH, {vaccination_type}): |
| 74 | + raise UnauthorizedVaxError |
| 75 | + |
| 76 | + return form_json(imms_resp, element, identifier, base_url) |
78 | 77 |
|
79 | 78 | def get_immunization_by_id(self, imms_id: str, imms_vax_type_perms: list[str]) -> Optional[dict]: |
80 | 79 | """ |
|
0 commit comments