
Commit eda56ab

Add support for moving files to the new lambda function
1 parent 859dae9 commit eda56ab

7 files changed (+153 -13 lines changed)
batch_processor_filter/src/batch_file_repository.py

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+"""Module for the batch file repository"""
+from csv import writer
+from io import StringIO, BytesIO
+
+import boto3
+
+from batch_file_created_event import BatchFileCreatedEvent
+from constants import SOURCE_BUCKET_NAME, ACK_BUCKET_NAME
+
+
+class BatchFileRepository:
+    """Repository class to handle batch file interactions, e.g. managing the source and ack files"""
+    _ARCHIVE_FILE_DIR: str = "archive"
+    _SOURCE_BUCKET_NAME: str = SOURCE_BUCKET_NAME
+    _ACK_BUCKET_NAME: str = ACK_BUCKET_NAME
+
+    def __init__(self):
+        self._s3_client = boto3.client('s3')
+
+    @staticmethod
+    def _create_ack_failure_data(batch_file_created_event: BatchFileCreatedEvent) -> dict:
+        return {
+            "MESSAGE_HEADER_ID": batch_file_created_event["message_id"],
+            "HEADER_RESPONSE_CODE": "Failure",
+            "ISSUE_SEVERITY": "Fatal",
+            "ISSUE_CODE": "Fatal Error",
+            "ISSUE_DETAILS_CODE": "10001",
+            "RESPONSE_TYPE": "Technical",
+            "RESPONSE_CODE": "10002",
+            "RESPONSE_DISPLAY": "Infrastructure Level Response Value - Processing Error",
+            "RECEIVED_TIME": batch_file_created_event["created_at_formatted_string"],
+            "MAILBOX_FROM": "",  # TODO: Leave blank for DPS, add mailbox if from MESH mailbox
+            "LOCAL_ID": "",  # TODO: Leave blank for DPS, add from ctl file if data picked up from MESH mailbox
+            "MESSAGE_DELIVERY": False,
+        }
+
+    def move_source_file_to_archive(self, file_key: str) -> None:
+        self._s3_client.copy_object(
+            Bucket=self._SOURCE_BUCKET_NAME,
+            CopySource={"Bucket": self._SOURCE_BUCKET_NAME, "Key": file_key},
+            Key=f"{self._ARCHIVE_FILE_DIR}/{file_key}"
+        )
+        self._s3_client.delete_object(Bucket=self._SOURCE_BUCKET_NAME, Key=file_key)
+
+    def upload_failure_ack(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
+        ack_failure_data = self._create_ack_failure_data(batch_file_created_event)
+
+        ack_filename = ("ack/" + batch_file_created_event["filename"]
+                        .replace(".csv", f"_InfAck_{batch_file_created_event['created_at_formatted_string']}.csv"))
+
+        # Create CSV file with | delimiter, filetype .csv
+        csv_buffer = StringIO()
+        csv_writer = writer(csv_buffer, delimiter="|")
+        csv_writer.writerow(list(ack_failure_data.keys()))
+        csv_writer.writerow(list(ack_failure_data.values()))
+
+        # Upload the CSV file to S3
+        csv_buffer.seek(0)
+        csv_bytes = BytesIO(csv_buffer.getvalue().encode("utf-8"))
+        self._s3_client.upload_fileobj(csv_bytes, self._ACK_BUCKET_NAME, ack_filename)
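For reference, a standalone sketch (not part of the commit) of the ack key derivation and the pipe-delimited layout produced by upload_failure_ack, using the example filename and timestamp from the tests further down; the two-field dict is a truncated stand-in for the full ack payload:

    # Reproduces the ack filename rule: a source ".csv" name becomes "ack/..._InfAck_{created_at}.csv".
    from csv import writer
    from io import StringIO

    filename = "Menacwy_Vaccinations_v5_TEST_20250820T10210000.csv"
    created_at = "20250826T14372600"

    ack_filename = "ack/" + filename.replace(".csv", f"_InfAck_{created_at}.csv")
    assert ack_filename == "ack/Menacwy_Vaccinations_v5_TEST_20250820T10210000_InfAck_20250826T14372600.csv"

    # Header row followed by a single value row, "|"-separated, mirroring the repository's writer calls.
    ack_data = {"MESSAGE_HEADER_ID": "df0b745c-b8cb-492c-ba84-8ea28d9f51d5", "HEADER_RESPONSE_CODE": "Failure"}
    csv_buffer = StringIO()
    csv_writer = writer(csv_buffer, delimiter="|")
    csv_writer.writerow(list(ack_data.keys()))
    csv_writer.writerow(list(ack_data.values()))
    print(csv_buffer.getvalue())  # prints the header row, then "df0b745c-b8cb-492c-ba84-8ea28d9f51d5|Failure"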

batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 10 additions & 2 deletions
@@ -5,6 +5,7 @@

 from batch_audit_repository import BatchAuditRepository
 from batch_file_created_event import BatchFileCreatedEvent
+from batch_file_repository import BatchFileRepository
 from constants import REGION_NAME, FileStatus, QUEUE_URL
 from exceptions import EventAlreadyProcessingForSupplierAndVaccTypeError
 from send_log_to_firehose import send_log_to_firehose
@@ -16,8 +17,13 @@

 class BatchProcessorFilterService:
     """Batch processor filter service class. Provides the business logic for the Lambda function"""
-    def __init__(self, audit_repo: BatchAuditRepository = BatchAuditRepository()):
+    def __init__(
+        self,
+        audit_repo: BatchAuditRepository = BatchAuditRepository(),
+        batch_file_repo: BatchFileRepository = BatchFileRepository()
+    ):
         self._batch_audit_repository = audit_repo
+        self._batch_file_repo = batch_file_repo
         self._queue_client = boto3.client('sqs', region_name=REGION_NAME)

     def _is_duplicate_file(self, file_key: str) -> bool:
@@ -34,6 +40,8 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
             # Mark as processed and return without error so next event will be picked up from queue
             logger.info("A duplicate file has already been processed. Filename: %s", filename)
             self._batch_audit_repository.update_status(message_id, FileStatus.DUPLICATE)
+            self._batch_file_repo.upload_failure_ack(batch_file_created_event)
+            self._batch_file_repo.move_source_file_to_archive(filename)
             return

         if self._batch_audit_repository.is_event_processing_for_supplier_and_vacc_type(supplier, vaccine_type):
@@ -49,6 +57,6 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
         )
         self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)

-        successful_log_message = "File forwarded for processing by ECS"
+        successful_log_message = f"File forwarded for processing by ECS. Filename: {filename}"
         logger.info(successful_log_message)
         send_log_to_firehose({**batch_file_created_event, "message": successful_log_message})
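On the duplicate path the source file is now archived through the repository. S3 has no native move operation, so move_source_file_to_archive is a copy to the archive/ prefix followed by a delete of the original key. A standalone sketch of that copy-then-delete pattern against moto (already a test dependency here, via mock_s3); the bucket and object names below are illustrative, not the ones the service uses:

    import boto3
    from moto import mock_s3

    with mock_s3():
        s3 = boto3.client("s3", region_name="eu-west-2")
        s3.create_bucket(Bucket="example-source-bucket",
                         CreateBucketConfiguration={"LocationConstraint": "eu-west-2"})
        s3.put_object(Bucket="example-source-bucket", Key="batch.csv", Body=b"header|row")

        # "Move" = copy to the archive/ prefix, then delete the original key.
        s3.copy_object(Bucket="example-source-bucket",
                       CopySource={"Bucket": "example-source-bucket", "Key": "batch.csv"},
                       Key="archive/batch.csv")
        s3.delete_object(Bucket="example-source-bucket", Key="batch.csv")

        keys = [obj["Key"] for obj in s3.list_objects_v2(Bucket="example-source-bucket")["Contents"]]
        assert keys == ["archive/batch.csv"]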

batch_processor_filter/src/constants.py

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,8 @@
 AUDIT_TABLE_QUEUE_NAME_GSI = os.getenv("QUEUE_NAME_GSI")
 QUEUE_URL = os.getenv("QUEUE_URL")
 SPLUNK_FIREHOSE_STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME")
+SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
+ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")


 class FileStatus(StrEnum):
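constants.py resolves these values with os.getenv at import time, and BatchFileRepository binds them as class attributes, so the two new variables must be present in the environment before those modules are imported (the tests arrange this via MOCK_ENVIRONMENT_DICT further down). A minimal illustrative sketch for a local run; the bucket names are taken from the test utilities and are assumptions outside that context:

    import os

    # Set before importing constants / batch_file_repository, otherwise the names resolve to None.
    os.environ.setdefault("SOURCE_BUCKET_NAME", "immunisation-batch-internal-dev-data-sources")
    os.environ.setdefault("ACK_BUCKET_NAME", "immunisation-batch-internal-dev-data-destinations")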

batch_processor_filter/tests/test_lambda_handler.py

Lines changed: 44 additions & 4 deletions
@@ -5,7 +5,8 @@
 from unittest import TestCase
 from unittest.mock import patch

-from moto import mock_dynamodb, mock_sqs
+import botocore
+from moto import mock_dynamodb, mock_sqs, mock_s3

 from batch_file_created_event import BatchFileCreatedEvent
 from exceptions import InvalidBatchSizeError, EventAlreadyProcessingForSupplierAndVaccTypeError
@@ -18,10 +19,12 @@

 sqs_client = boto3.client("sqs", region_name=REGION_NAME)
 dynamodb_client = boto3.client("dynamodb", region_name=REGION_NAME)
+s3_client = boto3.client("s3", region_name=REGION_NAME)


 @mock_dynamodb
 @mock_sqs
+@mock_s3
 class TestLambdaHandler(TestCase):
     default_batch_file_event: BatchFileCreatedEvent = BatchFileCreatedEvent(
         message_id="df0b745c-b8cb-492c-ba84-8ea28d9f51d5",
@@ -32,6 +35,8 @@ class TestLambdaHandler(TestCase):
         created_at_formatted_string="20250826T14372600"
     )
     mock_queue_url = MOCK_ENVIRONMENT_DICT.get("QUEUE_URL")
+    mock_source_bucket = MOCK_ENVIRONMENT_DICT.get("SOURCE_BUCKET_NAME")
+    mock_ack_bucket = MOCK_ENVIRONMENT_DICT.get("ACK_BUCKET_NAME")

     def setUp(self):
         dynamodb_client.create_table(
@@ -66,6 +71,12 @@ def setUp(self):
             "FifoQueue": "true",
             "ContentBasedDeduplication": "true"
         })
+
+        for bucket_name in [self.mock_source_bucket, self.mock_ack_bucket]:
+            s3_client.create_bucket(
+                Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": REGION_NAME}
+            )
+
         self.logger_patcher = patch("batch_processor_filter_service.logger")
         self.mock_logger = self.logger_patcher.start()
         self.firehose_log_patcher = patch("batch_processor_filter_service.send_log_to_firehose")
@@ -74,9 +85,31 @@ def setUp(self):
     def tearDown(self):
         dynamodb_client.delete_table(TableName=AUDIT_TABLE_NAME)
         sqs_client.delete_queue(QueueUrl=self.mock_queue_url)
+
+        for bucket_name in [self.mock_source_bucket, self.mock_ack_bucket]:
+            for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []):
+                # Must delete objects before bucket can be deleted
+                s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
+            s3_client.delete_bucket(Bucket=bucket_name)
+
         self.logger_patcher.stop()
         self.firehose_log_patcher.stop()

+    def _assert_source_file_moved(self, filename: str):
+        """Check used in the duplicate scenario to validate that the original uploaded file is moved"""
+        with self.assertRaises(botocore.exceptions.ClientError) as exc:
+            s3_client.get_object(Bucket=self.mock_source_bucket, Key=filename)
+
+        self.assertEqual(str(exc.exception), "An error occurred (NoSuchKey) when calling the GetObject "
+                                             "operation: The specified key does not exist.")
+        archived_object = s3_client.get_object(Bucket=self.mock_source_bucket, Key=f"archive/{filename}")
+        self.assertIsNotNone(archived_object)
+
+    def _assert_ack_file_created(self, ack_file_key: str):
+        """Check used in duplicate scenario to validate that the failure ack was created"""
+        ack_file = s3_client.get_object(Bucket=self.mock_ack_bucket, Key=f"ack/{ack_file_key}")
+        self.assertIsNotNone(ack_file)
+
     def test_lambda_handler_raises_error_when_empty_batch_received(self):
         with self.assertRaises(InvalidBatchSizeError) as exc:
             lambda_handler({"Records": []}, {})
@@ -98,21 +131,27 @@ def test_lambda_handler_handles_duplicate_file_scenario(self):
         add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, self.default_batch_file_event, FileStatus.PROCESSED)
         duplicate_file_event = copy.deepcopy(self.default_batch_file_event)
         duplicate_file_event["message_id"] = "fc9008b7-3865-4dcf-88b8-fc4abafff5f8"
+        test_file_name = duplicate_file_event["filename"]

         # Add the audit record for the incoming event
         add_entry_to_mock_table(dynamodb_client, AUDIT_TABLE_NAME, duplicate_file_event, FileStatus.QUEUED)

+        # Create the source file in S3
+        s3_client.put_object(Bucket=self.mock_source_bucket, Key=test_file_name)
+
         lambda_handler({"Records": [make_sqs_record(duplicate_file_event)]}, {})

         status = get_audit_entry_status_by_id(dynamodb_client, AUDIT_TABLE_NAME, duplicate_file_event["message_id"])
         self.assertEqual(status, "Not processed - duplicate")

         sqs_messages = sqs_client.receive_message(QueueUrl=self.mock_queue_url)
         self.assertEqual(sqs_messages.get("Messages", []), [])
+        self._assert_source_file_moved(test_file_name)
+        self._assert_ack_file_created("Menacwy_Vaccinations_v5_TEST_20250820T10210000_InfAck_20250826T14372600.csv")

         self.mock_logger.info.assert_called_once_with(
             "A duplicate file has already been processed. Filename: %s",
-            "Menacwy_Vaccinations_v5_TEST_20250820T10210000.csv"
+            test_file_name
         )

     def test_lambda_handler_raises_error_when_event_already_processing_for_supplier_and_vacc_type(self):
@@ -164,7 +203,8 @@ def test_lambda_handler_processes_event_successfully(self):
         self.assertEqual(len(sqs_messages.get("Messages", [])), 1)
         self.assertDictEqual(json.loads(sqs_messages["Messages"][0]["Body"]), dict(self.default_batch_file_event))

-        expected_log_message = "File forwarded for processing by ECS"
+        expected_log_message = (f"File forwarded for processing by ECS. Filename: "
+                                f"{self.default_batch_file_event['filename']}")
         self.mock_logger.info.assert_called_once_with(expected_log_message)
         self.mock_firehose_send_log.assert_called_once_with(
             {**self.default_batch_file_event, "message": expected_log_message}
@@ -194,7 +234,7 @@ def test_lambda_handler_processes_event_successfully_when_event_for_same_supplie
         self.assertEqual(len(sqs_messages.get("Messages", [])), 1)
         self.assertDictEqual(json.loads(sqs_messages["Messages"][0]["Body"]), dict(test_event))

-        expected_log_message = "File forwarded for processing by ECS"
+        expected_log_message = f"File forwarded for processing by ECS. Filename: {test_event['filename']}"
         self.mock_logger.info.assert_called_once_with(expected_log_message)
         self.mock_firehose_send_log.assert_called_once_with(
             {**test_event, "message": expected_log_message}
batch_processor_filter/tests/testing_utils.py

Lines changed: 3 additions & 1 deletion
@@ -8,7 +8,9 @@
     "AUDIT_TABLE_NAME": "immunisation-batch-internal-dev-audit-table",
     "QUEUE_URL": "https://sqs.eu-west-2.amazonaws.com/123456789012/imms-batch-metadata-queue.fifo",
     "FILE_NAME_GSI": "filename_index",
-    "QUEUE_NAME_GSI": "queue_name_index"
+    "QUEUE_NAME_GSI": "queue_name_index",
+    "SOURCE_BUCKET_NAME": "immunisation-batch-internal-dev-data-sources",
+    "ACK_BUCKET_NAME": "immunisation-batch-internal-dev-data-destinations"
 }

terraform/batch_processor_filter_lambda.tf

Lines changed: 33 additions & 5 deletions
@@ -122,6 +122,32 @@ resource "aws_iam_policy" "batch_processor_filter_lambda_exec_policy" {
           "firehose:PutRecordBatch"
         ],
         "Resource" : "arn:aws:firehose:*:*:deliverystream/${module.splunk.firehose_stream_name}"
+      },
+      {
+        Effect = "Allow"
+        Action = [
+          "s3:GetObject",
+          "s3:ListBucket",
+          "s3:PutObject",
+          "s3:CopyObject",
+          "s3:DeleteObject"
+        ]
+        Resource = [
+          aws_s3_bucket.batch_data_source_bucket.arn,
+          "${aws_s3_bucket.batch_data_source_bucket.arn}/*"
+        ]
+      },
+      {
+        Effect = "Allow"
+        Action = [
+          "s3:GetObject",
+          "s3:PutObject",
+          "s3:ListBucket"
+        ]
+        Resource = [
+          aws_s3_bucket.batch_data_destination_bucket.arn,
+          "${aws_s3_bucket.batch_data_destination_bucket.arn}/*"
+        ]
       }
     ]
   })
@@ -247,11 +273,13 @@ resource "aws_lambda_function" "batch_processor_filter_lambda" {

   environment {
     variables = {
-      QUEUE_URL            = aws_sqs_queue.supplier_fifo_queue.url
-      SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name
-      AUDIT_TABLE_NAME     = aws_dynamodb_table.audit-table.name
-      FILE_NAME_GSI        = "filename_index"
-      QUEUE_NAME_GSI       = "queue_name_index"
+      SOURCE_BUCKET_NAME   = aws_s3_bucket.batch_data_source_bucket.bucket
+      ACK_BUCKET_NAME      = aws_s3_bucket.batch_data_destination_bucket.bucket
+      QUEUE_URL            = aws_sqs_queue.supplier_fifo_queue.url
+      SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name
+      AUDIT_TABLE_NAME     = aws_dynamodb_table.audit-table.name
+      FILE_NAME_GSI        = "filename_index"
+      QUEUE_NAME_GSI       = "queue_name_index"
     }
   }
   kms_key_arn = data.aws_kms_key.existing_lambda_encryption_key.arn

terraform/sqs_batch_file_created.tf

Lines changed: 1 addition & 1 deletion
@@ -3,5 +3,5 @@ resource "aws_sqs_queue" "batch_file_created" {
   name                        = "${local.short_prefix}-batch-file-created-queue.fifo"
   fifo_queue                  = true
   content_based_deduplication = true # Optional, helps with deduplication
-  visibility_timeout_seconds  = 900 # TODO - discuss and refine both this, max receives and DLQ
+  visibility_timeout_seconds  = 60 # TODO - discuss and refine this + DDB TTL
 }
