69 changes: 15 additions & 54 deletions lambdas/handlers/bulk_upload_metadata_processor_handler.py
@@ -1,21 +1,13 @@
import urllib.parse

from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
from services.bulk_upload.metadata_general_preprocessor import (
MetadataGeneralPreprocessor,
)
from services.bulk_upload.metadata_usb_preprocessor import (
MetadataUsbPreprocessorService,
)
from services.bulk_upload_metadata_processor_service import (
BulkUploadMetadataProcessorService,
get_formatter_service,
)
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.exceptions import BulkUploadMetadataException

logger = LoggingService(__name__)

@@ -27,64 +19,33 @@
)
@handle_lambda_exceptions
def lambda_handler(event, _context):
if "source" in event and event.get("source") == "aws.s3":
logger.info("Handling EventBridge event from S3")
handle_expedite_event(event)
return

practice_directory = event.get("practiceDirectory", "")
raw_pre_format_type = event.get(
"preFormatType", LloydGeorgePreProcessFormat.GENERAL
)
formatter_service_class = get_formatter_service(raw_pre_format_type)
if not practice_directory:
logger.error(
"Failed to start metadata processing due to missing practice directory"
)
return

logger.info(
f"Starting metadata processing for practice directory: {practice_directory}"
)
practice_directory = event.get("practiceDirectory", "")

remappings = event.get("metadataFieldRemappings", {})

metadata_formatter_service = formatter_service_class(practice_directory)
metadata_service = BulkUploadMetadataProcessorService(
metadata_formatter_service=metadata_formatter_service,
metadata_heading_remap=remappings,
)
metadata_service.process_metadata()

if "source" in event and event.get("source") == "aws.s3":
logger.info("Handling EventBridge event from S3")

metadata_service.handle_expedite_event(event)
return

def get_formatter_service(raw_pre_format_type):
try:
pre_format_type = LloydGeorgePreProcessFormat(raw_pre_format_type)
if pre_format_type == LloydGeorgePreProcessFormat.GENERAL:
logger.info("Using general preFormatType")
return MetadataGeneralPreprocessor
elif pre_format_type == LloydGeorgePreProcessFormat.USB:
logger.info("Using usb preFormatType")
return MetadataUsbPreprocessorService
    except ValueError:
        logger.warning(
            f"Invalid preFormatType: '{raw_pre_format_type}', defaulting to {LloydGeorgePreProcessFormat.GENERAL}."
        )
        return MetadataGeneralPreprocessor

    if not practice_directory:
        logger.error(
            "Failed to start metadata processing due to missing practice directory"
        )
        return

logger.info(
f"Starting metadata processing for practice directory: {practice_directory}"
)

def handle_expedite_event(event):
try:
key_string = event["detail"]["object"]["key"]
key = urllib.parse.unquote_plus(key_string, encoding="utf-8")
if key.startswith("expedite/"):
logger.info("Processing file from expedite folder")
            return  # To be extended under ticket PRMP-540
else:
failure_msg = f"Unexpected directory or file location received from EventBridge: {key_string}"
logger.error(failure_msg)
raise BulkUploadMetadataException(failure_msg)
except KeyError as e:
failure_msg = f"Failed due to missing key: {str(e)}"
logger.error(failure_msg)
raise BulkUploadMetadataException(failure_msg)
metadata_service.process_metadata()
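
A minimal sketch of the two invocation paths the reworked handler supports, based on the tests further down; the practice directory, format type, and remapping values here are hypothetical, and the module import path is assumed from the repository layout:

    from handlers.bulk_upload_metadata_processor_handler import lambda_handler

    # Direct invocation: drives the normal metadata processing flow.
    direct_event = {
        "practiceDirectory": "H81109",  # hypothetical practice directory
        "preFormatType": "usb",  # hypothetical; invalid values fall back to GENERAL
        "metadataFieldRemappings": {},  # optional heading remap
    }
    lambda_handler(direct_event, None)  # ends in metadata_service.process_metadata()

    # EventBridge S3 event: routed to metadata_service.handle_expedite_event(event)
    # and returns without calling process_metadata().
    s3_event = {
        "source": "aws.s3",
        "detail": {"object": {"key": "expedite/metadata.csv"}},  # hypothetical key
    }
    lambda_handler(s3_event, None)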
38 changes: 34 additions & 4 deletions lambdas/repositories/bulk_upload/bulk_upload_s3_repository.py
@@ -14,7 +14,7 @@
VirusScanNoResultException,
)

_logger = LoggingService(__name__)
logger = LoggingService(__name__)


class BulkUploadS3Repository:
@@ -54,17 +54,17 @@ def check_virus_result(
)
except ClientError as e:
if "AccessDenied" in str(e) or "NoSuchKey" in str(e):
_logger.info(
logger.info(
f"Failed to check object tag for given file_path: {file_path}"
)
_logger.info(
logger.info(
"file_path may be incorrect or contain invalid character"
)
raise S3FileNotFoundException(f"Failed to access file {file_path}")
else:
raise e

_logger.info(
logger.info(
f"Verified that all documents for patient {staging_metadata.nhs_number} are clean."
)

@@ -96,3 +96,33 @@ def rollback_transaction(self):

def file_exists_on_staging_bucket(self, file_key: str) -> bool:
return self.s3_repository.file_exist_on_s3(self.staging_bucket_name, file_key)

def check_file_tag_status_on_staging_bucket(self, file_key: str) -> str:
"""
Retrieves the virus scan tag value for a single file.
Raises specific exceptions based on the tag's presence or S3 access.
"""
s3_service = self.s3_repository

try:
# Call the underlying S3 method to get the tag value
raw_scan_result = s3_service.get_tag_value(
s3_bucket_name=self.staging_bucket_name,
file_key=file_key,
tag_key=SCAN_RESULT_TAG_KEY,
)
return raw_scan_result

except TagNotFoundException:
return ""

except ClientError as e:
error_msg = str(e)
if "AccessDenied" in str(e) or "NoSuchKey" in error_msg:
logger.error(
f"Failed to check object tag for given file_path: {file_key}"
)
logger.error("file_path may be incorrect or contain invalid character")
raise S3FileNotFoundException(f"Failed to access file {file_key}")
else:
raise e
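
A rough usage sketch for the new tag-status method, following the behaviour described in its docstring: it returns the raw tag value when the scan tag exists, an empty string when the tag is missing (the TagNotFoundException is swallowed), and raises S3FileNotFoundException on AccessDenied or NoSuchKey errors. The key below is a placeholder and the exception import path is assumed:

    from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository
    from utils.exceptions import S3FileNotFoundException  # import path assumed

    repo = BulkUploadS3Repository()
    try:
        scan_result = repo.check_file_tag_status_on_staging_bucket("expedite/file.pdf")
        if scan_result == "":
            pass  # tag missing: the file has not been through the virus scanner yet
    except S3FileNotFoundException:
        pass  # key is wrong, inaccessible, or contains an invalid character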
84 changes: 84 additions & 0 deletions lambdas/services/bulk_upload_metadata_processor_service.py
@@ -2,12 +2,16 @@
import os
import shutil
import tempfile
import urllib.parse
from collections import defaultdict
from datetime import datetime

import pydantic
from botocore.exceptions import ClientError

from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
from enums.upload_status import UploadStatus
from enums.virus_scan_result import VirusScanResult
from models.staging_metadata import (
METADATA_FILENAME,
BulkUploadQueueMetadata,
@@ -17,8 +21,15 @@
from repositories.bulk_upload.bulk_upload_dynamo_repository import (
BulkUploadDynamoRepository,
)
from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository
from services.base.s3_service import S3Service
from services.base.sqs_service import SQSService
from services.bulk_upload.metadata_general_preprocessor import (
MetadataGeneralPreprocessor,
)
from services.bulk_upload.metadata_usb_preprocessor import (
MetadataUsbPreprocessorService,
)
from services.bulk_upload_metadata_preprocessor_service import (
MetadataPreprocessorService,
)
@@ -28,8 +39,10 @@
BulkUploadMetadataException,
InvalidFileNameException,
LGInvalidFilesException,
VirusScanFailedException,
)
from utils.lloyd_george_validator import validate_file_name
from utils.utilities import get_virus_scan_service

logger = LoggingService(__name__)
UNSUCCESSFUL = "Unsuccessful bulk upload"
@@ -47,6 +60,9 @@ def __init__(
self.s3_service = S3Service()
self.sqs_service = SQSService()
self.dynamo_repository = BulkUploadDynamoRepository()
self.s3_repo = BulkUploadS3Repository()
self.virus_scan_service = get_virus_scan_service()

self.metadata_heading_remap = metadata_heading_remap

self.temp_download_dir = tempfile.mkdtemp()
@@ -245,3 +261,71 @@ def clear_temp_storage(self):
shutil.rmtree(self.temp_download_dir)
except FileNotFoundError:
pass

def check_file_status(self, file_key: str):
scan_result = self.s3_repo.check_file_tag_status_on_staging_bucket(file_key)
if scan_result != VirusScanResult.CLEAN:
logger.info(f"Found an issue with the file {file_key}.")
raise VirusScanFailedException(
f"Encountered an issue when scanning the file {file_key}, scan result was {scan_result}"
)

def enforce_virus_scanner(self, file_key: str):
logger.info(
f"Checking virus scan result for file: {file_key} in {self.staging_bucket_name}"
)

try:
result = self.s3_repo.check_file_tag_status_on_staging_bucket(file_key)
if(result != ""):
logger.info("The file has been scanned before")
return
logger.info(f"Virus scan tag missing for {file_key}.")
self.virus_scan_service.scan_file(file_ref=file_key)

except ClientError as e:
error_message = str(e)
if "NoSuchKey" in error_message or "AccessDenied" in error_message:
logger.error(f"S3 access error when checking tag for {file_key}.")
raise BulkUploadMetadataException(
f"Failed to access S3 file {file_key} during tag check."
)
else:
raise

def handle_expedite_event(self, event):
try:
key_string = event["detail"]["object"]["key"]
key = urllib.parse.unquote_plus(key_string, encoding="utf-8")

if key.startswith("expedite/"):
logger.info("Processing file from expedite folder")

self.enforce_virus_scanner(key)
self.check_file_status(key)

                return  # To be extended under ticket PRMP-540
else:
failure_msg = f"Unexpected directory or file location received from EventBridge: {key_string}"
logger.error(failure_msg)
raise BulkUploadMetadataException(failure_msg)
except KeyError as e:
failure_msg = f"Failed due to missing key: {str(e)}"
logger.error(failure_msg)
raise BulkUploadMetadataException(failure_msg)


def get_formatter_service(raw_pre_format_type):
try:
pre_format_type = LloydGeorgePreProcessFormat(raw_pre_format_type)
if pre_format_type == LloydGeorgePreProcessFormat.GENERAL:
logger.info("Using general preFormatType")
return MetadataGeneralPreprocessor
elif pre_format_type == LloydGeorgePreProcessFormat.USB:
logger.info("Using usb preFormatType")
return MetadataUsbPreprocessorService
except ValueError:
logger.warning(
f"Invalid preFormatType: '{raw_pre_format_type}', defaulting to {LloydGeorgePreProcessFormat.GENERAL}."
)
return MetadataGeneralPreprocessor
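
Taken together, the expedite path added here is: handle_expedite_event decodes the S3 key, enforce_virus_scanner triggers a scan only when the file carries no scan tag yet, and check_file_status then rejects anything whose tag is not CLEAN. A condensed sketch, with a hypothetical practice directory and event key:

    service = BulkUploadMetadataProcessorService(
        metadata_formatter_service=MetadataGeneralPreprocessor("H81109"),  # hypothetical
        metadata_heading_remap={},
    )
    service.handle_expedite_event(
        {"source": "aws.s3", "detail": {"object": {"key": "expedite/1of1_record.pdf"}}}
    )
    # Raises BulkUploadMetadataException for keys outside expedite/ or missing
    # event fields, and VirusScanFailedException when the scan tag is not CLEAN.

    # get_formatter_service maps the event's preFormatType onto a formatter class,
    # falling back to the general one for invalid values (enum strings assumed):
    assert get_formatter_service("not-a-real-format") is MetadataGeneralPreprocessor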
@@ -18,10 +18,10 @@ def eventbridge_event_with_s3_key(key: str):
return {
"source": "aws.s3",
"detail": {
"object":{
"key": key,
},
}
"object": {
"key": key,
},
},
}


@@ -41,35 +41,50 @@ def test_metadata_processor_lambda_handler_empty_event(
mock_metadata_service.process_metadata.assert_not_called()


def test_metadata_processor_lambda_handler_s3_event_triggers_expedite(
set_env, context, mock_metadata_service
):
event = {
"source": "aws.s3",
"detail": {
"object": {
"key": "expedite/folder/file.pdf",
}
},
}

lambda_handler(event, context)

mock_metadata_service.handle_expedite_event.assert_called_once_with(event)
mock_metadata_service.process_metadata.assert_not_called()


def test_s3_event_with_expedite_key_processes(
set_env, context, mock_metadata_service, caplog
):
event = eventbridge_event_with_s3_key(
"expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
)

    with caplog.at_level("INFO"):
        lambda_handler(event, context)

    assert any(
        "Processing file from expedite folder" in r.message for r in caplog.records
    )
    assert any(
        "Handling EventBridge event from S3" in r.message for r in caplog.records
    )

mock_metadata_service.handle_expedite_event.assert_called_once_with(event)
mock_metadata_service.process_metadata.assert_not_called()


def test_s3_event_with_non_expedite_key_is_rejected(
set_env, context, mock_metadata_service, caplog
):
key_string = "uploads/1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
event = eventbridge_event_with_s3_key(key_string)

    with caplog.at_level("INFO"):
        lambda_handler(event, context)

assert any(
f"Unexpected directory or file location received from EventBridge: {key_string}"
in r.message
for r in caplog.records
)
mock_metadata_service.handle_expedite_event.assert_called_once_with(event)
mock_metadata_service.process_metadata.assert_not_called()
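
One detail worth noting from these fixtures: EventBridge delivers the S3 object key URL-encoded, and handle_expedite_event runs it through urllib.parse.unquote_plus before the expedite/ prefix check, which is why an encoded "expedite%2F..." key still counts as an expedite upload:

    import urllib.parse

    encoded = "expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf"
    decoded = urllib.parse.unquote_plus(encoded, encoding="utf-8")
    assert decoded.startswith("expedite/")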