Commit 6d89be0

Refactored and updated tests for filename proc
1 parent 2ddbe3e commit 6d89be0

11 files changed: +152 -633 lines changed
Lines changed: 6 additions & 72 deletions
@@ -1,83 +1,20 @@
 """Add the filename to the audit table and check for duplicates."""
-
-from typing import Union
-from boto3.dynamodb.conditions import Key
-from clients import dynamodb_client, dynamodb_resource, logger
-from errors import DuplicateFileError, UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, AUDIT_TABLE_FILENAME_GSI, AuditTableKeys, FileStatus
-
-
-def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
-    """
-    Checks for queued files.
-    Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
-    """
-    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
-        IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
-        KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
-        & Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
-    )
-
-    queued_files_details: list = queued_files_found_in_audit_table["Items"]
-
-    # Return the oldest queued file
-    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
-
-
-def ensure_file_is_not_a_duplicate(file_key: str, created_at_formatted_string: str) -> None:
-    """Raises an error if the file is a duplicate."""
-    files_already_in_audit_table = (
-        dynamodb_resource.Table(AUDIT_TABLE_NAME)
-        .query(IndexName=AUDIT_TABLE_FILENAME_GSI, KeyConditionExpression=Key(AuditTableKeys.FILENAME).eq(file_key))
-        .get("Items")
-    )
-    if files_already_in_audit_table:
-        logger.error("%s file duplicate added to s3 at the following time: %s", file_key, created_at_formatted_string)
-        raise DuplicateFileError(f"Duplicate file: {file_key} added at {created_at_formatted_string}")
+from clients import dynamodb_client, logger
+from errors import UnhandledAuditTableError
+from constants import AUDIT_TABLE_NAME, AuditTableKeys

 def upsert_audit_table(
     message_id: str,
     file_key: str,
     created_at_formatted_str: str,
     queue_name: str,
-    file_status: str,
-    is_existing_file: bool,
-) -> bool:
+    file_status: str
+) -> None:
     """
-    Updates the audit table with the file details. Returns a bool indicating whether the file status is queued
-    (i.e. if the file has passed initial validation and there are no other files in the queue, then the file is status
-    will be 'processing' and the file is ready to be sent for row level processing.)
+    Updates the audit table with the file details
     """
     try:
-        # If the file is not new, then the lambda has been invoked by the next file in the queue for processing
-        if is_existing_file:
-            dynamodb_client.update_item(
-                TableName=AUDIT_TABLE_NAME,
-                Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
-                UpdateExpression="SET #status = :status",
-                ExpressionAttributeNames={"#status": "status"},
-                ExpressionAttributeValues={":status": {"S": file_status}},
-                ConditionExpression="attribute_exists(message_id)",
-            )
-            logger.info("%s file set for processing, and the status successfully updated in audit table", file_key)
-            return False
-
-        # If the file is not already processed, check whether there is a file ahead in the queue already processing
-        if file_status not in (FileStatus.PROCESSED, FileStatus.DUPLICATE):
-            files_in_processing = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
-                IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
-                KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
-                & Key(AuditTableKeys.STATUS).eq(FileStatus.PROCESSING),
-            )
-            # TODO: There is a short time lag between a file being marked as processed, and the next queued file being
-            # marked as processing. If a third file is added to the queue during this time this could result in
-            # two files processing simultanously. This is a known issue which needs to be addressed in a future
-            # iteration.
-            if files_in_processing["Items"]:
-                file_status = FileStatus.QUEUED
-                logger.info("%s file queued for processing", file_key)
-
         # Add to the audit table (regardless of whether it is a duplicate)
         dynamodb_client.put_item(
             TableName=AUDIT_TABLE_NAME,
@@ -92,9 +29,6 @@ def upsert_audit_table(
         )
         logger.info("%s file, with message id %s, successfully added to audit table", file_key, message_id)

-        # Return a bool indicating whether the file status is queued
-        return True if file_status == FileStatus.QUEUED else False
-
     except Exception as error:  # pylint: disable = broad-exception-caught
         logger.error(error)
         raise UnhandledAuditTableError(error) from error
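
The commit title mentions updated tests for the filename processor. Below is a minimal sketch, not taken from the repository's test suite, of how the simplified upsert_audit_table signature could be exercised with a mocked DynamoDB client. The patch target, example values, and the assumption that the module imports cleanly in a test environment are inferred only from the imports and parameters visible in this diff.

from unittest.mock import patch

from audit_table import upsert_audit_table


def test_upsert_audit_table_puts_item():
    # Patch the module-level DynamoDB client so no real AWS call is made
    # ("audit_table.dynamodb_client" is assumed from the import shown above).
    with patch("audit_table.dynamodb_client") as mock_dynamodb_client:
        upsert_audit_table(
            message_id="11111111-2222-3333-4444-555555555555",  # illustrative uuid4-style id
            file_key="Flu_Vaccinations_v5_EXAMPLESUPPLIER_20240101T00000000.csv",  # placeholder key
            created_at_formatted_str="20240101T00000000",  # placeholder timestamp
            queue_name="EXAMPLESUPPLIER_FLU",  # built as f"{supplier}_{vaccine_type}" by the caller
            file_status="Queued",  # the real caller passes FileStatus members from constants
        )
        # After the refactor the function only ever writes a single item.
        mock_dynamodb_client.put_item.assert_called_once()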

filenameprocessor/src/constants.py

Lines changed: 0 additions & 4 deletions
@@ -13,11 +13,7 @@
 )

 SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
-FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")
 AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
-AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
-AUDIT_TABLE_FILENAME_GSI = "filename_index"
-DATA_SOURCES_BUCKET_SUFFIX = "data-sources"
 VALID_VERSIONS = ["V5"]

 SUPPLIER_PERMISSIONS_HASH_KEY = "supplier_permissions"

filenameprocessor/src/file_name_processor.py

Lines changed: 92 additions & 108 deletions
@@ -8,11 +8,11 @@

 import argparse
 from uuid import uuid4
-from utils_for_filenameprocessor import get_created_at_formatted_string, move_file, invoke_filename_lambda
+from utils_for_filenameprocessor import get_created_at_formatted_string, move_file
 from file_key_validation import validate_file_key, is_file_in_directory_root
 from send_sqs_message import make_and_send_sqs_message
 from make_and_upload_ack_file import make_and_upload_the_ack_file
-from audit_table import upsert_audit_table, get_next_queued_file_details, ensure_file_is_not_a_duplicate
+from audit_table import upsert_audit_table
 from clients import logger
 from logging_decorator import logging_decorator
 from supplier_permissions import validate_vaccine_type_permissions
@@ -21,10 +21,9 @@
     InvalidFileKeyError,
     InvalidSupplierError,
     UnhandledAuditTableError,
-    DuplicateFileError,
     UnhandledSqsError,
 )
-from constants import FileStatus, DATA_SOURCES_BUCKET_SUFFIX, ERROR_TYPE_TO_STATUS_CODE_MAP
+from constants import FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME


 # NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because
@@ -47,115 +46,100 @@ def handle_record(record) -> dict:
     vaccine_type = "unknown"
     supplier = "unknown"

-    if DATA_SOURCES_BUCKET_SUFFIX in bucket_name:
-
-        # In addition to when a batch file is added to the S3 bucket root for processing, this Lambda is also invoked
-        # when the file is moved to the processing/ directory and finally the /archive directory. We want to ignore
-        # those events. Unfortunately S3 event filtering does not support triggering for root files only. See VED-781
-        # for more info.
-        if not is_file_in_directory_root(file_key):
-            message = "Processing not required. Event was for a file moved to /archive or /processing"
-            return {"statusCode": 200, "message": message, "file_key": file_key}
-
-        # Set default values for file-specific variables
-        message_id = "Message id was not created"
-        created_at_formatted_string = "created_at_time not identified"
-
-        try:
-            # If the record contains a message_id, then the lambda has been invoked by a file already in the queue
-            is_existing_file = "message_id" in record
-
-            # Get message_id if the file is not new, else assign one
-            message_id = record.get("message_id", str(uuid4()))
-
-            created_at_formatted_string = get_created_at_formatted_string(bucket_name, file_key)
-
-            vaccine_type, supplier = validate_file_key(file_key)
-            permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
-            if not is_existing_file:
-                ensure_file_is_not_a_duplicate(file_key, created_at_formatted_string)
-
-            queue_name = f"{supplier}_{vaccine_type}"
-            file_status_is_queued = upsert_audit_table(
-                message_id, file_key, created_at_formatted_string, queue_name, FileStatus.PROCESSING, is_existing_file
-            )
-
-            if file_status_is_queued:
-                message_for_logs = "File is successfully queued for processing"
-            else:
-                make_and_send_sqs_message(
-                    file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
-                )
-                message_for_logs = "Successfully sent to SQS for further processing"
-
-            logger.info("Lambda invocation successful for file '%s'", file_key)
-
-            # Return details for logs
-            return {
-                "statusCode": 200,
-                "message": message_for_logs,
-                "file_key": file_key,
-                "message_id": message_id,
-                "vaccine_type": vaccine_type,
-                "supplier": supplier,
-            }
+    if bucket_name != SOURCE_BUCKET_NAME:
+        return handle_unexpected_bucket_name(bucket_name, file_key, vaccine_type, supplier)

-        except (  # pylint: disable=broad-exception-caught
-            VaccineTypePermissionsError,
-            InvalidFileKeyError,
-            InvalidSupplierError,
-            UnhandledAuditTableError,
-            DuplicateFileError,
-            UnhandledSqsError,
-            Exception,
-        ) as error:
-            logger.error("Error processing file '%s': %s", file_key, str(error))
-
-            file_status = FileStatus.DUPLICATE if isinstance(error, DuplicateFileError) else FileStatus.PROCESSED
-            queue_name = f"{supplier}_{vaccine_type}"
-            upsert_audit_table(
-                message_id, file_key, created_at_formatted_string, queue_name, file_status, is_existing_file
-            )
-
-            # Create ack file
-            message_delivered = False
-            make_and_upload_the_ack_file(message_id, file_key, message_delivered, created_at_formatted_string)
-
-            # Move file to archive
-            move_file(bucket_name, file_key, f"archive/{file_key}")
-
-            # If there is another file waiting in the queue, invoke the filename lambda with the next file
-            next_queued_file_details = get_next_queued_file_details(queue_name=f"{supplier}_{vaccine_type}")
-            if next_queued_file_details:
-                invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
-
-            # Return details for logs
-            return {
-                "statusCode": ERROR_TYPE_TO_STATUS_CODE_MAP.get(type(error), 500),
-                "message": "Infrastructure Level Response Value - Processing Error",
-                "file_key": file_key,
-                "message_id": message_id,
-                "error": str(error),
-                "vaccine_type": vaccine_type,
-                "supplier": supplier
-            }
+    # In addition to when a batch file is added to the S3 bucket root for processing, this Lambda is also invoked
+    # when the file is moved to the processing/ directory and finally the /archive directory. We want to ignore
+    # those events. Unfortunately S3 event filtering does not support triggering for root files only. See VED-781
+    # for more info.
+    if not is_file_in_directory_root(file_key):
+        message = "Processing not required. Event was for a file moved to /archive or /processing"
+        return {"statusCode": 200, "message": message, "file_key": file_key}
+
+    # Set default values for file-specific variables
+    message_id = "Message id was not created"
+    created_at_formatted_string = "created_at_time not identified"

-    else:
-        try:
-            vaccine_type, supplier = validate_file_key(file_key)
-            logger.error("Unable to process file %s due to unexpected bucket name %s", file_key, bucket_name)
-            message = f"Failed to process file due to unexpected bucket name {bucket_name}"
+    try:
+        message_id = str(uuid4())
+        created_at_formatted_string = get_created_at_formatted_string(bucket_name, file_key)
+
+        vaccine_type, supplier = validate_file_key(file_key)
+        permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
+
+        queue_name = f"{supplier}_{vaccine_type}"
+        upsert_audit_table(
+            message_id, file_key, created_at_formatted_string, queue_name, FileStatus.QUEUED
+        )
+        make_and_send_sqs_message(
+            file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
+        )
+
+        logger.info("Lambda invocation successful for file '%s'", file_key)
+
+        # Return details for logs
+        return {
+            "statusCode": 200,
+            "message": "Successfully sent to SQS for further processing",
+            "file_key": file_key,
+            "message_id": message_id,
+            "vaccine_type": vaccine_type,
+            "supplier": supplier,
+        }
+
+    except (  # pylint: disable=broad-exception-caught
+        VaccineTypePermissionsError,
+        InvalidFileKeyError,
+        InvalidSupplierError,
+        UnhandledAuditTableError,
+        UnhandledSqsError,
+        Exception,
+    ) as error:
+        logger.error("Error processing file '%s': %s", file_key, str(error))
+
+        queue_name = f"{supplier}_{vaccine_type}"
+        upsert_audit_table(
+            message_id, file_key, created_at_formatted_string, queue_name, FileStatus.PROCESSED
+        )
+
+        # Create ack file
+        message_delivered = False
+        make_and_upload_the_ack_file(message_id, file_key, message_delivered, created_at_formatted_string)
+
+        # Move file to archive
+        move_file(bucket_name, file_key, f"archive/{file_key}")
+
+        # Return details for logs
+        return {
+            "statusCode": ERROR_TYPE_TO_STATUS_CODE_MAP.get(type(error), 500),
+            "message": "Infrastructure Level Response Value - Processing Error",
+            "file_key": file_key,
+            "message_id": message_id,
+            "error": str(error),
+            "vaccine_type": vaccine_type,
+            "supplier": supplier
+        }
+
+
+def handle_unexpected_bucket_name(bucket_name: str, file_key: str, vaccine_type: str, supplier: str) -> dict:
+    """Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
+    config and overarching design"""
+    try:
+        vaccine_type, supplier = validate_file_key(file_key)
+        logger.error("Unable to process file %s due to unexpected bucket name %s", file_key, bucket_name)
+        message = f"Failed to process file due to unexpected bucket name {bucket_name}"

-            return {"statusCode": 500, "message": message, "file_key": file_key,
-                    "vaccine_type": vaccine_type, "supplier": supplier}
+        return {"statusCode": 500, "message": message, "file_key": file_key,
+                "vaccine_type": vaccine_type, "supplier": supplier}

-        except Exception as error:
-            logger.error("Unable to process file due to unexpected bucket name %s and file key %s",
-                         bucket_name, file_key)
-            message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"
+    except Exception as error:
+        logger.error("Unable to process file due to unexpected bucket name %s and file key %s",
+                     bucket_name, file_key)
+        message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"

-            return {"statusCode": 500, "message": message, "file_key": file_key,
-                    "vaccine_type": vaccine_type, "supplier": supplier, "error": str(error)}
+        return {"statusCode": 500, "message": message, "file_key": file_key,
+                "vaccine_type": vaccine_type, "supplier": supplier, "error": str(error)}


 def lambda_handler(event: dict, context) -> None:  # pylint: disable=unused-argument
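
For orientation, this is the shape of S3 event record the refactored handle_record now consumes. It mirrors the payload that the removed invoke_filename_lambda helper used to build; the bucket and key values below are placeholders, not values from the repository.

# Illustrative S3 event (placeholders only). With the queue/self-invocation path
# removed, handle_record no longer looks for a "message_id" field on the record;
# every invocation now generates a fresh uuid4.
example_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-data-sources-bucket"},
                "object": {"key": "Flu_Vaccinations_v5_EXAMPLESUPPLIER_20240101T00000000.csv"},
            }
        }
    ]
}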
Lines changed: 1 addition & 20 deletions
@@ -1,8 +1,5 @@
 """Utils for filenameprocessor lambda"""
-
-import json
-from constants import SOURCE_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
-from clients import s3_client, logger, lambda_client
+from clients import s3_client, logger


 def get_created_at_formatted_string(bucket_name: str, file_key: str) -> str:
@@ -18,19 +15,3 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)
     )
     s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)
     logger.info("File moved from %s to %s", source_file_key, destination_file_key)
-
-
-def invoke_filename_lambda(file_key: str, message_id: str) -> None:
-    """Invokes the filenameprocessor lambda with the given file key and message id"""
-    try:
-        lambda_payload = {
-            "Records": [
-                {"s3": {"bucket": {"name": SOURCE_BUCKET_NAME}, "object": {"key": file_key}}, "message_id": message_id}
-            ]
-        }
-        lambda_client.invoke(
-            FunctionName=FILE_NAME_PROC_LAMBDA_NAME, InvocationType="Event", Payload=json.dumps(lambda_payload)
-        )
-    except Exception as error:
-        logger.error("Error invoking filename lambda: %s", error)
-        raise
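
A quick usage sketch of the retained move_file helper, which appears to copy the object to the destination key and then delete the source (the delete_object call and "File moved" log line are visible in the context above). Bucket and key names here are placeholders.

from utils_for_filenameprocessor import move_file

# Archive a processed batch file (placeholder names; the lambda itself builds the
# destination key as f"archive/{file_key}").
move_file(
    bucket_name="example-data-sources-bucket",
    source_file_key="Flu_Vaccinations_v5_EXAMPLESUPPLIER_20240101T00000000.csv",
    destination_file_key="archive/Flu_Vaccinations_v5_EXAMPLESUPPLIER_20240101T00000000.csv",
)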
