Skip to content

Commit c3ee09e

Browse files
committed
[PRMP-1218] Implement bulk upload review queue handling and feature flag integration
1 parent d2b7cec commit c3ee09e

27 files changed

+378
-166
lines changed

lambdas/enums/document_review_reason.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,5 @@
22

33

44
class DocumentReviewReason(StrEnum):
5-
UNKNOWN_NHS_NUMBER = "Unknown NHS number"
6-
DEMOGRAPHIC_MISMATCHES = "Demographic mismatches"
7-
DUPLICATE_RECORD = "Duplicate records error"
8-
FILE_COUNT_MISMATCH = "More or less files than we expected"
9-
FILE_NAME_MISMATCH = "Filename Naming convention error"
10-
GP2GP_ERROR = "GP2GP failure"
11-
GENERAL_ERROR = "General error"
5+
NEW_DOCUMENT = "New document to review"
6+
UNSUCCESSFUL_UPLOAD = "Unsuccessful upload"

lambdas/enums/feature_flags.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ class FeatureFlags(StrEnum):
1111
)
1212
UPLOAD_DOCUMENT_ITERATION_2_ENABLED = "uploadDocumentIteration2Enabled"
1313
UPLOAD_DOCUMENT_ITERATION_3_ENABLED = "uploadDocumentIteration3Enabled"
14+
BULK_UPLOAD_SEND_TO_REVIEW_ENABLED = "bulkUploadSendToReviewEnabled"

lambdas/handlers/bulk_upload_handler.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,22 @@ def lambda_handler(event, _context):
2727
validation_strict_mode = validation_strict_mode_flag_object[
2828
FeatureFlags.LLOYD_GEORGE_VALIDATION_STRICT_MODE_ENABLED.value
2929
]
30+
31+
send_to_review_flag_object = feature_flag_service.get_feature_flags_by_flag(
32+
FeatureFlags.BULK_UPLOAD_SEND_TO_REVIEW_ENABLED.value
33+
)
34+
send_to_review_enabled = send_to_review_flag_object[
35+
FeatureFlags.BULK_UPLOAD_SEND_TO_REVIEW_ENABLED.value
36+
]
37+
3038
bypass_pds = os.getenv("BYPASS_PDS", "false").lower() == "true"
3139

3240
if validation_strict_mode:
3341
logger.info("Lloyd George validation strict mode is enabled")
3442

43+
if send_to_review_enabled:
44+
logger.info("Bulk upload send to review queue is enabled")
45+
3546
if "Records" not in event or len(event["Records"]) < 1:
3647
http_status_code = 400
3748
response_body = (
@@ -43,7 +54,9 @@ def lambda_handler(event, _context):
4354
).create_api_gateway_response()
4455

4556
bulk_upload_service = BulkUploadService(
46-
strict_mode=validation_strict_mode, bypass_pds=bypass_pds
57+
strict_mode=validation_strict_mode,
58+
bypass_pds=bypass_pds,
59+
send_to_review_enabled=send_to_review_enabled,
4760
)
4861

4962
try:

lambdas/models/document_review.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,22 @@
55
from enums.document_review_reason import DocumentReviewReason
66
from enums.document_review_status import DocumentReviewStatus
77
from enums.metadata_field_names import DocumentReferenceMetadataFields
8-
from enums.upload_forbidden_file_extensions import is_file_type_allowed
98
from enums.snomed_codes import SnomedCodes
10-
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator, ValidationError
9+
from enums.upload_forbidden_file_extensions import is_file_type_allowed
10+
from pydantic import (
11+
BaseModel,
12+
ConfigDict,
13+
Field,
14+
field_validator,
15+
model_validator,
16+
)
1117
from pydantic.alias_generators import to_camel, to_pascal
12-
from utils.exceptions import InvalidNhsNumberException, ConfigNotFoundException, InvalidFileTypeException
1318
from utils import upload_file_configs
19+
from utils.exceptions import (
20+
ConfigNotFoundException,
21+
InvalidFileTypeException,
22+
InvalidNhsNumberException,
23+
)
1424
from utils.utilities import validate_nhs_number
1525

