
Commit 27555af

VED-760 Improve empty file checking (#826)
1 parent 3bb91ac · commit 27555af

10 files changed: +83 -82 lines changed
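
In outline: the filename processor no longer rejects small files up front by raising EmptyFileError when the S3 object's ContentLength is at or below EMPTY_BATCH_FILE_SIZE_IN_BYTES (700 bytes). Instead, the record processor counts the rows it actually processes and, when that count is zero, records the file as "Not processed - Empty file" in the audit table and moves it from the processing directory to the archive directory. The Python sketch below only illustrates that before/after behaviour; the helper parameters (move_file, update_status) are stand-ins, and the committed logic is in the per-file diffs that follow.

# Illustrative sketch only - the real changes are in the diffs below.

# Before (filenameprocessor): a byte-size heuristic on the S3 GetObject response.
EMPTY_BATCH_FILE_SIZE_IN_BYTES = 700

def old_check_rejects(s3_response: dict) -> bool:
    """True when the removed validate_file_not_empty would have raised EmptyFileError."""
    return s3_response.get("ContentLength", 0) <= EMPTY_BATCH_FILE_SIZE_IN_BYTES

# After (recordprocessor): decide from the number of rows actually processed.
def file_is_empty(row_count: int) -> bool:
    return row_count == 0

def finish_file(file_key: str, row_count: int, move_file, update_status) -> str:
    """Simplified tail of process_csv_to_fhir; move_file and update_status are stand-ins."""
    file_status = "Preprocessed"
    if file_is_empty(row_count):
        move_file(f"processing/{file_key}", f"archive/{file_key}")
        file_status = "Not processed - Empty file"
    update_status(file_key, file_status)
    return file_status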

filenameprocessor/src/constants.py

Lines changed: 0 additions & 5 deletions
@@ -7,7 +7,6 @@
     VaccineTypePermissionsError,
     InvalidFileKeyError,
     UnhandledAuditTableError,
-    EmptyFileError,
     UnhandledSqsError
 )

@@ -23,15 +22,11 @@
 ERROR_TYPE_TO_STATUS_CODE_MAP = {
     VaccineTypePermissionsError: 403,
     InvalidFileKeyError: 400,  # Includes invalid ODS code, therefore unable to identify supplier
-    EmptyFileError: 400,
     UnhandledAuditTableError: 500,
     UnhandledSqsError: 500,
     Exception: 500,
 }

-# The size in bytes of an empty batch file containing only the headers row
-EMPTY_BATCH_FILE_SIZE_IN_BYTES = 700
-

 class FileStatus(StrEnum):
     """File status constants"""

filenameprocessor/src/errors.py

Lines changed: 0 additions & 4 deletions
@@ -1,10 +1,6 @@
 """Custom exceptions for the Filename Processor."""


-class EmptyFileError(Exception):
-    """A custom exception for when the batch file contains only the header row or is completely empty"""
-
-
 class UnhandledAuditTableError(Exception):
     """A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""

filenameprocessor/src/file_name_processor.py

Lines changed: 3 additions & 13 deletions
@@ -9,7 +9,7 @@
 import argparse
 from uuid import uuid4
 from utils_for_filenameprocessor import get_creation_and_expiry_times, move_file
-from file_validation import validate_file_key, is_file_in_directory_root, validate_file_not_empty
+from file_validation import validate_file_key, is_file_in_directory_root
 from send_sqs_message import make_and_send_sqs_message
 from make_and_upload_ack_file import make_and_upload_the_ack_file
 from audit_table import upsert_audit_table
@@ -20,8 +20,7 @@
     VaccineTypePermissionsError,
     InvalidFileKeyError,
     UnhandledAuditTableError,
-    UnhandledSqsError,
-    EmptyFileError
+    UnhandledSqsError
 )
 from constants import FileNotProcessedReason, FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME

@@ -68,8 +67,6 @@ def handle_record(record) -> dict:
         created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

         vaccine_type, supplier = validate_file_key(file_key)
-        # VED-757: Known issue with suppliers sometimes sending empty files
-        validate_file_not_empty(s3_response)
         permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

         queue_name = f"{supplier}_{vaccine_type}"
@@ -94,17 +91,12 @@

     except (  # pylint: disable=broad-exception-caught
         VaccineTypePermissionsError,
-        EmptyFileError,
         InvalidFileKeyError,
         UnhandledAuditTableError,
         UnhandledSqsError,
         Exception,
     ) as error:
-        if isinstance(error, EmptyFileError):
-            # Avoid error log noise for accepted scenario in which supplier provides a batch file with no records
-            logger.warning("Error processing file '%s': %s", file_key, str(error))
-        else:
-            logger.error("Error processing file '%s': %s", file_key, str(error))
+        logger.error("Error processing file '%s': %s", file_key, str(error))

         queue_name = f"{supplier}_{vaccine_type}"
         file_status = get_file_status_for_error(error)
@@ -137,8 +129,6 @@ def get_file_status_for_error(error: Exception) -> str:
     """Creates a file status based on the type of error that was thrown"""
     if isinstance(error, VaccineTypePermissionsError):
         return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
-    elif isinstance(error, EmptyFileError):
-        return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}"

     return FileStatus.FAILED

filenameprocessor/src/file_validation.py

Lines changed: 2 additions & 8 deletions
@@ -2,9 +2,9 @@

 from re import match
 from datetime import datetime
-from constants import VALID_VERSIONS, EMPTY_BATCH_FILE_SIZE_IN_BYTES
+from constants import VALID_VERSIONS
 from elasticache import get_valid_vaccine_types_from_cache, get_supplier_system_from_cache
-from errors import InvalidFileKeyError, EmptyFileError
+from errors import InvalidFileKeyError


 def is_file_in_directory_root(file_key: str) -> bool:
@@ -72,9 +72,3 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
         raise InvalidFileKeyError("Initial file validation failed: invalid file key")

     return vaccine_type, supplier
-
-
-def validate_file_not_empty(s3_response: dict) -> None:
-    """Checks that the batch file from S3 is not empty or containing only the header row"""
-    if s3_response.get("ContentLength", 0) <= EMPTY_BATCH_FILE_SIZE_IN_BYTES:
-        raise EmptyFileError("Initial file validation failed: batch file was empty")

filenameprocessor/tests/test_lambda_handler.py

Lines changed: 1 addition & 40 deletions
@@ -19,7 +19,7 @@
 )
 from tests.utils_for_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames, Sqs
 from tests.utils_for_tests.values_for_tests import MOCK_CREATED_AT_FORMATTED_STRING, MockFileDetails, \
-    MOCK_BATCH_FILE_CONTENT, MOCK_FILE_HEADERS, MOCK_EXPIRES_AT
+    MOCK_BATCH_FILE_CONTENT, MOCK_EXPIRES_AT

 # Ensure environment variables are mocked before importing from src files
 with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
@@ -196,45 +196,6 @@ def test_lambda_handler_new_file_success_and_first_in_queue(self):
         self.assert_sqs_message(file_details)
         self.assert_no_ack_file(file_details)

-    def test_lambda_handler_correctly_flags_empty_file(self):
-        """
-        VED-757 Tests that for an empty batch file:
-        * The file status is updated to 'Not processed - empty file' in the audit table
-        * The message is not sent to SQS
-        * The failure inf_ack file is created
-        """
-        file_details = MockFileDetails.ravs_rsv_1
-
-        s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key, Body=MOCK_FILE_HEADERS)
-
-        with (  # noqa: E999
-            patch("file_name_processor.uuid4", return_value=file_details.message_id),  # noqa: E999
-        ):  # noqa: E999
-            lambda_handler(
-                self.make_event([self.make_record_with_message_id(file_details.file_key, file_details.message_id)]),
-                None,
-            )
-
-        expected_table_items = [
-            {
-                "message_id": {"S": file_details.message_id},
-                "filename": {"S": file_details.file_key},
-                "queue_name": {"S": "RAVS_RSV"},
-                "status": {"S": "Not processed - Empty file"},
-                "error_details": {"S": "Initial file validation failed: batch file was empty"},
-                "timestamp": {"S": file_details.created_at_formatted_string},
-                "expires_at": {"N": str(file_details.expires_at)},
-            }
-        ]
-        self.assertEqual(self.get_audit_table_items(), expected_table_items)
-        self.assert_no_sqs_message()
-        self.assert_ack_file_contents(file_details)
-        self.mock_logger.warning.assert_called_once_with(
-            "Error processing file '%s': %s",
-            "RSV_Vaccinations_v5_X8E5B_20000101T00000001.csv",
-            "Initial file validation failed: batch file was empty"
-        )
-
     def test_lambda_handler_non_root_file(self):
         """
         Tests that when the file is not in the root of the source bucket, no action is taken:

recordprocessor/src/batch_processor.py

Lines changed: 23 additions & 8 deletions
@@ -3,15 +3,16 @@
 import json
 import os
 import time
+from csv import DictReader
 from json import JSONDecodeError

-from constants import FileStatus
+from constants import FileStatus, FileNotProcessedReason, SOURCE_BUCKET_NAME, ARCHIVE_DIR_NAME, PROCESSING_DIR_NAME
 from process_row import process_row
 from mappings import map_target_disease
 from audit_table import update_audit_table_status
 from send_to_kinesis import send_to_kinesis
 from clients import logger
-from file_level_validation import file_level_validation
+from file_level_validation import file_level_validation, file_is_empty, move_file
 from errors import NoOperationPermissions, InvalidHeaders
 from utils_for_recordprocessor import get_csv_content_dict_reader
 from typing import Optional
@@ -42,7 +43,6 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:

     target_disease = map_target_disease(vaccine)

-    row_count = 0
     row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
                                   created_at_formatted_string, csv_reader, target_disease)

@@ -55,22 +55,37 @@
            encoder = new_encoder

            # load alternative encoder
-           csv_reader = get_csv_content_dict_reader(f"processing/{file_key}", encoder=encoder)
+           csv_reader = get_csv_content_dict_reader(f"{PROCESSING_DIR_NAME}/{file_key}", encoder=encoder)
            # re-read the file and skip processed rows
            row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
                                          created_at_formatted_string, csv_reader, target_disease, row_count)
        else:
            logger.error(f"Row Processing error: {err}")
            raise err

-    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
+    file_status = FileStatus.PREPROCESSED
+
+    if file_is_empty(row_count):
+        logger.warning("File was empty: %s. Moving file to archive directory.", file_key)
+        move_file(SOURCE_BUCKET_NAME, f"{PROCESSING_DIR_NAME}/{file_key}", f"{ARCHIVE_DIR_NAME}/{file_key}")
+        file_status = f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}"
+
+    update_audit_table_status(file_key, file_id, file_status)
     return row_count


 # Process the row to obtain the details needed for the message_body and ack file
-def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
-                 csv_reader, target_disease,
-                 total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
+def process_rows(
+    file_id: str,
+    vaccine: str,
+    supplier: str,
+    file_key: str,
+    allowed_operations: set,
+    created_at_formatted_string: str,
+    csv_reader: DictReader,
+    target_disease: list[dict],
+    total_rows_processed_count: int = 0
+) -> tuple[int, Optional[Exception]]:
     """
     Processes each row in the csv_reader starting from start_row.
     """

recordprocessor/src/constants.py

Lines changed: 4 additions & 0 deletions
@@ -10,6 +10,9 @@
 AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
 FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")

+ARCHIVE_DIR_NAME = "archive"
+PROCESSING_DIR_NAME = "processing"
+
 EXPECTED_CSV_HEADERS = [
     "NHS_NUMBER",
     "PERSON_FORENAME",
@@ -62,6 +65,7 @@ class FileStatus:
 class FileNotProcessedReason(StrEnum):
     """Reasons why a file was not processed"""
     UNAUTHORISED = "Unauthorised"
+    EMPTY = "Empty file"


 class AuditTableKeys:
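
A note on the status string this enum feeds into: FileNotProcessedReason is a StrEnum, so its members format as their values in f-strings. Assuming FileStatus behaves the same way and FileStatus.NOT_PROCESSED has the value "Not processed" (that value is not shown in this diff, but it matches the "Not processed - Empty file" audit-table assertion in the tests below), the batch processor's f-string builds the status as in this sketch:

# Hedged sketch (Python 3.11+): FileStatus.NOT_PROCESSED's value is assumed, not shown in this commit.
from enum import StrEnum

class FileStatus(StrEnum):
    NOT_PROCESSED = "Not processed"   # assumed value

class FileNotProcessedReason(StrEnum):
    UNAUTHORISED = "Unauthorised"
    EMPTY = "Empty file"              # added by this commit

# StrEnum members interpolate as their values, so:
assert f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}" == "Not processed - Empty file"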

recordprocessor/src/file_level_validation.py

Lines changed: 11 additions & 4 deletions
@@ -2,22 +2,29 @@
 Functions for completing file-level validation
 (validating headers and ensuring that the supplier has permission to perform at least one of the requested operations)
 """
+from csv import DictReader
+
 from clients import logger, s3_client
 from make_and_upload_ack_file import make_and_upload_ack_file
 from utils_for_recordprocessor import get_csv_content_dict_reader
 from errors import InvalidHeaders, NoOperationPermissions
 from logging_decorator import file_level_validation_logging_decorator
 from audit_table import update_audit_table_status
 from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission, \
-    FileNotProcessedReason
+    FileNotProcessedReason, ARCHIVE_DIR_NAME, PROCESSING_DIR_NAME


-def validate_content_headers(csv_content_reader) -> None:
+def validate_content_headers(csv_content_reader: DictReader) -> None:
     """Raises an InvalidHeaders error if the headers in the CSV file do not match the expected headers."""
     if csv_content_reader.fieldnames != EXPECTED_CSV_HEADERS:
         raise InvalidHeaders("File headers are invalid.")


+def file_is_empty(row_count: int) -> bool:
+    """Simple helper for readability to check if no rows were processed in a file i.e. empty"""
+    return row_count == 0
+
+
 def get_permitted_operations(
     supplier: str, vaccine_type: str, allowed_permissions_list: list
 ) -> set:
@@ -94,7 +101,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:

         make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string)

-        move_file(SOURCE_BUCKET_NAME, file_key, f"processing/{file_key}")
+        move_file(SOURCE_BUCKET_NAME, file_key, f"{PROCESSING_DIR_NAME}/{file_key}")

         return {
             "message_id": message_id,
@@ -118,7 +125,7 @@
             if isinstance(error, NoOperationPermissions) else FileStatus.FAILED

         try:
-            move_file(SOURCE_BUCKET_NAME, file_key, f"archive/{file_key}")
+            move_file(SOURCE_BUCKET_NAME, file_key, f"{ARCHIVE_DIR_NAME}/{file_key}")
         except Exception as move_file_error:
             logger.error("Failed to move file to archive: %s", move_file_error)

recordprocessor/tests/test_recordprocessor_main.py

Lines changed: 38 additions & 0 deletions
@@ -153,6 +153,14 @@ def make_kinesis_assertions(self, test_cases):
                 kinesis_data.pop(key_to_ignore)
             self.assertEqual(kinesis_data, expected_kinesis_data)

+    def assert_object_moved_to_archive(self, file_key: str) -> None:
+        """Checks that the S3 object was moved to the archive directory"""
+        with self.assertRaises(s3_client.exceptions.NoSuchKey):
+            s3_client.get_object(Bucket=BucketNames.SOURCE, Key=f"processing/{file_key}")
+
+        response = s3_client.get_object(Bucket=BucketNames.SOURCE, Key=f"archive/{file_key}")
+        self.assertIsNotNone(response)
+
     def test_e2e_full_permissions(self):
         """
         Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier has
@@ -402,6 +410,36 @@ def test_e2e_kinesis_failed(self):
             " not found."}
         })

+    def test_e2e_empty_file_is_flagged_and_processed_correctly(self):
+        """
+        Tests files that contain only the headers and no records are marked as empty and moved to archive.
+        """
+        test_cases = [
+            ("File containing only headers", ValidMockFileContent.headers),
+            ("File containing headers and new line", ValidMockFileContent.headers + "\n"),
+            ("File containing headers and multiple new lines", ValidMockFileContent.empty_file_with_multiple_new_lines)
+        ]
+        for description, file_content in test_cases:
+
+            with self.subTest(description=description):
+                self.mock_batch_processor_logger.reset_mock()
+                test_file = mock_rsv_emis_file
+                self.upload_source_files(file_content)
+                add_entry_to_table(test_file, FileStatus.PROCESSING)
+
+                main(test_file.event_full_permissions)
+
+                kinesis_records = kinesis_client.get_records(
+                    ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
+
+                self.mock_batch_processor_logger.warning.assert_called_once_with(
+                    "File was empty: %s. Moving file to archive directory.",
+                    "RSV_Vaccinations_v5_8HK48_20210730T12000000.csv"
+                )
+                self.assertListEqual(kinesis_records, [])
+                assert_audit_table_entry(test_file, "Not processed - Empty file")
+                self.assert_object_moved_to_archive(test_file.file_key)
+
     def test_e2e_error_is_logged_if_invalid_json_provided(self):
         """This scenario should not happen. If it does, it means our batch processing system config is broken and we
         have received malformed content from SQS -> EventBridge. In this case we log the error so we will be alerted.
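
Why a zero row count covers all three empty-file cases above: the record processor reads the file through a csv.DictReader (see the csv_reader: DictReader type hint added in batch_processor.py), and DictReader consumes the first row as the header and skips completely blank rows, so header-only content with any number of trailing newlines yields no data rows. A standalone illustration (comma-delimited purely for the example; the dialect used by get_csv_content_dict_reader is not shown in this diff):

import csv
import io

header = "NHS_NUMBER,PERSON_FORENAME"

for description, content in [
    ("headers only", header),
    ("headers and new line", header + "\n"),
    ("headers and many blank lines", header + "\n" * 100),
]:
    rows = list(csv.DictReader(io.StringIO(content)))
    print(description, "->", len(rows), "data rows")  # prints 0 in every case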

recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py

Lines changed: 1 addition & 0 deletions
@@ -125,6 +125,7 @@ class ValidMockFileContent:
     with_new_and_update_and_delete = (
         headers + "\n" + MockFileRows.NEW + "\n" + MockFileRows.UPDATE + "\n" + MockFileRows.DELETE
     )
+    empty_file_with_multiple_new_lines = MockFileRows.HEADERS + "\n".join(["\n" for i in range(100)])


 class FileDetails:
