Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions lambdas/enums/document_review_reason.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,5 @@


class DocumentReviewReason(StrEnum):
UNKNOWN_NHS_NUMBER = "Unknown NHS number"
DEMOGRAPHIC_MISMATCHES = "Demographic mismatches"
DUPLICATE_RECORD = "Duplicate records error"
FILE_COUNT_MISMATCH = "More or less files than we expected"
FILE_NAME_MISMATCH = "Filename Naming convention error"
GP2GP_ERROR = "GP2GP failure"
GENERAL_ERROR = "General error"
NEW_DOCUMENT = "New document to review"
UNSUCCESSFUL_UPLOAD = "Unsuccessful upload"
1 change: 1 addition & 0 deletions lambdas/enums/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class FeatureFlags(StrEnum):
)
UPLOAD_DOCUMENT_ITERATION_2_ENABLED = "uploadDocumentIteration2Enabled"
UPLOAD_DOCUMENT_ITERATION_3_ENABLED = "uploadDocumentIteration3Enabled"
BULK_UPLOAD_SEND_TO_REVIEW_ENABLED = "bulkUploadSendToReviewEnabled"
15 changes: 14 additions & 1 deletion lambdas/handlers/bulk_upload_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,22 @@ def lambda_handler(event, _context):
validation_strict_mode = validation_strict_mode_flag_object[
FeatureFlags.LLOYD_GEORGE_VALIDATION_STRICT_MODE_ENABLED.value
]

send_to_review_flag_object = feature_flag_service.get_feature_flags_by_flag(
FeatureFlags.BULK_UPLOAD_SEND_TO_REVIEW_ENABLED.value
)
send_to_review_enabled = send_to_review_flag_object[
FeatureFlags.BULK_UPLOAD_SEND_TO_REVIEW_ENABLED.value
]

bypass_pds = os.getenv("BYPASS_PDS", "false").lower() == "true"

if validation_strict_mode:
logger.info("Lloyd George validation strict mode is enabled")

if send_to_review_enabled:
logger.info("Bulk upload send to review queue is enabled")

if "Records" not in event or len(event["Records"]) < 1:
http_status_code = 400
response_body = (
Expand All @@ -43,7 +54,9 @@ def lambda_handler(event, _context):
).create_api_gateway_response()

bulk_upload_service = BulkUploadService(
strict_mode=validation_strict_mode, bypass_pds=bypass_pds
strict_mode=validation_strict_mode,
bypass_pds=bypass_pds,
send_to_review_enabled=send_to_review_enabled,
)

try:
Expand Down
23 changes: 17 additions & 6 deletions lambdas/models/document_review.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@
from enums.document_review_reason import DocumentReviewReason
from enums.document_review_status import DocumentReviewStatus
from enums.metadata_field_names import DocumentReferenceMetadataFields
from enums.upload_forbidden_file_extensions import is_file_type_allowed
from enums.snomed_codes import SnomedCodes
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator, ValidationError
from enums.upload_forbidden_file_extensions import is_file_type_allowed
from pydantic import (
BaseModel,
ConfigDict,
Field,
field_validator,
model_validator,
)
from pydantic.alias_generators import to_camel, to_pascal
from utils.exceptions import InvalidNhsNumberException, ConfigNotFoundException, InvalidFileTypeException
from utils import upload_file_configs
from utils.exceptions import (
ConfigNotFoundException,
InvalidFileTypeException,
InvalidNhsNumberException,
)
from utils.utilities import validate_nhs_number


Expand Down Expand Up @@ -43,7 +53,7 @@ class DocumentUploadReviewReference(BaseModel):
default=DocumentReviewStatus.PENDING_REVIEW
)
review_reason: DocumentReviewReason = Field(
default=DocumentReviewReason.GENERAL_ERROR
default=DocumentReviewReason.UNSUCCESSFUL_UPLOAD
)
review_date: int | None = Field(default=None)
reviewer: str | None = Field(default=None)
Expand Down Expand Up @@ -165,12 +175,13 @@ def verify_nhs_number(cls, value) -> str | None:
@model_validator(mode="after")
def validate_file_extension(self) -> Self:
try:
accepted_file_types = upload_file_configs.get_config_by_snomed_code(self.snomed_code.code).accepted_file_types
accepted_file_types = upload_file_configs.get_config_by_snomed_code(
self.snomed_code.code
).accepted_file_types

for file in self.documents:
if not is_file_type_allowed(file, accepted_file_types):
raise InvalidFileTypeException("Invalid file extension.")
return self
except ConfigNotFoundException:
raise InvalidFileTypeException("Unable to find file configuration.")

8 changes: 3 additions & 5 deletions lambdas/models/sqs/review_message_body.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@


