Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 16 additions & 30 deletions lambdas/handlers/bulk_upload_metadata_processor_handler.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
from services.bulk_upload.metadata_general_preprocessor import (
MetadataGeneralPreprocessor,
)
from services.bulk_upload.metadata_usb_preprocessor import (
MetadataUsbPreprocessorService,
)
from services.bulk_upload_metadata_processor_service import (
BulkUploadMetadataProcessorService,
get_formatter_service,
)
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
Expand All @@ -24,11 +19,25 @@
)
@handle_lambda_exceptions
def lambda_handler(event, _context):
practice_directory = event.get("practiceDirectory", "")
raw_pre_format_type = event.get(
"preFormatType", LloydGeorgePreProcessFormat.GENERAL
)
formatter_service_class = get_formatter_service(raw_pre_format_type)
practice_directory = event.get("practiceDirectory", "")

remappings = event.get("metadataFieldRemappings", {})
metadata_formatter_service = formatter_service_class(practice_directory)
metadata_service = BulkUploadMetadataProcessorService(
metadata_formatter_service=metadata_formatter_service,
metadata_heading_remap=remappings,
)

if "source" in event and event.get("source") == "aws.s3":
logger.info("Handling EventBridge event from S3")

metadata_service.handle_expedite_event(event)
return

if not practice_directory:
logger.error(
"Failed to start metadata processing due to missing practice directory"
Expand All @@ -39,27 +48,4 @@ def lambda_handler(event, _context):
f"Starting metadata processing for practice directory: {practice_directory}"
)

remappings = event.get("metadataFieldRemappings", {})

metadata_formatter_service = formatter_service_class(practice_directory)
metadata_service = BulkUploadMetadataProcessorService(
metadata_formatter_service=metadata_formatter_service,
metadata_heading_remap=remappings,
)
metadata_service.process_metadata()


def get_formatter_service(raw_pre_format_type):
    """Map a raw ``preFormatType`` event value to a metadata preprocessor class.

    Args:
        raw_pre_format_type: value taken from the lambda event; expected to be
            convertible to ``LloydGeorgePreProcessFormat``.

    Returns:
        ``MetadataUsbPreprocessorService`` for the USB format, otherwise
        ``MetadataGeneralPreprocessor``. Invalid values are logged and fall
        back to the general preprocessor (never returns None).
    """
    # Keep the try body minimal: only the enum conversion can raise ValueError.
    try:
        pre_format_type = LloydGeorgePreProcessFormat(raw_pre_format_type)
    except ValueError:
        logger.warning(
            f"Invalid preFormatType: '{raw_pre_format_type}', defaulting to {LloydGeorgePreProcessFormat.GENERAL}."
        )
        return MetadataGeneralPreprocessor

    if pre_format_type == LloydGeorgePreProcessFormat.USB:
        logger.info("Using usb preFormatType")
        return MetadataUsbPreprocessorService

    # Default branch: GENERAL, and any future enum member not handled above
    # (the previous if/elif could silently return None for such members).
    logger.info("Using general preFormatType")
    return MetadataGeneralPreprocessor
31 changes: 31 additions & 0 deletions lambdas/repositories/bulk_upload/bulk_upload_s3_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,34 @@ def rollback_transaction(self):

def file_exists_on_staging_bucket(self, file_key: str) -> bool:
return self.s3_repository.file_exist_on_s3(self.staging_bucket_name, file_key)

