-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathfhir_api_exception_handler.py
More file actions
75 lines (68 loc) · 2.56 KB
/
fhir_api_exception_handler.py
File metadata and controls
75 lines (68 loc) · 2.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""Module for the global FHIR API exception handler"""
import functools
import uuid
from typing import Callable, Type
from common.clients import logger
from common.models.errors import (
CustomValidationError,
IdentifierDuplicationError,
InconsistentIdentifierError,
InconsistentResourceVersionError,
ResourceNotFoundError,
)
from constants import GENERIC_SERVER_ERROR_DIAGNOSTICS_MESSAGE
from controller.aws_apig_response_utils import create_response
from models.errors import (
Code,
InconsistentIdError,
InvalidImmunizationIdError,
InvalidJsonError,
InvalidResourceVersionError,
InvalidStoredDataError,
ParameterExceptionError,
ResourceVersionNotProvidedError,
Severity,
TooManyResultsError,
UnauthorizedError,
UnauthorizedVaxError,
UnhandledResponseError,
create_operation_outcome,
)
# HTTP status code returned for each expected (custom) exception type. The table is built from
# per-status groups; the resulting dict has one entry per exception class, in the order listed.
_CUSTOM_EXCEPTION_TO_STATUS_MAP: dict[Type[Exception], int] = {
    error_type: http_status
    for http_status, error_types in (
        (
            400,
            (
                InconsistentResourceVersionError,
                # Identifier refers to the local FHIR identifier composed of system and value.
                InconsistentIdentifierError,
                # ID refers to the top-level ID of the FHIR resource.
                InconsistentIdError,
                InvalidImmunizationIdError,
                InvalidJsonError,
                InvalidResourceVersionError,
                CustomValidationError,
                ResourceVersionNotProvidedError,
                ParameterExceptionError,
                TooManyResultsError,
            ),
        ),
        (403, (UnauthorizedError, UnauthorizedVaxError)),
        (404, (ResourceNotFoundError,)),
        (422, (IdentifierDuplicationError,)),
        (500, (UnhandledResponseError, InvalidStoredDataError)),
    )
    for error_type in error_types
}
def fhir_api_exception_handler(function: Callable) -> Callable:
    """Decorator that converts exceptions raised by ``function`` into API responses.

    Exceptions whose class, or one of whose base classes, is a key of
    ``_CUSTOM_EXCEPTION_TO_STATUS_MAP`` are answered via ``create_response`` with the mapped
    status code and the exception's own ``to_operation_outcome()`` body. Any other
    ``Exception`` is logged and answered with status 500 and an operation outcome carrying
    ``GENERIC_SERVER_ERROR_DIAGNOSTICS_MESSAGE`` and a freshly generated UUID as resource id.

    Args:
        function: The callable to wrap.

    Returns:
        A wrapper that returns whatever ``function`` returns when it does not raise an
        ``Exception``, and the result of ``create_response`` otherwise.
    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except tuple(_CUSTOM_EXCEPTION_TO_STATUS_MAP) as exc:
            # The except clause matches subclasses of the mapped types as well, so indexing
            # the map with type(exc) directly would raise KeyError for a subclass and that
            # KeyError would escape this decorator. Walk the MRO and use the nearest mapped
            # class instead; type(exc) comes first, so directly mapped types are unaffected.
            # A match always exists because the exception was caught by this clause.
            status_code = next(
                _CUSTOM_EXCEPTION_TO_STATUS_MAP[cls]
                for cls in type(exc).__mro__
                if cls in _CUSTOM_EXCEPTION_TO_STATUS_MAP
            )
            return create_response(status_code=status_code, body=exc.to_operation_outcome())
        except Exception:  # pylint: disable = broad-exception-caught
            # Unexpected failure: record the traceback, but return only the generic
            # diagnostics message to the caller.
            logger.exception("Unhandled exception")
            server_error = create_operation_outcome(
                resource_id=str(uuid.uuid4()),
                severity=Severity.error,
                code=Code.server_error,
                diagnostics=GENERIC_SERVER_ERROR_DIAGNOSTICS_MESSAGE,
            )
            return create_response(500, server_error)

    return wrapper