Skip to content

Commit b41f0af

Browse files
committed
refactor filenameprocessor file path into two
1 parent ee5f86b commit b41f0af

File tree

4 files changed

+224
-90
lines changed

4 files changed

+224
-90
lines changed

lambdas/filenameprocessor/src/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
)
1212

1313
SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
14-
DPS_DESTINATION_BUCKET_NAME = os.getenv("DPS_DESTINATION_BUCKET_NAME")
14+
DPS_DESTINATION_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
1515
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
1616
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS")
1717
VALID_VERSIONS = ["V5"]

lambdas/filenameprocessor/src/file_name_processor.py

Lines changed: 154 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ def handle_record(record) -> dict:
5555
"error": str(error),
5656
}
5757

58-
vaccine_type = "unknown"
59-
supplier = "unknown"
6058
expiry_timestamp = "unknown"
6159

6260
if bucket_name != SOURCE_BUCKET_NAME:
@@ -74,56 +72,128 @@ def handle_record(record) -> dict:
7472
message_id = "Message id was not created"
7573
created_at_formatted_string = "created_at_time not identified"
7674

77-
try:
78-
message_id = str(uuid4())
79-
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
80-
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)
75+
message_id = str(uuid4())
76+
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
77+
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)
8178

82-
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
83-
queue_name = validate_extended_attributes_file_key(file_key)
84-
move_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
79+
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
80+
return handle_extended_attributes_file(
81+
file_key,
82+
bucket_name,
83+
message_id,
84+
created_at_formatted_string,
85+
expiry_timestamp,
86+
)
87+
else:
88+
return handle_batch_file(
89+
file_key,
90+
bucket_name,
91+
message_id,
92+
created_at_formatted_string,
93+
expiry_timestamp,
94+
)
95+
96+
97+
def get_file_status_for_error(error: Exception) -> str:
98+
"""Creates a file status based on the type of error that was thrown"""
99+
if isinstance(error, VaccineTypePermissionsError):
100+
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
85101

86-
upsert_audit_table(
87-
message_id,
102+
return FileStatus.FAILED
103+
104+
105+
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
106+
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
107+
config and overarching design"""
108+
try:
109+
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
110+
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
111+
logger.error(
112+
"Unable to process file %s due to unexpected bucket name %s",
88113
file_key,
89-
created_at_formatted_string,
90-
expiry_timestamp,
91-
queue_name,
92-
FileStatus.PROCESSING,
114+
bucket_name,
93115
)
116+
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
117+
return {
118+
"statusCode": 500,
119+
"message": message,
120+
"file_key": file_key,
121+
"vaccine_supplier_info": extended_attribute_identifier,
122+
}
94123
else:
95124
vaccine_type, supplier = validate_batch_file_key(file_key)
96-
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
97-
queue_name = f"{supplier}_{vaccine_type}"
98-
upsert_audit_table(
99-
message_id,
100-
file_key,
101-
created_at_formatted_string,
102-
expiry_timestamp,
103-
queue_name,
104-
FileStatus.QUEUED,
105-
)
106-
make_and_send_sqs_message(
125+
logger.error(
126+
"Unable to process file %s due to unexpected bucket name %s",
107127
file_key,
108-
message_id,
109-
permissions,
110-
vaccine_type,
111-
supplier,
112-
created_at_formatted_string,
128+
bucket_name,
113129
)
130+
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
114131

115-
logger.info("Lambda invocation successful for file '%s'", file_key)
116-
117-
# Return details for logs
118132
return {
119-
"statusCode": 200,
120-
"message": "Successfully sent to SQS for further processing",
133+
"statusCode": 500,
134+
"message": message,
121135
"file_key": file_key,
122-
"message_id": message_id,
123136
"vaccine_type": vaccine_type,
124137
"supplier": supplier,
125138
}
126139

140+
except Exception as error:
141+
logger.error(
142+
"Unable to process file due to unexpected bucket name %s and file key %s",
143+
bucket_name,
144+
file_key,
145+
)
146+
message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"
147+
148+
return {
149+
"statusCode": 500,
150+
"message": message,
151+
"file_key": file_key,
152+
"vaccine_type": "unknown",
153+
"supplier": "unknown",
154+
"error": str(error),
155+
}
156+
157+
158+
def handle_batch_file(file_key, bucket_name, message_id, created_at_formatted_string, expiry_timestamp) -> dict:
159+
"""
160+
Processes a single record for batch file.
161+
Returns a dictionary containing information to be included in the logs.
162+
"""
163+
vaccine_type = "unknown"
164+
supplier = "unknown"
165+
try:
166+
vaccine_type, supplier = validate_batch_file_key(file_key)
167+
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
168+
queue_name = f"{supplier}_{vaccine_type}"
169+
170+
upsert_audit_table(
171+
message_id,
172+
file_key,
173+
created_at_formatted_string,
174+
expiry_timestamp,
175+
queue_name,
176+
FileStatus.QUEUED,
177+
)
178+
make_and_send_sqs_message(
179+
file_key,
180+
message_id,
181+
permissions,
182+
vaccine_type,
183+
supplier,
184+
created_at_formatted_string,
185+
)
186+
logger.info("Lambda invocation successful for file '%s'", file_key)
187+
188+
return {
189+
"statusCode": 200,
190+
"message": "Batch file successfully processed",
191+
"file_key": file_key,
192+
"message_id": message_id,
193+
"vaccine_type": vaccine_type,
194+
"supplier": supplier,
195+
"queue_name": queue_name,
196+
}
127197
except ( # pylint: disable=broad-exception-caught
128198
VaccineTypePermissionsError,
129199
InvalidFileKeyError,
@@ -133,8 +203,8 @@ def handle_record(record) -> dict:
133203
) as error:
134204
logger.error("Error processing file '%s': %s", file_key, str(error))
135205

136-
queue_name = f"{supplier}_{vaccine_type}"
137206
file_status = get_file_status_for_error(error)
207+
queue_name = f"{supplier}_{vaccine_type}"
138208

139209
upsert_audit_table(
140210
message_id,
@@ -165,64 +235,61 @@ def handle_record(record) -> dict:
165235
}
166236

167237

168-
def get_file_status_for_error(error: Exception) -> str:
169-
"""Creates a file status based on the type of error that was thrown"""
170-
if isinstance(error, VaccineTypePermissionsError):
171-
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
172-
173-
return FileStatus.FAILED
174-
175-
176-
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
177-
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
178-
config and overarching design"""
238+
def handle_extended_attributes_file(
239+
file_key, bucket_name, message_id, created_at_formatted_string, expiry_timestamp
240+
) -> dict:
241+
"""
242+
Processes a single record for extended attributes file.
243+
Returns a dictionary containing information to be included in the logs.
244+
"""
179245
try:
180-
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
181-
validate_extended_attributes_file_key(file_key)
182-
logger.error(
183-
"Unable to process file %s due to unexpected bucket name %s",
184-
file_key,
185-
bucket_name,
186-
)
187-
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
188-
return {
189-
"statusCode": 500,
190-
"message": message,
191-
"file_key": file_key,
192-
"vaccine_type": "extended_attributes",
193-
"supplier": "unknown",
194-
}
195-
else:
196-
vaccine_type, supplier = validate_batch_file_key(file_key)
197-
logger.error(
198-
"Unable to process file %s due to unexpected bucket name %s",
199-
file_key,
200-
bucket_name,
201-
)
202-
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
246+
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
247+
move_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
248+
queue_name = extended_attribute_identifier
203249

204-
return {
205-
"statusCode": 500,
206-
"message": message,
207-
"file_key": file_key,
208-
"vaccine_type": vaccine_type,
209-
"supplier": supplier,
210-
}
250+
upsert_audit_table(
251+
message_id,
252+
file_key,
253+
created_at_formatted_string,
254+
expiry_timestamp,
255+
queue_name,
256+
FileStatus.PROCESSING,
257+
)
258+
return {
259+
"statusCode": 200,
260+
"message": "Extended Attributes file successfully processed",
261+
"file_key": file_key,
262+
"message_id": message_id,
263+
"queue_name": queue_name,
264+
}
265+
except ( # pylint: disable=broad-exception-caught
266+
VaccineTypePermissionsError,
267+
InvalidFileKeyError,
268+
UnhandledAuditTableError,
269+
UnhandledSqsError,
270+
Exception,
271+
) as error:
272+
logger.error("Error processing file '%s': %s", file_key, str(error))
211273

212-
except Exception as error:
213-
logger.error(
214-
"Unable to process file due to unexpected bucket name %s and file key %s",
215-
bucket_name,
274+
file_status = get_file_status_for_error(error)
275+
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
276+
queue_name = extended_attribute_identifier
277+
278+
upsert_audit_table(
279+
message_id,
216280
file_key,
281+
created_at_formatted_string,
282+
expiry_timestamp,
283+
extended_attribute_identifier,
284+
file_status,
285+
error_details=str(error),
217286
)
218-
message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"
219287

220288
return {
221289
"statusCode": 500,
222-
"message": message,
290+
"message": f"Failed to process extended attributes file {file_key} from bucket {bucket_name}",
223291
"file_key": file_key,
224-
"vaccine_type": "unknown",
225-
"supplier": "unknown",
292+
"message_id": message_id,
226293
"error": str(error),
227294
}
228295

lambdas/filenameprocessor/tests/test_lambda_handler.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
MOCK_BATCH_FILE_CONTENT,
2929
MOCK_CREATED_AT_FORMATTED_STRING,
3030
MOCK_EXPIRES_AT,
31+
MOCK_EXTENDED_ATTRIBUTES_FILE_CONTENT,
3132
MockFileDetails,
3233
)
3334

@@ -239,6 +240,67 @@ def test_lambda_handler_non_root_file(self):
239240
self.assert_no_sqs_message()
240241
self.assert_no_ack_file(file_details)
241242

243+
def test_lambda_handler_extended_attributes_success(self):
244+
"""
245+
Tests that for an extended attributes file (prefix starts with 'Vaccination_Extended_Attributes'):
246+
* The file is added to the audit table with a status of 'Processing'
247+
* The queue_name stored is the extended attribute identifier
248+
* The file is moved to the destination bucket under archive/
249+
* No SQS message is sent
250+
* No ack file is created
251+
"""
252+
253+
# Build an extended attributes file.
254+
# FileDetails supports this when vaccine_type starts with 'Vaccination_Extended_Attributes'.
255+
test_cases = [MockFileDetails.extended_attributes_file]
256+
257+
# Put file in source bucket
258+
s3_client.put_object(
259+
Bucket=BucketNames.SOURCE,
260+
Key=test_cases[0].file_key,
261+
Body=MOCK_EXTENDED_ATTRIBUTES_FILE_CONTENT,
262+
)
263+
264+
# Patch uuid4 (message id), the identifier extraction, and prevent external copy issues by simulating move
265+
with (
266+
patch("file_name_processor.uuid4", return_value=test_cases[0].message_id),
267+
patch(
268+
"file_name_processor.validate_extended_attributes_file_key",
269+
return_value=test_cases[0].ods_code + "_COVID",
270+
),
271+
patch(
272+
"file_name_processor.move_file_outside_bucket",
273+
side_effect=lambda src_bucket, key, dst_bucket, dst_key: (
274+
s3_client.put_object(
275+
Bucket=BucketNames.DESTINATION,
276+
Key=dst_key,
277+
Body=s3_client.get_object(Bucket=src_bucket, Key=key)["Body"].read(),
278+
),
279+
s3_client.delete_object(Bucket=src_bucket, Key=key),
280+
),
281+
),
282+
):
283+
lambda_handler(self.make_event([self.make_record(test_cases[0].file_key)]), None)
284+
285+
# Assert audit table entry captured with Processing and queue_name set to the identifier
286+
table_items = self.get_audit_table_items()
287+
self.assertEqual(len(table_items), 1)
288+
item = table_items[0]
289+
self.assertEqual(item[AuditTableKeys.MESSAGE_ID]["S"], test_cases[0].message_id)
290+
self.assertEqual(item[AuditTableKeys.FILENAME]["S"], test_cases[0].file_key)
291+
self.assertEqual(item[AuditTableKeys.QUEUE_NAME]["S"], test_cases[0].ods_code + "_COVID")
292+
self.assertEqual(item[AuditTableKeys.TIMESTAMP]["S"], test_cases[0].created_at_formatted_string)
293+
self.assertEqual(item[AuditTableKeys.EXPIRES_AT]["N"], str(test_cases[0].expires_at))
294+
# File should be moved to destination under archive/
295+
dest_key = f"archive/{test_cases[0].file_key}"
296+
print(f" destination file is at {s3_client.list_objects(Bucket=BucketNames.DESTINATION)}")
297+
retrieved = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=dest_key)
298+
self.assertIsNotNone(retrieved)
299+
300+
# No SQS and no ack file
301+
self.assert_no_sqs_message()
302+
self.assert_no_ack_file(test_cases[0])
303+
242304
@patch("elasticache.get_redis_client")
243305
def test_lambda_invalid_file_key_no_other_files_in_queue(self, mock_get_redis_client):
244306
"""

0 commit comments

Comments (0)