1626

@@ -43,7 +53,7 @@ class DocumentUploadReviewReference(BaseModel):
4353
default=DocumentReviewStatus.PENDING_REVIEW
4454
)
4555
review_reason: DocumentReviewReason = Field(
46-
default=DocumentReviewReason.GENERAL_ERROR
56+
default=DocumentReviewReason.UNSUCCESSFUL_UPLOAD
4757
)
4858
review_date: int | None = Field(default=None)
4959
reviewer: str | None = Field(default=None)
@@ -165,12 +175,13 @@ def verify_nhs_number(cls, value) -> str | None:
165175
@model_validator(mode="after")
166176
def validate_file_extension(self) -> Self:
167177
try:
168-
accepted_file_types = upload_file_configs.get_config_by_snomed_code(self.snomed_code.code).accepted_file_types
178+
accepted_file_types = upload_file_configs.get_config_by_snomed_code(
179+
self.snomed_code.code
180+
).accepted_file_types
169181

170182
for file in self.documents:
171183
if not is_file_type_allowed(file, accepted_file_types):
172184
raise InvalidFileTypeException("Invalid file extension.")
173185
return self
174186
except ConfigNotFoundException:
175187
raise InvalidFileTypeException("Unable to find file configuration.")
176-

lambdas/models/sqs/review_message_body.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33

44

55
class ReviewMessageFile(BaseModel):
6-
"""Model for individual file in SQS message body from the document review queue."""
6+
"""Model for an individual file in the SQS message body from the document review queue"""
77

88
file_name: str
99
file_path: str = Field(description="Location in the staging bucket")
1010
"""Location in the staging bucket"""
1111

1212

1313
class ReviewMessageBody(BaseModel):
14-
"""Model for SQS message body from the document review queue."""
14+
"""Model for SQS message body from the document review queue"""
1515

1616
model_config = ConfigDict(
1717
use_enum_values=True,
@@ -20,8 +20,7 @@ class ReviewMessageBody(BaseModel):
2020
files: list[ReviewMessageFile]
2121
nhs_number: str
2222
failure_reason: DocumentReviewReason = Field(
23-
default=DocumentReviewReason.GENERAL_ERROR
23+
default=DocumentReviewReason.UNSUCCESSFUL_UPLOAD
2424
)
2525
upload_date: str
2626
uploader_ods: str
27-
current_gp: str

lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,81 @@
11
import os
22
import uuid
3+
from datetime import datetime
34

5+
from enums.document_review_reason import DocumentReviewReason
46
from models.sqs.pdf_stitching_sqs_message import PdfStitchingSqsMessage
7+
from models.sqs.review_message_body import ReviewMessageBody, ReviewMessageFile
58
from models.staging_metadata import StagingSqsMetadata
69
from services.base.sqs_service import SQSService
710
from utils.audit_logging_setup import LoggingService
811
from utils.request_context import request_context
912

10-
_logger = LoggingService(__name__)
13+
logger = LoggingService(__name__)
1114

1215

1316
class BulkUploadSqsRepository:
1417
def __init__(self):
1518
self.sqs_repository = SQSService()
16-
self.invalid_queue_url = os.environ["INVALID_SQS_QUEUE_URL"]
1719
self.metadata_queue_url = os.environ["METADATA_SQS_QUEUE_URL"]
20+
self.review_queue_url = os.environ["REVIEW_SQS_QUEUE_URL"]
1821

1922
def put_staging_metadata_back_to_queue(self, staging_metadata: StagingSqsMetadata):
2023
request_context.patient_nhs_no = staging_metadata.nhs_number
2124
setattr(staging_metadata, "retries", (staging_metadata.retries + 1))
22-
_logger.info("Returning message to sqs queue...")
25+
logger.info("Returning message to sqs queue...")
2326
self.sqs_repository.send_message_with_nhs_number_attr_fifo(
2427
queue_url=self.metadata_queue_url,
2528
message_body=staging_metadata.model_dump_json(by_alias=True),
2629
nhs_number=staging_metadata.nhs_number,
2730
group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}",
2831
)
2932