class ReviewMessageFile(BaseModel):
"""Model for individual file in SQS message body from the document review queue."""
"""Model for an individual file in the SQS message body from the document review queue"""

file_name: str
file_path: str = Field(description="Location in the staging bucket")
"""Location in the staging bucket"""


class ReviewMessageBody(BaseModel):
"""Model for SQS message body from the document review queue."""
"""Model for SQS message body from the document review queue"""

model_config = ConfigDict(
use_enum_values=True,
Expand All @@ -20,8 +20,6 @@ class ReviewMessageBody(BaseModel):
files: list[ReviewMessageFile]
nhs_number: str
failure_reason: DocumentReviewReason = Field(
default=DocumentReviewReason.GENERAL_ERROR
default=DocumentReviewReason.UNSUCCESSFUL_UPLOAD
)
upload_date: str
uploader_ods: str
current_gp: str
46 changes: 42 additions & 4 deletions lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,78 @@
import os
import uuid
from datetime import datetime

from enums.document_review_reason import DocumentReviewReason
from models.sqs.pdf_stitching_sqs_message import PdfStitchingSqsMessage
from models.sqs.review_message_body import ReviewMessageBody, ReviewMessageFile
from models.staging_metadata import StagingSqsMetadata
from services.base.sqs_service import SQSService
from utils.audit_logging_setup import LoggingService
from utils.request_context import request_context

_logger = LoggingService(__name__)
logger = LoggingService(__name__)


class BulkUploadSqsRepository:
def __init__(self):
self.sqs_repository = SQSService()
self.invalid_queue_url = os.environ["INVALID_SQS_QUEUE_URL"]
self.metadata_queue_url = os.environ["METADATA_SQS_QUEUE_URL"]
self.review_queue_url = os.environ["REVIEW_SQS_QUEUE_URL"]

def put_staging_metadata_back_to_queue(self, staging_metadata: StagingSqsMetadata):
request_context.patient_nhs_no = staging_metadata.nhs_number
setattr(staging_metadata, "retries", (staging_metadata.retries + 1))
_logger.info("Returning message to sqs queue...")
logger.info("Returning message to sqs queue...")
self.sqs_repository.send_message_with_nhs_number_attr_fifo(
queue_url=self.metadata_queue_url,
message_body=staging_metadata.model_dump_json(by_alias=True),
nhs_number=staging_metadata.nhs_number,
group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}",
)

def send_message_to_review_queue(
self,
staging_metadata: StagingSqsMetadata,
uploader_ods: str,
failure_reason: DocumentReviewReason,
):
request_context.patient_nhs_no = staging_metadata.nhs_number
review_files = [
ReviewMessageFile(
file_name=file.stored_file_name.split("/")[-1],
file_path=file.file_path,
)
for file in staging_metadata.files
]

upload_id = f"{uuid.uuid4()}"

review_message = ReviewMessageBody(
upload_id=upload_id,
files=review_files,
nhs_number=staging_metadata.nhs_number,
failure_reason=failure_reason,
uploader_ods=uploader_ods,
)

logger.info(
f"Sending message to review queue for NHS number {staging_metadata.nhs_number} "
f"with failure reason: {failure_reason}"
)

self.sqs_repository.send_message_standard(
queue_url=self.review_queue_url,
message_body=review_message.model_dump_json(),
)

def put_sqs_message_back_to_queue(self, sqs_message: dict):
try:
nhs_number = sqs_message["messageAttributes"]["NhsNumber"]["stringValue"]
request_context.patient_nhs_no = nhs_number
except KeyError:
nhs_number = ""

