
Commit ab8cf7b

Merge branch 'master' into dependabot/pip/ack_backend/pip-minor-patch-81abc5c005
2 parents b21efad + b81b753 commit ab8cf7b

39 files changed (+673 −216 lines)

batch_processor_filter/src/batch_audit_repository.py

Lines changed: 12 additions & 7 deletions
@@ -12,6 +12,7 @@ class BatchAuditRepository:
         | Key(AuditTableKeys.STATUS).eq(FileStatus.PREPROCESSED)
         | Key(AuditTableKeys.STATUS).eq(FileStatus.PROCESSING)
     )
+    _PROCESSING_AND_FAILED_STATUSES = {FileStatus.PROCESSING, FileStatus.FAILED}
 
     def __init__(self):
         self._batch_audit_table = boto3.resource("dynamodb", region_name=REGION_NAME).Table(AUDIT_TABLE_NAME)
@@ -25,16 +26,20 @@ def is_duplicate_file(self, file_key: str) -> bool:
 
         return len(matching_files) > 0
 
-    def is_event_processing_for_supplier_and_vacc_type(self, supplier: str, vacc_type: str) -> bool:
+    def is_event_processing_or_failed_for_supplier_and_vacc_type(self, supplier: str, vacc_type: str) -> bool:
         queue_name = f"{supplier}_{vacc_type}"
 
-        files_in_processing = self._batch_audit_table.query(
-            IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
-            KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name) & Key(AuditTableKeys.STATUS)
-            .eq(FileStatus.PROCESSING)
-        ).get("Items", [])
+        for status in self._PROCESSING_AND_FAILED_STATUSES:
+            files_in_queue = self._batch_audit_table.query(
+                IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
+                KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name) & Key(AuditTableKeys.STATUS)
+                .eq(status)
+            ).get("Items", [])
+
+            if len(files_in_queue) > 0:
+                return True
 
-        return len(files_in_processing) > 0
+        return False
 
     def update_status(self, message_id: str, updated_status: str) -> None:
         self._batch_audit_table.update_item(
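
For context on the loop above: status is a key attribute of the queue-name GSI, and a DynamoDB key condition cannot OR two key values in a single Query, so the method issues one query per status and returns as soon as either query finds an item. A minimal standalone sketch of the same pattern, using hypothetical table, index, and region names rather than the service's real constants:

    import boto3
    from boto3.dynamodb.conditions import Key

    # Hypothetical names for illustration; the real values come from the module's constants
    table = boto3.resource("dynamodb", region_name="eu-west-2").Table("audit_table")

    def queue_has_item_in_status(queue_name: str, statuses: set) -> bool:
        """Return True if any audit entry for this queue is in one of the given statuses."""
        for status in statuses:
            items = table.query(
                IndexName="queue_name_gsi",  # placeholder GSI name
                KeyConditionExpression=Key("queue_name").eq(queue_name) & Key("status").eq(status),
            ).get("Items", [])
            if items:
                return True
        return False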

batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 12 additions & 10 deletions
@@ -1,19 +1,15 @@
 """Batch processor filter service module"""
 import boto3
 import json
-import logging
 
 from batch_audit_repository import BatchAuditRepository
 from batch_file_created_event import BatchFileCreatedEvent
 from batch_file_repository import BatchFileRepository
-from constants import REGION_NAME, FileStatus, QUEUE_URL
+from constants import REGION_NAME, FileStatus, QUEUE_URL, FileNotProcessedReason
 from exceptions import EventAlreadyProcessingForSupplierAndVaccTypeError
+from logger import logger
 from send_log_to_firehose import send_log_to_firehose
 
-logging.basicConfig(level="INFO")
-logger = logging.getLogger()
-logger.setLevel("INFO")
-
 
 class BatchProcessorFilterService:
     """Batch processor filter service class. Provides the business logic for the Lambda function"""
@@ -38,15 +34,21 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
 
         if self._is_duplicate_file(filename):
             # Mark as processed and return without error so next event will be picked up from queue
-            logger.info("A duplicate file has already been processed. Filename: %s", filename)
-            self._batch_audit_repository.update_status(message_id, FileStatus.DUPLICATE)
+            logger.error("A duplicate file has already been processed. Filename: %s", filename)
+            self._batch_audit_repository.update_status(
+                message_id,
+                f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.DUPLICATE}"
+            )
             self._batch_file_repo.upload_failure_ack(batch_file_created_event)
             self._batch_file_repo.move_source_file_to_archive(filename)
             return
 
-        if self._batch_audit_repository.is_event_processing_for_supplier_and_vacc_type(supplier, vaccine_type):
+        if self._batch_audit_repository.is_event_processing_or_failed_for_supplier_and_vacc_type(
+            supplier,
+            vaccine_type
+        ):
             # Raise error so event is returned to queue and retried again later
-            logger.info("Batch event already being processed for supplier and vacc type. Filename: %s", filename)
+            logger.info("Batch event already processing for supplier and vacc type. Filename: %s", filename)
             raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
                                                                     f"{supplier} and vacc type: {vaccine_type}")
 

batch_processor_filter/src/constants.py

Lines changed: 8 additions & 1 deletion
@@ -18,7 +18,13 @@ class FileStatus(StrEnum):
     PROCESSING = "Processing"
     PREPROCESSED = "Preprocessed"
     PROCESSED = "Processed"
-    DUPLICATE = "Not processed - duplicate"
+    NOT_PROCESSED = "Not processed"
+    FAILED = "Failed"
+
+
+class FileNotProcessedReason(StrEnum):
+    """Reasons why a file was not processed"""
+    DUPLICATE = "Duplicate"
 
 
 class AuditTableKeys(StrEnum):
@@ -29,3 +35,4 @@ class AuditTableKeys(StrEnum):
     QUEUE_NAME = "queue_name"
     STATUS = "status"
     TIMESTAMP = "timestamp"
+    ERROR_DETAILS = "error_details"
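
A small aside on how these enums compose downstream: both are StrEnums, so an f-string renders a member as its plain value, which is how the filter service builds the combined status the tests later assert. A minimal sketch (Python 3.11+ for enum.StrEnum):

    from enum import StrEnum


    class FileStatus(StrEnum):
        NOT_PROCESSED = "Not processed"


    class FileNotProcessedReason(StrEnum):
        DUPLICATE = "Duplicate"


    # StrEnum members format as their string values
    assert f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.DUPLICATE}" == "Not processed - Duplicate"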

batch_processor_filter/src/exception_decorator.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+"""Module for the batch processor filter Lambda exception wrapper"""
+from functools import wraps
+from typing import Callable
+
+from exceptions import EventAlreadyProcessingForSupplierAndVaccTypeError
+from logger import logger
+
+
+def exception_decorator(func: Callable):
+    """Wrapper for the Lambda Handler. It ensures that any unhandled exceptions are logged for monitoring and alerting
+    purposes."""
+
+    @wraps(func)
+    def exception_wrapper(*args, **kwargs):
+        try:
+            func(*args, **kwargs)
+        except EventAlreadyProcessingForSupplierAndVaccTypeError as exc:
+            # Re-raise so event will be returned to SQS and retried for this expected error
+            raise exc
+        except Exception as exc:  # pylint:disable = broad-exception-caught
+            logger.error("An unhandled exception occurred in the batch processor filter Lambda", exc_info=exc)
+            raise exc
+
+    return exception_wrapper
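
A brief usage sketch of the new decorator's behaviour, with an illustrative handler that is not part of this change: the expected queue-retry error passes through without the extra log line, while any other exception is logged and then re-raised so the SQS message is still returned for retry.

    from exception_decorator import exception_decorator


    @exception_decorator
    def example_handler(event, context):  # illustrative only
        raise ValueError("boom")

    # example_handler({}, None) logs
    # "An unhandled exception occurred in the batch processor filter Lambda" and re-raises ValueError;
    # an EventAlreadyProcessingForSupplierAndVaccTypeError would be re-raised without that log call.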

batch_processor_filter/src/lambda_handler.py

Lines changed: 3 additions & 1 deletion
@@ -3,13 +3,15 @@
 
 from batch_file_created_event import BatchFileCreatedEvent
 from batch_processor_filter_service import BatchProcessorFilterService
+from exception_decorator import exception_decorator
 from exceptions import InvalidBatchSizeError
 
 
 service = BatchProcessorFilterService()
 
 
-def lambda_handler(event: events.SQSEvent, _: context):
+@exception_decorator
+def lambda_handler(event: events.SQSEvent, _: context.Context):
     event_records = event.get("Records", [])
 
     # Terraform is configured so the Lambda will get a max batch size of 1. We are using SQS FIFO with the message group
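
The annotation change above makes the unused second parameter point at a class rather than a module. Assuming the events and context names come from the aws_lambda_typing package (that import sits above this hunk and is not shown), the intended shape is roughly:

    from aws_lambda_typing import context, events  # assumed source of these type hints

    from exception_decorator import exception_decorator


    @exception_decorator
    def lambda_handler(event: events.SQSEvent, _: context.Context):
        ...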

batch_processor_filter/src/logger.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+"""Module for the batch processor filter Lambda logger"""
+import logging
+
+
+logging.basicConfig(level="INFO")
+logger = logging.getLogger()
+logger.setLevel("INFO")

batch_processor_filter/tests/test_lambda_handler.py

Lines changed: 66 additions & 42 deletions
@@ -1,9 +1,10 @@
 import json
+from json import JSONDecodeError
 
 import boto3
 import copy
 from unittest import TestCase
-from unittest.mock import patch
+from unittest.mock import patch, Mock, ANY
 
 import botocore
 from moto import mock_aws
@@ -77,6 +78,8 @@ def setUp(self):
 
         self.logger_patcher = patch("batch_processor_filter_service.logger")
         self.mock_logger = self.logger_patcher.start()
+        self.exception_decorator_logger_patcher = patch("exception_decorator.logger")
+        self.mock_exception_decorator_logger = self.exception_decorator_logger_patcher.start()
         self.firehose_log_patcher = patch("batch_processor_filter_service.send_log_to_firehose")
         self.mock_firehose_send_log = self.firehose_log_patcher.start()
 
@@ -90,8 +93,7 @@ def tearDown(self):
                s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
            s3_client.delete_bucket(Bucket=bucket_name)
 
-        self.logger_patcher.stop()
-        self.firehose_log_patcher.stop()
+        patch.stopall()
 
     def _assert_source_file_moved(self, filename: str):
         """Check used in the duplicate scenario to validate that the original uploaded file is moved"""
@@ -110,7 +112,7 @@ def _assert_ack_file_created(self, ack_file_key: str):
 
     def test_lambda_handler_raises_error_when_empty_batch_received(self):
         with self.assertRaises(InvalidBatchSizeError) as exc:
-            lambda_handler({"Records": []}, {})
+            lambda_handler({"Records": []}, Mock())
 
         self.assertEqual(str(exc.exception), "Received 0 records, expected 1")
 
@@ -119,10 +121,24 @@ def test_lambda_handler_raises_error_when_more_than_one_record_in_batch(self):
             lambda_handler({"Records": [
                 make_sqs_record(self.default_batch_file_event),
                 make_sqs_record(self.default_batch_file_event),
-            ]}, {})
+            ]}, Mock())
 
         self.assertEqual(str(exc.exception), "Received 2 records, expected 1")
 
+    def test_lambda_handler_decorator_logs_unhandled_exceptions(self):
+        """The exception decorator should log the error when an unhandled exception occurs"""
+        with self.assertRaises(JSONDecodeError):
+            lambda_handler({"Records": [
+                {
+                    "body": "{'malformed}"
+                }
+            ]}, Mock())
+
+        self.mock_exception_decorator_logger.error.assert_called_once_with(
+            "An unhandled exception occurred in the batch processor filter Lambda",
+            exc_info=ANY
+        )
+
     def test_lambda_handler_handles_duplicate_file_scenario(self):
         """Should update the audit table status to duplicate for the incoming record"""
         # Add the duplicate entry that has already been processed
@@ -137,61 +153,69 @@ def test_lambda_handler_handles_duplicate_file_scenario(self):
         # Create the source file in S3
         s3_client.put_object(Bucket=self.mock_source_bucket, Key=test_file_name)
 
-        lambda_handler({"Records": [make_sqs_record(duplicate_file_event)]}, {})
+        lambda_handler({"Records": [make_sqs_record(duplicate_file_event)]}, Mock())
 
         status = get_audit_entry_status_by_id(dynamodb_client, AUDIT_TABLE_NAME, duplicate_file_event["message_id"])
-        self.assertEqual(status, "Not processed - duplicate")
+        self.assertEqual(status, "Not processed - Duplicate")
 
         sqs_messages = sqs_client.receive_message(QueueUrl=self.mock_queue_url)
         self.assertEqual(sqs_messages.get("Messages", []), [])
         self._assert_source_file_moved(test_file_name)
         self._assert_ack_file_created("Menacwy_Vaccinations_v5_TEST_20250820T10210000_InfAck_20250826T14372600.csv")
 
-        self.mock_logger.info.assert_called_once_with(
+        self.mock_logger.error.assert_called_once_with(
             "A duplicate file has already been processed. Filename: %s",
             test_file_name
         )
 
     def test_lambda_handler_raises_error_when_event_already_processing_for_supplier_and_vacc_type(self):
         """Should raise exception so that the event is returned to the originating queue to be retried later"""
-        # Add an audit entry for a batch event that is already processing
-        add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, self.default_batch_file_event, FileStatus.PROCESSING)
-
-        test_event: BatchFileCreatedEvent = BatchFileCreatedEvent(
-            message_id="3b60c4f7-ef67-43c7-8f0d-4faee04d7d0e",
-            vaccine_type="MENACWY",  # Same vacc type
-            supplier="TESTSUPPLIER",  # Same supplier
-            permission=["some-permissions"],
-            filename="Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv",  # Different timestamp
-            created_at_formatted_string="20250826T15003000"
-        )
-        # Add the audit record for the incoming event
-        add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, test_event, FileStatus.QUEUED)
-
-        with self.assertRaises(EventAlreadyProcessingForSupplierAndVaccTypeError) as exc:
-            lambda_handler({"Records": [make_sqs_record(test_event)]}, {})
-
-        self.assertEqual(
-            str(exc.exception),
-            "Batch event already processing for supplier: TESTSUPPLIER and vacc type: MENACWY"
-        )
-
-        status = get_audit_entry_status_by_id(dynamodb_client, AUDIT_TABLE_NAME, test_event["message_id"])
-        self.assertEqual(status, "Queued")
-
-        sqs_messages = sqs_client.receive_message(QueueUrl=self.mock_queue_url)
-        self.assertEqual(sqs_messages.get("Messages", []), [])
-
-        self.mock_logger.info.assert_called_once_with(
-            "Batch event already being processed for supplier and vacc type. Filename: %s",
-            "Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv"
-        )
+        test_cases = {
+            ("Event is already being processed for supplier + vacc type queue", FileStatus.PROCESSING),
+            ("There is a failed event to be checked in supplier + vacc type queue", FileStatus.FAILED)
+        }
+
+        for msg, file_status in test_cases:
+            self.mock_logger.reset_mock()
+            with self.subTest(msg=msg):
+                # Add an audit entry for a batch event that is already processing or failed
+                add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, self.default_batch_file_event, file_status)
+
+                test_event: BatchFileCreatedEvent = BatchFileCreatedEvent(
+                    message_id="3b60c4f7-ef67-43c7-8f0d-4faee04d7d0e",
+                    vaccine_type="MENACWY",  # Same vacc type
+                    supplier="TESTSUPPLIER",  # Same supplier
+                    permission=["some-permissions"],
+                    filename="Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv",  # Different timestamp
+                    created_at_formatted_string="20250826T15003000"
+                )
+                # Add the audit record for the incoming event
+                add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, test_event, FileStatus.QUEUED)
+
+                with self.assertRaises(EventAlreadyProcessingForSupplierAndVaccTypeError) as exc:
+                    lambda_handler({"Records": [make_sqs_record(test_event)]}, Mock())
+
+                self.assertEqual(
+                    str(exc.exception),
+                    "Batch event already processing for supplier: TESTSUPPLIER and vacc type: MENACWY"
+                )
+
+                status = get_audit_entry_status_by_id(dynamodb_client, AUDIT_TABLE_NAME, test_event["message_id"])
+                self.assertEqual(status, "Queued")
+
+                sqs_messages = sqs_client.receive_message(QueueUrl=self.mock_queue_url)
+                self.assertEqual(sqs_messages.get("Messages", []), [])
+
+                self.mock_logger.info.assert_called_once_with(
+                    "Batch event already processing for supplier and vacc type. Filename: %s",
+                    "Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv"
+                )
 
     def test_lambda_handler_processes_event_successfully(self):
         """Should update the audit entry status to Processing and forward to SQS"""
         add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, self.default_batch_file_event, FileStatus.QUEUED)
 
-        lambda_handler({"Records": [make_sqs_record(self.default_batch_file_event)]}, {})
+        lambda_handler({"Records": [make_sqs_record(self.default_batch_file_event)]}, Mock())
 
         status = get_audit_entry_status_by_id(dynamodb_client, AUDIT_TABLE_NAME,
                                               self.default_batch_file_event["message_id"])
@@ -223,7 +247,7 @@ def test_lambda_handler_processes_event_successfully_when_event_for_same_supplie
         )
         add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, test_event, FileStatus.QUEUED)
 
-        lambda_handler({"Records": [make_sqs_record(test_event)]}, {})
+        lambda_handler({"Records": [make_sqs_record(test_event)]}, Mock())
 
         status = get_audit_entry_status_by_id(dynamodb_client, AUDIT_TABLE_NAME, test_event["message_id"])
         self.assertEqual(status, "Processing")

filenameprocessor/src/audit_table.py

Lines changed: 16 additions & 9 deletions
@@ -1,4 +1,5 @@
 """Add the filename to the audit table and check for duplicates."""
+from typing import Optional
 from clients import dynamodb_client, logger
 from errors import UnhandledAuditTableError
 from constants import AUDIT_TABLE_NAME, AuditTableKeys
@@ -10,23 +11,29 @@ def upsert_audit_table(
     created_at_formatted_str: str,
     expiry_timestamp: int,
     queue_name: str,
-    file_status: str
+    file_status: str,
+    error_details: Optional[str] = None
 ) -> None:
     """
     Updates the audit table with the file details
     """
+    audit_item = {
+        AuditTableKeys.MESSAGE_ID: {"S": message_id},
+        AuditTableKeys.FILENAME: {"S": file_key},
+        AuditTableKeys.QUEUE_NAME: {"S": queue_name},
+        AuditTableKeys.STATUS: {"S": file_status},
+        AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
+        AuditTableKeys.EXPIRES_AT: {"N": str(expiry_timestamp)}
+    }
+
+    if error_details is not None:
+        audit_item[AuditTableKeys.ERROR_DETAILS] = {"S": error_details}
+
     try:
         # Add to the audit table (regardless of whether it is a duplicate)
         dynamodb_client.put_item(
             TableName=AUDIT_TABLE_NAME,
-            Item={
-                AuditTableKeys.MESSAGE_ID: {"S": message_id},
-                AuditTableKeys.FILENAME: {"S": file_key},
-                AuditTableKeys.QUEUE_NAME: {"S": queue_name},
-                AuditTableKeys.STATUS: {"S": file_status},
-                AuditTableKeys.TIMESTAMP: {"S": created_at_formatted_str},
-                AuditTableKeys.EXPIRES_AT: {"N": str(expiry_timestamp)},
-            },
+            Item=audit_item,
             ConditionExpression="attribute_not_exists(message_id)",  # Prevents accidental overwrites
         )
         logger.info("%s file, with message id %s, successfully added to audit table", file_key, message_id)
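
A hypothetical call showing the new optional parameter, using keyword arguments and placeholder values throughout since only part of the signature is visible in this hunk (the message_id and file_key parameters are inferred from the item construction above). When error_details is None or omitted, the attribute is left off the item entirely rather than written as an empty string:

    from audit_table import upsert_audit_table  # same source tree as this module

    upsert_audit_table(
        message_id="example-message-id",  # placeholder
        file_key="Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv",
        created_at_formatted_str="20250826T15003000",
        expiry_timestamp=1761500000,  # placeholder epoch seconds
        queue_name="TESTSUPPLIER_MENACWY",
        file_status="Failed",
        error_details="Row 12 failed validation",  # optional; omit for files that processed cleanly
    )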