def check_file_tag_status(self, file_key: str) -> str:
    """
    Return the virus scan tag value for a single file on the staging bucket.

    Args:
        file_key: key of the object in the staging bucket.

    Returns:
        The raw value of the scan-result tag.

    Raises:
        VirusScanNoResultException: the object carries no scan-result tag yet.
        S3FileNotFoundException: S3 reported AccessDenied or NoSuchKey, i.e.
            the key is wrong or inaccessible.
        ClientError: any other S3 failure is re-raised unchanged.
    """
    try:
        # Delegate to the underlying S3 service to read the tag value.
        return self.s3_repository.get_tag_value(
            s3_bucket_name=self.staging_bucket_name,
            file_key=file_key,
            tag_key=SCAN_RESULT_TAG_KEY,
        )
    except TagNotFoundException as e:
        raise VirusScanNoResultException(
            f"Virus scan result not found for document: {file_key}"
        ) from e
    except ClientError as e:
        error_msg = str(e)
        # botocore does not expose these as distinct exception types here,
        # so match on the error message text.
        if "AccessDenied" in error_msg or "NoSuchKey" in error_msg:
            _logger.info(
                f"Failed to check object tag for given file_path: {file_key}"
            )
            _logger.info("file_path may be incorrect or contain invalid character")
            raise S3FileNotFoundException(f"Failed to access file {file_key}") from e
        # Preserve the original traceback for unexpected S3 errors.
        raise
83 changes: 83 additions & 0 deletions lambdas/services/bulk_upload_metadata_processor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
import os
import shutil
import tempfile
import urllib.parse
import uuid
from collections import defaultdict
from datetime import datetime

import pydantic
from botocore.exceptions import ClientError
from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
from enums.upload_status import UploadStatus
from enums.virus_scan_result import VirusScanResult
from models.staging_metadata import (
METADATA_FILENAME,
BulkUploadQueueMetadata,
Expand All @@ -18,8 +21,15 @@
from repositories.bulk_upload.bulk_upload_dynamo_repository import (
BulkUploadDynamoRepository,
)
from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository
from services.base.s3_service import S3Service
from services.base.sqs_service import SQSService
from services.bulk_upload.metadata_general_preprocessor import (
MetadataGeneralPreprocessor,
)
from services.bulk_upload.metadata_usb_preprocessor import (
MetadataUsbPreprocessorService,
)
from services.bulk_upload_metadata_preprocessor_service import (
MetadataPreprocessorService,
)
Expand All @@ -29,8 +39,10 @@
BulkUploadMetadataException,
InvalidFileNameException,
LGInvalidFilesException,
VirusScanNoResultException,
)
from utils.lloyd_george_validator import validate_file_name
from utils.utilities import get_virus_scan_service

logger = LoggingService(__name__)
UNSUCCESSFUL = "Unsuccessful bulk upload"
Expand All @@ -48,6 +60,9 @@ def __init__(
self.s3_service = S3Service()
self.sqs_service = SQSService()
self.dynamo_repository = BulkUploadDynamoRepository()
self.s3_repo = BulkUploadS3Repository()
self.virus_scan_service = get_virus_scan_service()

self.metadata_heading_remap = metadata_heading_remap

self.temp_download_dir = tempfile.mkdtemp()
Expand Down Expand Up @@ -248,3 +263,71 @@ def clear_temp_storage(self):
shutil.rmtree(self.temp_download_dir)
except FileNotFoundError:
pass

def check_file_status(self, file_key: str):
    """Log a notice when the virus-scan tag of *file_key* is anything but CLEAN."""
    tag_value = self.s3_repo.check_file_tag_status(file_key)
    if tag_value == VirusScanResult.CLEAN:
        return
    logger.info(f"Found an issue with the file {file_key}.")

def enforce_virus_scanner(self, file_key: str):
    """
    Ensure *file_key* on the staging bucket has a virus-scan result.

    If the scan tag already exists, nothing more is done. If the tag is
    missing, a scan is triggered via the virus scan service. S3 access
    errors (AccessDenied / NoSuchKey) are converted to
    BulkUploadMetadataException; other ClientErrors propagate unchanged.
    """
    logger.info(
        f"Checking virus scan result for file: {file_key} in {self.staging_bucket_name}"
    )

    try:
        # Raises VirusScanNoResultException when the tag is absent.
        self.s3_repo.check_file_tag_status(file_key)
        logger.info("The file has been scanned before")
        return

    except VirusScanNoResultException:
        # File has not been scanned.
        logger.info(f"Virus scan tag missing for {file_key}.")
        # force scan the file
        self.virus_scan_service.scan_file(file_ref=file_key)

    except ClientError as e:
        # NOTE(review): check_file_tag_status already converts
        # AccessDenied/NoSuchKey ClientErrors into S3FileNotFoundException,
        # so this branch looks unreachable for those cases — confirm whether
        # S3FileNotFoundException should be caught here instead.
        error_message = str(e)
        if "NoSuchKey" in error_message or "AccessDenied" in error_message:
            logger.error(f"S3 access error when checking tag for {file_key}.")
            raise BulkUploadMetadataException(
                f"Failed to access S3 file {file_key} during tag check."
            )
        else:
            raise

def handle_expedite_event(self, event):
    """
    Handle an EventBridge S3 object-created event for the expedite flow.

    Only keys under the ``expedite/`` prefix are accepted: the file is
    virus-scanned if no scan tag exists yet, and its tag status is then
    checked. Any other key location, or an event missing the expected
    detail structure, raises BulkUploadMetadataException.

    Args:
        event: raw EventBridge event dict with ``detail.object.key``.

    Raises:
        BulkUploadMetadataException: unexpected key location or missing
            event fields.
    """
    try:
        key_string = event["detail"]["object"]["key"]
        # S3 event keys are URL-encoded; unquote_plus also maps '+' to space.
        key = urllib.parse.unquote_plus(key_string, encoding="utf-8")

        if key.startswith("expedite/"):
            logger.info("Processing file from expedite folder")

            self.enforce_virus_scanner(key)
            self.check_file_status(key)

            return  # To be added upon by ticket PRMP-540
        else:
            failure_msg = f"Unexpected directory or file location received from EventBridge: {key_string}"
            logger.error(failure_msg)
            raise BulkUploadMetadataException(failure_msg)
    except KeyError as e:
        # NOTE(review): this also catches KeyErrors raised inside
        # enforce_virus_scanner/check_file_status and reports them as a
        # missing event key — confirm that is intended.
        failure_msg = f"Failed due to missing key: {str(e)}"
        logger.error(failure_msg)
        # Chain the cause so the original KeyError traceback is preserved.
        raise BulkUploadMetadataException(failure_msg) from e


def get_formatter_service(raw_pre_format_type):
    """Map a raw ``preFormatType`` event value to a metadata preprocessor class.

    Args:
        raw_pre_format_type: value taken from the lambda event; expected to be
            convertible to ``LloydGeorgePreProcessFormat``.

    Returns:
        ``MetadataUsbPreprocessorService`` for the USB format, otherwise
        ``MetadataGeneralPreprocessor``. Invalid values are logged and fall
        back to the general preprocessor (never returns None).
    """
    # Keep the try body minimal: only the enum conversion can raise ValueError.
    try:
        pre_format_type = LloydGeorgePreProcessFormat(raw_pre_format_type)
    except ValueError:
        logger.warning(
            f"Invalid preFormatType: '{raw_pre_format_type}', defaulting to {LloydGeorgePreProcessFormat.GENERAL}."
        )
        return MetadataGeneralPreprocessor

    if pre_format_type == LloydGeorgePreProcessFormat.USB:
        logger.info("Using usb preFormatType")
        return MetadataUsbPreprocessorService

    # Default branch: GENERAL, and any future enum member not handled above
    # (the previous if/elif could silently return None for such members).
    logger.info("Using general preFormatType")
    return MetadataGeneralPreprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,21 @@ def test_metadata_processor_lambda_handler_empty_event(
lambda_handler({}, context)

mock_metadata_service.process_metadata.assert_not_called()


def test_metadata_processor_lambda_handler_s3_event_triggers_expedite(
    set_env, context, mock_metadata_service
):
    """An EventBridge S3 event is routed to handle_expedite_event, not process_metadata."""
    s3_event = {
        "source": "aws.s3",
        "detail": {"object": {"key": "expedite/folder/file.pdf"}},
    }

    lambda_handler(s3_event, context)

    mock_metadata_service.handle_expedite_event.assert_called_once_with(s3_event)
    mock_metadata_service.process_metadata.assert_not_called()
Loading