|
19 | 19 | from models.errors import ( |
20 | 20 | Code, |
21 | 21 | IdentifierDuplicationError, |
| 22 | + InvalidImmunizationId, |
22 | 23 | ParameterException, |
23 | | - ResourceNotFoundError, |
24 | 24 | Severity, |
25 | 25 | UnauthorizedError, |
26 | 26 | UnauthorizedVaxError, |
@@ -96,8 +96,8 @@ def get_immunization_by_identifier(self, aws_event) -> dict: |
96 | 96 | def get_immunization_by_id(self, aws_event: APIGatewayProxyEventV1) -> dict: |
97 | 97 | imms_id = get_path_parameter(aws_event, "id") |
98 | 98 |
|
99 | | - if id_error := self._validate_id(imms_id): |
100 | | - return create_response(400, id_error) |
| 99 | + if not self._is_valid_immunization_id(imms_id): |
| 100 | + raise InvalidImmunizationId() |
101 | 101 |
|
102 | 102 | supplier_system = get_supplier_system_header(aws_event) |
103 | 103 |
|
@@ -157,10 +157,20 @@ def update_immunization(self, aws_event): |
157 | 157 |
|
158 | 158 | supplier_system = self._identify_supplier_system(aws_event) |
159 | 159 |
|
160 | | - # Validate the imms id - start |
161 | | - if id_error := self._validate_id(imms_id): |
162 | | - return create_response(400, json.dumps(id_error)) |
163 | | - # Validate the imms id - end |
| 160 | + # Refactor to raise InvalidImmunizationId when working on VED-747 |
| 161 | + if not self._is_valid_immunization_id(imms_id): |
| 162 | + return create_response( |
| 163 | + 400, |
| 164 | + json.dumps( |
| 165 | + create_operation_outcome( |
| 166 | + resource_id=str(uuid.uuid4()), |
| 167 | + severity=Severity.error, |
| 168 | + code=Code.invalid, |
| 169 | + diagnostics="Validation errors: the provided event ID is either missing or not in the expected " |
| 170 | + "format.", |
| 171 | + ) |
| 172 | + ), |
| 173 | + ) |
164 | 174 |
|
165 | 175 | # Validate the body of the request - start |
166 | 176 | try: |
@@ -320,31 +330,18 @@ def update_immunization(self, aws_event): |
320 | 330 | except UnauthorizedVaxError as unauthorized: |
321 | 331 | return create_response(403, unauthorized.to_operation_outcome()) |
322 | 332 |
|
323 | | - def delete_immunization(self, aws_event): |
324 | | - try: |
325 | | - if aws_event.get("headers"): |
326 | | - imms_id = aws_event["pathParameters"]["id"] |
327 | | - else: |
328 | | - raise UnauthorizedError() |
329 | | - except UnauthorizedError as unauthorized: |
330 | | - return create_response(403, unauthorized.to_operation_outcome()) |
| 333 | + @fhir_api_exception_handler |
| 334 | + def delete_immunization(self, aws_event: APIGatewayProxyEventV1) -> dict: |
| 335 | + imms_id = get_path_parameter(aws_event, "id") |
331 | 336 |
|
332 | | - # Validate the imms id |
333 | | - if id_error := self._validate_id(imms_id): |
334 | | - return create_response(400, json.dumps(id_error)) |
| 337 | + if not self._is_valid_immunization_id(imms_id): |
| 338 | + raise InvalidImmunizationId() |
335 | 339 |
|
336 | | - supplier_system = self._identify_supplier_system(aws_event) |
| 340 | + supplier_system = get_supplier_system_header(aws_event) |
337 | 341 |
|
338 | | - try: |
339 | | - self.fhir_service.delete_immunization(imms_id, supplier_system) |
340 | | - return create_response(204) |
| 342 | + self.fhir_service.delete_immunization(imms_id, supplier_system) |
341 | 343 |
|
342 | | - except ResourceNotFoundError as not_found: |
343 | | - return create_response(404, not_found.to_operation_outcome()) |
344 | | - except UnhandledResponseError as unhandled_error: |
345 | | - return create_response(500, unhandled_error.to_operation_outcome()) |
346 | | - except UnauthorizedVaxError as unauthorized: |
347 | | - return create_response(403, unauthorized.to_operation_outcome()) |
| 344 | + return create_response(204) |
348 | 345 |
|
349 | 346 | def search_immunizations(self, aws_event: APIGatewayProxyEventV1) -> dict: |
350 | 347 | try: |
@@ -406,17 +403,9 @@ def search_immunizations(self, aws_event: APIGatewayProxyEventV1) -> dict: |
406 | 403 | result_json_dict["total"] = 0 |
407 | 404 | return create_response(200, json.dumps(result_json_dict)) |
408 | 405 |
|
409 | | - def _validate_id(self, _id: str) -> Optional[dict]: |
410 | | - if not re.match(self.immunization_id_pattern, _id): |
411 | | - msg = "Validation errors: the provided event ID is either missing or not in the expected format." |
412 | | - return create_operation_outcome( |
413 | | - resource_id=str(uuid.uuid4()), |
414 | | - severity=Severity.error, |
415 | | - code=Code.invalid, |
416 | | - diagnostics=msg, |
417 | | - ) |
418 | | - |
419 | | - return None |
| 406 | + def _is_valid_immunization_id(self, immunization_id: str) -> bool: |
| 407 | + """Validates if the given unique Immunization ID is valid.""" |
| 408 | + return False if not re.match(self.immunization_id_pattern, immunization_id) else True |
420 | 409 |
|
421 | 410 | def _validate_identifier_system(self, _id: str, _elements: str) -> Optional[dict]: |
422 | 411 | if not _id: |
|
0 commit comments