33import unittest
44import os
55import json
6- from unittest .mock import patch
6+ from unittest .mock import patch , Mock
77from io import StringIO
88from boto3 import client as boto3_client
99from moto import mock_aws
1010
1111from utils .mock_environment_variables import AUDIT_TABLE_NAME , MOCK_ENVIRONMENT_DICT , BucketNames , REGION_NAME
1212from utils .generic_setup_and_teardown_for_ack_backend import GenericSetUp , GenericTearDown
1313from utils .utils_for_ack_backend_tests import (
14- validate_ack_file_content , add_audit_entry_to_table ,
14+ validate_ack_file_content , add_audit_entry_to_table , generate_sample_existing_ack_content ,
1515)
1616from utils .values_for_ack_backend_tests import (
1717 DiagnosticsDictionaries ,
1818 MOCK_MESSAGE_DETAILS ,
1919 ValidValues ,
2020 EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS ,
2121)
22+ from utils_for_ack_lambda import _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP
2223
2324with patch .dict ("os.environ" , MOCK_ENVIRONMENT_DICT ):
2425 from ack_processor import lambda_handler
@@ -59,7 +60,7 @@ def tearDown(self) -> None:
5960 def generate_event (test_messages : list [dict ]) -> dict :
6061 """
6162 Returns an event where each message in the incoming message body list is based on a standard mock message,
62- updated with the details from the corresponsing message in the given test_messages list.
63+ updated with the details from the corresponding message in the given test_messages list.
6364 """
6465 incoming_message_body = [
6566 (
@@ -71,6 +72,34 @@ def generate_event(test_messages: list[dict]) -> dict:
7172 ]
7273 return {"Records" : [{"body" : json .dumps (incoming_message_body )}]}
7374
75+ def assert_ack_and_source_file_locations_correct (
76+ self ,
77+ source_file_key : str ,
78+ tmp_ack_file_key : str ,
79+ complete_ack_file_key : str ,
80+ is_complete : bool ,
81+ ) -> None :
82+ """Helper function to check the ack and source files have not been moved as the processing is not yet
83+ complete"""
84+ if is_complete :
85+ ack_file = self .s3_client .get_object (Bucket = BucketNames .DESTINATION , Key = complete_ack_file_key )
86+ else :
87+ ack_file = self .s3_client .get_object (Bucket = BucketNames .DESTINATION , Key = tmp_ack_file_key )
88+ self .assertIsNotNone (ack_file ["Body" ].read ())
89+
90+ full_src_file_key = f"archive/{ source_file_key } " if is_complete else f"processing/{ source_file_key } "
91+ src_file = self .s3_client .get_object (Bucket = BucketNames .SOURCE , Key = full_src_file_key )
92+ self .assertIsNotNone (src_file ["Body" ].read ())
93+
94+ def assert_audit_entry_status_equals (self , message_id : str , status : str ) -> None :
95+ """Checks the audit entry status is as expected"""
96+ audit_entry = self .dynamodb_client .get_item (
97+ TableName = AUDIT_TABLE_NAME , Key = {"message_id" : {"S" : message_id }}
98+ ).get ("Item" )
99+
100+ actual_status = audit_entry .get ("status" , {}).get ("S" )
101+ self .assertEqual (actual_status , status )
102+
74103 def test_lambda_handler_main_multiple_records (self ):
75104 """Test lambda handler with multiple records."""
76105 # Set up an audit entry which does not yet have record_count recorded
@@ -172,7 +201,148 @@ def test_lambda_handler_main(self):
172201
173202 self .s3_client .delete_object (Bucket = BucketNames .DESTINATION , Key = MOCK_MESSAGE_DETAILS .temp_ack_file_key )
174203
175- # def test_lambda_handler
204+ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records_still_remaining (self ):
205+ """
206+ Test that the batch file process is not marked as complete when not all records have been processed.
207+ This means:
208+ - the ack file remains in the TempAck directory
209+ - the source file remains in the processing directory
210+ - all ack records in the event are written to the temporary ack
211+ """
212+ mock_batch_message_id = "b500efe4-6e75-4768-a38b-6127b3c7b8e0"
213+
214+ # Original source file had 100 records
215+ add_audit_entry_to_table (self .dynamodb_client , mock_batch_message_id , record_count = 100 )
216+ array_of_success_messages = [
217+ {** BASE_SUCCESS_MESSAGE , "row_id" : f"{ mock_batch_message_id } ^{ i } " , "imms_id" : f"imms_{ i } " ,
218+ "local_id" : f"local^{ i } " }
219+ for i in range (1 , 4 )
220+ ]
221+ test_event = {
222+ "Records" : [
223+ {"body" : json .dumps (array_of_success_messages )}
224+ ]
225+ }
226+
227+ response = lambda_handler (event = test_event , context = {})
228+
229+ self .assertEqual (response , EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS )
230+ validate_ack_file_content (
231+ self .s3_client ,
232+ [
233+ * array_of_success_messages
234+ ],
235+ existing_file_content = ValidValues .ack_headers ,
236+ )
237+ self .assert_ack_and_source_file_locations_correct (
238+ MOCK_MESSAGE_DETAILS .file_key ,
239+ MOCK_MESSAGE_DETAILS .temp_ack_file_key ,
240+ MOCK_MESSAGE_DETAILS .archive_ack_file_key ,
241+ is_complete = False
242+ )
243+ self .assert_audit_entry_status_equals (mock_batch_message_id , "Preprocessed" )
244+
245+ @patch ("utils_for_ack_lambda.get_record_count_by_message_id" , return_value = 500 )
246+ def test_lambda_handler_uses_message_id_to_record_count_cache_to_reduce_ddb_calls (
247+ self ,
248+ mock_get_record_count : Mock
249+ ):
250+ """The DynamoDB Audit table is used to store the total record count for each source file. To reduce calls each
251+ time - this test checks that we cache the value as this lambda is called many times for large files"""
252+ mock_batch_message_id = "622cdeea-461e-4a83-acb5-7871d47ddbcd"
253+
254+ # Original source file had 100 records
255+ add_audit_entry_to_table (self .dynamodb_client , mock_batch_message_id , record_count = 500 )
256+
257+ message_one = [{** BASE_SUCCESS_MESSAGE , "row_id" : f"{ mock_batch_message_id } ^1" , "imms_id" : "imms_1" ,
258+ "local_id" : "local^1" }]
259+ message_two = [{** BASE_SUCCESS_MESSAGE , "row_id" : f"{ mock_batch_message_id } ^2" , "imms_id" : "imms_2" ,
260+ "local_id" : "local^2" }]
261+ test_event_one = {
262+ "Records" : [
263+ {"body" : json .dumps (message_one )}
264+ ]
265+ }
266+ test_event_two = {
267+ "Records" : [
268+ {"body" : json .dumps (message_two )}
269+ ]
270+ }
271+
272+ response = lambda_handler (event = test_event_one , context = {})
273+ self .assertEqual (response , EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS )
274+ second_invocation_response = lambda_handler (event = test_event_two , context = {})
275+ self .assertEqual (second_invocation_response , EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS )
276+
277+ # Assert that the DDB call is only performed once on the first invocation
278+ mock_get_record_count .assert_called_once_with (mock_batch_message_id )
279+ validate_ack_file_content (
280+ self .s3_client ,
281+ [
282+ * message_one , * message_two
283+ ],
284+ existing_file_content = ValidValues .ack_headers ,
285+ )
286+ self .assert_ack_and_source_file_locations_correct (
287+ MOCK_MESSAGE_DETAILS .file_key ,
288+ MOCK_MESSAGE_DETAILS .temp_ack_file_key ,
289+ MOCK_MESSAGE_DETAILS .archive_ack_file_key ,
290+ is_complete = False
291+ )
292+ self .assertEqual (_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP [mock_batch_message_id ], 500 )
293+ self .assert_audit_entry_status_equals (mock_batch_message_id , "Preprocessed" )
294+
295+ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_processed (self ):
296+ """
297+ Test that the batch file process is marked as complete when all records have been processed.
298+ This means:
299+ - the ack file moves from the TempAck directory to the forwardedFile directory
300+ - the source file moves from the processing to the archive directory
301+ - all ack records in the event are appended to the existing temporary ack file
302+ - the DDB Audit Table status is set as 'Processed'
303+ """
304+ mock_batch_message_id = "75db20e6-c0b5-4012-a8bc-f861a1dd4b22"
305+
306+ # Original source file had 100 records
307+ add_audit_entry_to_table (self .dynamodb_client , mock_batch_message_id , record_count = 100 )
308+
309+ # Previous invocations have already created and added to the temp ack file
310+ existing_ack_content = generate_sample_existing_ack_content ()
311+ self .s3_client .put_object (
312+ Bucket = BucketNames .DESTINATION ,
313+ Key = MOCK_MESSAGE_DETAILS .temp_ack_file_key ,
314+ Body = StringIO (existing_ack_content ).getvalue (),
315+ )
316+
317+ array_of_success_messages = [
318+ {** BASE_SUCCESS_MESSAGE , "row_id" : f"{ mock_batch_message_id } ^{ i } " , "imms_id" : f"imms_{ i } " ,
319+ "local_id" : f"local^{ i } " }
320+ for i in range (50 , 101 )
321+ ]
322+ test_event = {
323+ "Records" : [
324+ {"body" : json .dumps (array_of_success_messages )}
325+ ]
326+ }
327+
328+ response = lambda_handler (event = test_event , context = {})
329+
330+ self .assertEqual (response , EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS )
331+ validate_ack_file_content (
332+ self .s3_client ,
333+ [
334+ * array_of_success_messages
335+ ],
336+ existing_file_content = existing_ack_content ,
337+ is_complete = True
338+ )
339+ self .assert_ack_and_source_file_locations_correct (
340+ MOCK_MESSAGE_DETAILS .file_key ,
341+ MOCK_MESSAGE_DETAILS .temp_ack_file_key ,
342+ MOCK_MESSAGE_DETAILS .archive_ack_file_key ,
343+ is_complete = True
344+ )
345+ self .assert_audit_entry_status_equals (mock_batch_message_id , "Processed" )
176346
177347 def test_lambda_handler_error_scenarios (self ):
178348 """Test that the lambda handler raises appropriate exceptions for malformed event data."""
0 commit comments