_logger.info("Returning message to sqs queue...")
logger.info("Returning message to sqs queue...")
self.sqs_repository.send_message_with_nhs_number_attr_fifo(
queue_url=self.metadata_queue_url,
message_body=sqs_message["body"],
Expand Down
63 changes: 51 additions & 12 deletions lambdas/services/bulk_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

import pydantic
from botocore.exceptions import ClientError
from enums.document_review_reason import DocumentReviewReason
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus
from enums.snomed_codes import SnomedCodes
from enums.upload_status import UploadStatus
from enums.virus_scan_result import VirusScanResult
from models.document_reference import DocumentReference
from models.sqs.pdf_stitching_sqs_message import PdfStitchingSqsMessage
from models.staging_metadata import BulkUploadQueueMetadata, StagingSqsMetadata
from models.staging_metadata import (
NHS_NUMBER_PLACEHOLDER,
BulkUploadQueueMetadata,
StagingSqsMetadata,
)
from repositories.bulk_upload.bulk_upload_dynamo_repository import (
BulkUploadDynamoRepository,
)
Expand Down Expand Up @@ -52,7 +57,7 @@


class BulkUploadService:
def __init__(self, strict_mode, bypass_pds=False):
def __init__(self, strict_mode, bypass_pds=False, send_to_review_enabled=False):
self.dynamo_repository = BulkUploadDynamoRepository()
self.sqs_repository = BulkUploadSqsRepository()
self.bulk_upload_s3_repository = BulkUploadS3Repository()
Expand All @@ -62,6 +67,7 @@ def __init__(self, strict_mode, bypass_pds=False):
self.file_path_cache = {}
self.pdf_stitching_queue_url = os.environ["PDF_STITCHING_SQS_URL"]
self.bypass_pds = bypass_pds
self.send_to_review_enabled = send_to_review_enabled

def process_message_queue(self, records: list):
for index, message in enumerate(records, start=1):
Expand All @@ -74,9 +80,7 @@ def process_message_queue(self, records: list):
logger.info(
"Cannot validate patient due to PDS responded with Too Many Requests"
)
logger.info(
"Cannot process for now due to PDS rate limit reached."
)
logger.info("Cannot process for now due to PDS rate limit reached.")
logger.info(
"All remaining messages in this batch will be returned to sqs queue to retry later."
)
Expand Down Expand Up @@ -132,6 +136,9 @@ def handle_sqs_message(self, message: dict):
for file_metadata in staging_metadata.files:
file_names.append(os.path.basename(file_metadata.stored_file_name))
file_metadata.scan_date = validate_scan_date(file_metadata.scan_date)
file_metadata.file_path = self.strip_leading_slash(
file_metadata.file_path
)
request_context.patient_nhs_no = staging_metadata.nhs_number
validate_nhs_number(staging_metadata.nhs_number)
pds_patient_details = getting_patient_info_from_pds(
Expand Down Expand Up @@ -194,9 +201,19 @@ def handle_sqs_message(self, message: dict):
logger.info("Will stop processing Lloyd George record for this patient.")

reason = str(error)
uploader_ods = (
staging_metadata.files[0].gp_practice_code
if staging_metadata.files
else ""
)

self.dynamo_repository.write_report_upload_to_dynamo(
staging_metadata, UploadStatus.FAILED, reason, patient_ods_code
)
if isinstance(error, (InvalidNhsNumberException, PatientNotFoundException)):
logger.info("Invalid NHS number detected. Will set as placeholder")
staging_metadata.nhs_number = NHS_NUMBER_PLACEHOLDER
self.send_to_review_queue_if_enabled(staging_metadata, uploader_ods)
return

logger.info(
Expand Down Expand Up @@ -336,8 +353,7 @@ def resolve_source_file_path(self, staging_metadata: StagingSqsMetadata):
if not contains_accent_char(sample_file_path):
logger.info("No accented character detected in file path.")
self.file_path_cache = {
file.file_path: self.strip_leading_slash(file.file_path)
for file in staging_metadata.files
file.file_path: file.file_path for file in staging_metadata.files
}
return

Expand All @@ -347,11 +363,8 @@ def resolve_source_file_path(self, staging_metadata: StagingSqsMetadata):
resolved_file_paths = {}
for file in staging_metadata.files:
file_path_in_metadata = file.file_path
file_path_without_leading_slash = self.strip_leading_slash(
file_path_in_metadata
)
file_path_in_nfc_form = convert_to_nfc_form(file_path_without_leading_slash)
file_path_in_nfd_form = convert_to_nfd_form(file_path_without_leading_slash)
file_path_in_nfc_form = convert_to_nfc_form(file_path_in_metadata)
file_path_in_nfd_form = convert_to_nfd_form(file_path_in_metadata)

if self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
file_path_in_nfc_form
Expand Down Expand Up @@ -440,3 +453,29 @@ def strip_leading_slash(filepath: str) -> str:
@staticmethod
def concatenate_acceptance_reason(previous_reasons: str | None, new_reason: str):
return previous_reasons + ", " + new_reason if previous_reasons else new_reason

def send_to_review_queue_if_enabled(
self,
staging_metadata: StagingSqsMetadata,
uploader_ods: str,
):
if not self.send_to_review_enabled:
return

review_reason = DocumentReviewReason.UNSUCCESSFUL_UPLOAD

try:
self.sqs_repository.send_message_to_review_queue(
staging_metadata=staging_metadata,
failure_reason=review_reason,
uploader_ods=uploader_ods,
)
logger.info(
f"Sent failed record to review queue with reason: {review_reason}"
)
except Exception as e:
logger.error(
f"Failed to send message to review queue: {e}",
{"Result": "Review queue send failed"},
)
raise e
Loading
Loading