33+
def send_message_to_review_queue(
34+
self,
35+
staging_metadata: StagingSqsMetadata,
36+
uploader_ods: str,
37+
failure_reason: DocumentReviewReason = DocumentReviewReason.UNSUCCESSFUL_UPLOAD,
38+
):
39+
request_context.patient_nhs_no = staging_metadata.nhs_number
40+
review_files = [
41+
ReviewMessageFile(
42+
file_name=file.stored_file_name.split("/")[-1],
43+
file_path=file.file_path,
44+
)
45+
for file in staging_metadata.files
46+
]
47+
48+
upload_id = f"{uuid.uuid4()}"
49+
50+
upload_date = datetime.now().isoformat()
51+
52+
review_message = ReviewMessageBody(
53+
upload_id=upload_id,
54+
files=review_files,
55+
nhs_number=staging_metadata.nhs_number,
56+
failure_reason=failure_reason,
57+
upload_date=upload_date,
58+
uploader_ods=uploader_ods,
59+
)
60+
61+
logger.info(
62+
f"Sending message to review queue for NHS number {staging_metadata.nhs_number} "
63+
f"with failure reason: {failure_reason}"
64+
)
65+
66+
self.sqs_repository.send_message_standard(
67+
queue_url=self.review_queue_url,
68+
message_body=review_message.model_dump_json(),
69+
)
70+
3071
def put_sqs_message_back_to_queue(self, sqs_message: dict):
3172
try:
3273
nhs_number = sqs_message["messageAttributes"]["NhsNumber"]["stringValue"]
3374
request_context.patient_nhs_no = nhs_number
3475
except KeyError:
3576
nhs_number = ""
3677

37-
_logger.info("Returning message to sqs queue...")
78+
logger.info("Returning message to sqs queue...")
3879
self.sqs_repository.send_message_with_nhs_number_attr_fifo(
3980
queue_url=self.metadata_queue_url,
4081
message_body=sqs_message["body"],

lambdas/services/bulk_upload_service.py

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,18 @@
44

55
import pydantic
66
from botocore.exceptions import ClientError
7+
from enums.document_review_reason import DocumentReviewReason
78
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus
89
from enums.snomed_codes import SnomedCodes
910
from enums.upload_status import UploadStatus
1011
from enums.virus_scan_result import VirusScanResult
1112
from models.document_reference import DocumentReference
1213
from models.sqs.pdf_stitching_sqs_message import PdfStitchingSqsMessage
13-
from models.staging_metadata import BulkUploadQueueMetadata, StagingSqsMetadata
14+
from models.staging_metadata import (
15+
NHS_NUMBER_PLACEHOLDER,
16+
BulkUploadQueueMetadata,
17+
StagingSqsMetadata,
18+
)
1419
from repositories.bulk_upload.bulk_upload_dynamo_repository import (
1520
BulkUploadDynamoRepository,
1621
)
@@ -52,7 +57,7 @@
5257

5358

5459
class BulkUploadService:
55-
def __init__(self, strict_mode, bypass_pds=False):
60+
def __init__(self, strict_mode, bypass_pds=False, send_to_review_enabled=False):
5661
self.dynamo_repository = BulkUploadDynamoRepository()
5762
self.sqs_repository = BulkUploadSqsRepository()
5863
self.bulk_upload_s3_repository = BulkUploadS3Repository()
@@ -62,6 +67,7 @@ def __init__(self, strict_mode, bypass_pds=False):
6267
self.file_path_cache = {}
6368
self.pdf_stitching_queue_url = os.environ["PDF_STITCHING_SQS_URL"]
6469
self.bypass_pds = bypass_pds
70+
self.send_to_review_enabled = send_to_review_enabled
6571

6672
def process_message_queue(self, records: list):
6773
for index, message in enumerate(records, start=1):
@@ -74,9 +80,7 @@ def process_message_queue(self, records: list):
7480
logger.info(
7581
"Cannot validate patient due to PDS responded with Too Many Requests"
7682
)
77-
logger.info(
78-
"Cannot process for now due to PDS rate limit reached."
79-
)
83+
logger.info("Cannot process for now due to PDS rate limit reached.")
8084
logger.info(
8185
"All remaining messages in this batch will be returned to sqs queue to retry later."
8286
)
@@ -132,6 +136,9 @@ def handle_sqs_message(self, message: dict):
132136
for file_metadata in staging_metadata.files:
133137
file_names.append(os.path.basename(file_metadata.stored_file_name))
134138
file_metadata.scan_date = validate_scan_date(file_metadata.scan_date)
139+
file_metadata.file_path = self.strip_leading_slash(
140+
file_metadata.file_path
141+
)
135142
request_context.patient_nhs_no = staging_metadata.nhs_number
136143
validate_nhs_number(staging_metadata.nhs_number)
137144
pds_patient_details = getting_patient_info_from_pds(
@@ -194,9 +201,19 @@ def handle_sqs_message(self, message: dict):
194201
logger.info("Will stop processing Lloyd George record for this patient.")
195202

196203
reason = str(error)
204+
uploader_ods = (
205+
staging_metadata.files[0].gp_practice_code
206+
if staging_metadata.files
207+
else ""
208+
)
209+
197210
self.dynamo_repository.write_report_upload_to_dynamo(
198211
staging_metadata, UploadStatus.FAILED, reason, patient_ods_code
199212
)
213+
if isinstance(error, (InvalidNhsNumberException, PatientNotFoundException)):
214+
logger.info("Invalid NHS number detected. Will set as placeholder")
215+
staging_metadata.nhs_number = NHS_NUMBER_PLACEHOLDER
216+
self.send_to_review_queue_if_enabled(staging_metadata, uploader_ods)
200217
return
201218

202219
logger.info(
@@ -336,8 +353,7 @@ def resolve_source_file_path(self, staging_metadata: StagingSqsMetadata):
336353
if not contains_accent_char(sample_file_path):
337354
logger.info("No accented character detected in file path.")
338355
self.file_path_cache = {
339-
file.file_path: self.strip_leading_slash(file.file_path)
340-
for file in staging_metadata.files
356+
file.file_path: file.file_path for file in staging_metadata.files
341357
}
342358
return
343359

@@ -347,11 +363,8 @@ def resolve_source_file_path(self, staging_metadata: StagingSqsMetadata):
347363
resolved_file_paths = {}
348364
for file in staging_metadata.files:
349365
file_path_in_metadata = file.file_path
350-
file_path_without_leading_slash = self.strip_leading_slash(
351-
file_path_in_metadata
352-
)
353-
file_path_in_nfc_form = convert_to_nfc_form(file_path_without_leading_slash)
354-
file_path_in_nfd_form = convert_to_nfd_form(file_path_without_leading_slash)
366+
file_path_in_nfc_form = convert_to_nfc_form(file_path_in_metadata)
367+
file_path_in_nfd_form = convert_to_nfd_form(file_path_in_metadata)
355368

356369
if self.bulk_upload_s3_repository.file_exists_on_staging_bucket(
357370
file_path_in_nfc_form
@@ -440,3 +453,29 @@ def strip_leading_slash(filepath: str) -> str:
440453
@staticmethod
441454
def concatenate_acceptance_reason(previous_reasons: str | None, new_reason: str):
442455
return previous_reasons + ", " + new_reason if previous_reasons else new_reason
456+
457+
def send_to_review_queue_if_enabled(
458+
self,
459+
staging_metadata: StagingSqsMetadata,
460+
uploader_ods: str,
461+
):
462+
if not self.send_to_review_enabled:
463+
return
464+
465+
review_reason = DocumentReviewReason.UNSUCCESSFUL_UPLOAD
466+
467+
try:
468+
self.sqs_repository.send_message_to_review_queue(
469+
staging_metadata=staging_metadata,
470+
failure_reason=review_reason,
471+
uploader_ods=uploader_ods,
472+
)
473+
logger.info(
474+
f"Sent failed record to review queue with reason: {review_reason}"
475+
)
476+
except Exception as e:
477+
logger.error(
478+
f"Failed to send message to review queue: {e}",
479+
{"Result": "Review queue send failed"},
480+
)
481+
raise e

0 commit comments

Comments
 (0)