From 05ab289118d25a6602722c82900580168c76f257 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Fri, 16 May 2025 19:59:06 +0100 Subject: [PATCH 1/8] mock sqs, firehose --- delta_backend/src/delta.py | 224 ++++++++++++++++-------------- delta_backend/src/log_firehose.py | 6 +- delta_backend/tests/test_delta.py | 124 ++++++++++++++--- 3 files changed, 229 insertions(+), 125 deletions(-) diff --git a/delta_backend/src/delta.py b/delta_backend/src/delta.py index ed94da6763..a6e03c25b4 100644 --- a/delta_backend/src/delta.py +++ b/delta_backend/src/delta.py @@ -22,139 +22,159 @@ def send_message(record): # Create a message message_body = record - # Use boto3 to interact with SQS - sqs_client = boto3.client("sqs") try: - # Send the record to the queue + sqs_client = boto3.client("sqs") sqs_client.send_message(QueueUrl=failure_queue_url, MessageBody=json.dumps(message_body)) logger.info("Record saved successfully to the DLQ") except ClientError as e: logger.error(f"Error sending record to DLQ: {e}") - def get_vaccine_type(patientsk) -> str: parsed = [str.strip(str.lower(s)) for s in patientsk.split("#")] return parsed[0] -def handler(event, context): - ret = True - logger.info("Starting Delta Handler") - log_data = dict() - firehose_log = dict() - operation_outcome = dict() - log_data["function_name"] = "delta_sync" - intrusion_check = True +def process_record(record, delta_table, log_data, firehose_log): + """ + Processes a single record from the event. + + Args: + record (dict): The DynamoDB stream record to process. + delta_table (boto3.Table): The DynamoDB table resource. + log_data (dict): Log data for the current record. + firehose_log (dict): Firehose log data for the current record. + + Returns: + bool: True if the record was processed successfully, False otherwise. + """ try: - dynamodb = boto3.resource("dynamodb", region_name="eu-west-2") - delta_table = dynamodb.Table(delta_table_name) + start = time.time() + approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"]) + expiry_time = approximate_creation_time + timedelta(days=30) + expiry_time_epoch = int(expiry_time.timestamp()) + error_records = [] + response = str() + imms_id = str() + operation = str() - # Converting ApproximateCreationDateTime directly to string will give Unix timestamp - # I am converting it to isofformat for filtering purpose. This can be changed accordingly + if record["eventName"] != EventName.DELETE_PHYSICAL: + new_image = record["dynamodb"]["NewImage"] + imms_id = new_image["PK"]["S"].split("#")[1] + vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"]) + supplier_system = new_image["SupplierSystem"]["S"] + + if supplier_system not in ("DPSFULL", "DPSREDUCED"): + operation = new_image["Operation"]["S"] + action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation + resource_json = json.loads(new_image["Resource"]["S"]) + FHIRConverter = Converter(json.dumps(resource_json)) + flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON + error_records = FHIRConverter.getErrorRecords() + flat_json["ACTION_FLAG"] = action_flag - for record in event["Records"]: - start = time.time() - log_data["date_time"] = str(datetime.now()) - intrusion_check = False - approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"]) - expiry_time = approximate_creation_time + timedelta(days=30) - expiry_time_epoch = int(expiry_time.timestamp()) - error_records = [] - response = str() - imms_id = str() - operation = str() - if record["eventName"] != EventName.DELETE_PHYSICAL: - new_image = record["dynamodb"]["NewImage"] - imms_id = new_image["PK"]["S"].split("#")[1] - vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"]) - supplier_system = new_image["SupplierSystem"]["S"] - if supplier_system not in ("DPSFULL", "DPSREDUCED"): - operation = new_image["Operation"]["S"] - action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation - resource_json = json.loads(new_image["Resource"]["S"]) - FHIRConverter = Converter(json.dumps(resource_json)) - flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON - error_records = FHIRConverter.getErrorRecords() - flat_json["ACTION_FLAG"] = action_flag - response = delta_table.put_item( - Item={ - "PK": str(uuid.uuid4()), - "ImmsID": imms_id, - "Operation": operation, - "VaccineType": vaccine_type, - "SupplierSystem": supplier_system, - "DateTimeStamp": approximate_creation_time.isoformat(), - "Source": delta_source, - "Imms": flat_json, - "ExpiresAt": expiry_time_epoch, - } - ) - else: - operation_outcome["statusCode"] = "200" - operation_outcome["statusDesc"] = "Record from DPS skipped" - log_data["operation_outcome"] = operation_outcome - firehose_log["event"] = log_data - firehose_logger.send_log(firehose_log) - logger.info(f"Record from DPS skipped for {imms_id}") - continue - else: - operation = Operation.DELETE_PHYSICAL - new_image = record["dynamodb"]["Keys"] - logger.info(f"Record to delta:{new_image}") - imms_id = new_image["PK"]["S"].split("#")[1] response = delta_table.put_item( Item={ "PK": str(uuid.uuid4()), "ImmsID": imms_id, - "Operation": Operation.DELETE_PHYSICAL, - "VaccineType": "default", - "SupplierSystem": "default", + "Operation": operation, + "VaccineType": vaccine_type, + "SupplierSystem": supplier_system, "DateTimeStamp": approximate_creation_time.isoformat(), "Source": delta_source, - "Imms": "", + "Imms": flat_json, "ExpiresAt": expiry_time_epoch, } ) - end = time.time() - log_data["time_taken"] = f"{round(end - start, 5)}s" - operation_outcome = {"record": imms_id, "operation_type": operation} - if response["ResponseMetadata"]["HTTPStatusCode"] == 200: - if error_records: - log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}" - operation_outcome["statusCode"] = "207" - operation_outcome["statusDesc"] = ( - f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}" - ) - else: - log = f"Record Successfully created for {imms_id}" - operation_outcome["statusCode"] = "200" - operation_outcome["statusDesc"] = "Successfully synched into delta" - log_data["operation_outcome"] = operation_outcome - firehose_log["event"] = log_data - firehose_logger.send_log(firehose_log) - logger.info(log) else: - log = f"Record NOT created for {imms_id}" - operation_outcome["statusCode"] = "500" - operation_outcome["statusDesc"] = "Exception" + operation_outcome = { + "statusCode": "200", + "statusDesc": "Record from DPS skipped", + } log_data["operation_outcome"] = operation_outcome firehose_log["event"] = log_data firehose_logger.send_log(firehose_log) - logger.info(log) - ret = False + logger.info(f"Record from DPS skipped for {imms_id}") + return True + else: + operation = Operation.DELETE_PHYSICAL + new_image = record["dynamodb"]["Keys"] + logger.info(f"Record to delta: {new_image}") + imms_id = new_image["PK"]["S"].split("#")[1] - except Exception as e: - operation_outcome["statusCode"] = "500" - operation_outcome["statusDesc"] = "Exception" - if intrusion_check: - operation_outcome["diagnostics"] = "Incorrect invocation of Lambda" - logger.exception("Incorrect invocation of Lambda") + response = delta_table.put_item( + Item={ + "PK": str(uuid.uuid4()), + "ImmsID": imms_id, + "Operation": Operation.DELETE_PHYSICAL, + "VaccineType": "default", + "SupplierSystem": "default", + "DateTimeStamp": approximate_creation_time.isoformat(), + "Source": delta_source, + "Imms": "", + "ExpiresAt": expiry_time_epoch, + } + ) + + end = time.time() + log_data["time_taken"] = f"{round(end - start, 5)}s" + operation_outcome = {"record": imms_id, "operation_type": operation} + + if response["ResponseMetadata"]["HTTPStatusCode"] == 200: + if error_records: + log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}" + operation_outcome["statusCode"] = "207" + operation_outcome["statusDesc"] = ( + f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}" + ) + else: + log = f"Record Successfully created for {imms_id}" + operation_outcome["statusCode"] = "200" + operation_outcome["statusDesc"] = "Successfully synched into delta" + logger.info(log) else: - operation_outcome["diagnostics"] = f"Delta Lambda failure: {e}" - logger.exception(f"Delta Lambda failure: {e}") - send_message(event) # Send failed records to DLQ + log = f"Record NOT created for {imms_id}" + operation_outcome["statusCode"] = "500" + operation_outcome["statusDesc"] = "Exception" + logger.warning(log) + return False + log_data["operation_outcome"] = operation_outcome firehose_log["event"] = log_data firehose_logger.send_log(firehose_log) + return True + + except Exception as e: + logger.exception(f"Error processing record: {e}") + return False + + +def handler(event, context): + ret = True + logger.info("Starting Delta Handler") + log_data = dict() + firehose_log = dict() + log_data["function_name"] = "delta_sync" + try: + dynamodb = boto3.resource("dynamodb", region_name="eu-west-2") + delta_table = dynamodb.Table(delta_table_name) + for record in event["Records"]: + log_data["date_time"] = str(datetime.now()) + + # Process each record + result = process_record(record, delta_table, log_data, firehose_log) + if not result: + ret = False + + except Exception as e: ret = False + operation_outcome = { + "statusCode": "500", + "statusDesc": "Exception", + "diagnostics": f"Delta Lambda failure: Incorrect invocation of Lambda" + } + logger.exception(operation_outcome["diagnostics"]) + send_message(event) # Send failed records to DLQ + log_data["operation_outcome"] = operation_outcome + firehose_log["event"] = log_data + firehose_logger.send_log(firehose_log) return ret diff --git a/delta_backend/src/log_firehose.py b/delta_backend/src/log_firehose.py index b86e87e2f8..8b0e102075 100644 --- a/delta_backend/src/log_firehose.py +++ b/delta_backend/src/log_firehose.py @@ -19,10 +19,10 @@ def __init__( self.delivery_stream_name = stream_name def send_log(self, log_message): - log_to_splunk = log_message - logger.info(f"Log sent to Firehose for save: {log_to_splunk}") - encoded_log_data = json.dumps(log_to_splunk).encode("utf-8") try: + log_to_splunk = log_message + logger.info(f"Log sent to Firehose for save: {log_to_splunk}") + encoded_log_data = json.dumps(log_to_splunk).encode("utf-8") response = self.firehose_client.put_record( DeliveryStreamName=self.delivery_stream_name, Record={"Data": encoded_log_data}, diff --git a/delta_backend/tests/test_delta.py b/delta_backend/tests/test_delta.py index 718e140148..7fd4212de1 100644 --- a/delta_backend/tests/test_delta.py +++ b/delta_backend/tests/test_delta.py @@ -14,6 +14,10 @@ from delta import send_message, handler # Import after setting environment variables from utils_for_converter_tests import ValuesForTests, RecordConfig +success_response = {"ResponseMetadata": {"HTTPStatusCode": 200}} +exception_response = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem") +fail_response = {"ResponseMetadata": {"HTTPStatusCode": 500}} + class DeltaTestCase(unittest.TestCase): def setUp(self): @@ -25,13 +29,22 @@ def setUp(self): self.logger_exception_patcher = patch("logging.Logger.exception") self.mock_logger_exception = self.logger_exception_patcher.start() + self.logger_warning_patcher = patch("logging.Logger.warning") + self.mock_logger_warning = self.logger_warning_patcher.start() + self.firehose_logger_patcher = patch("delta.firehose_logger") self.mock_firehose_logger = self.firehose_logger_patcher.start() + + self.boto_client_patcher = patch("boto3.client") + self.mock_boto_client = self.boto_client_patcher.start() + def tearDown(self): self.logger_exception_patcher.stop() + self.logger_warning_patcher.stop() self.logger_info_patcher.stop() self.mock_firehose_logger.stop() + self.boto_client_patcher.stop() @staticmethod def setup_mock_sqs(mock_boto_client, return_value={"ResponseMetadata": {"HTTPStatusCode": 200}}): @@ -53,10 +66,9 @@ def setUp_mock_resources(self, mock_boto_resource, mock_boto_client): mock_table.put_item.side_effect = Exception("Test Exception") return mock_table - @patch("boto3.client") - def test_send_message_success(self, mock_boto_client): + def test_send_message_success(self): # Arrange - mock_sqs = self.setup_mock_sqs(mock_boto_client) + mock_sqs = self.setup_mock_sqs(self.mock_boto_client) record = {"key": "value"} # Act @@ -187,38 +199,38 @@ def test_handler_success_delete_logical(self, mock_boto_resource): self.assertEqual(put_item_data["Operation"], Operation.DELETE_LOGICAL) self.assertEqual(put_item_data["ImmsID"], imms_id) + @patch("boto3.resource") @patch("boto3.client") - def test_handler_exception_intrusion_check(self, mock_boto_resource, mock_boto_client): + def test_handler_exception_intrusion_check(self, mock_boto_client, mock_boto_resource): # Arrange - self.setup_mock_dynamodb(mock_boto_resource, status_code=500) - mock_boto_client.return_value = MagicMock() + self.setUp_mock_resources(mock_boto_client, mock_boto_resource) event = ValuesForTests.get_event() + expected_message = "Delta Lambda failure: Incorrect invocation of Lambda" # Act & Assert result = handler(event, self.context) self.assertFalse(result) + + self.mock_firehose_logger.send_log.assert_called() + firehose_log = self.mock_firehose_logger.send_log.call_args[0][0] + firehose_log_event = firehose_log["event"] + operation_outcome = firehose_log_event["operation_outcome"] + self.assertEqual(operation_outcome["statusDesc"], "Exception") + self.assertEqual(operation_outcome["diagnostics"], expected_message) + self.mock_logger_exception.assert_called_once_with(expected_message) - @patch("boto3.resource") - @patch("boto3.client") - def test_handler_exception_intrusion(self, mock_boto_client, mock_boto_resource): - # Arrange - self.setUp_mock_resources(mock_boto_resource, mock_boto_client) - event = ValuesForTests.get_event() - context = {} - - # Act & Assert - with self.assertRaises(Exception): - handler(event, context) - - self.mock_logger_exception.assert_called_once_with("Delta Lambda failure: Test Exception") + @patch("logging.Logger.error") @patch("boto3.resource") @patch("delta.handler") - def test_handler_exception_intrusion_check_false(self, mocked_intrusion, mock_boto_client): + def test_handler_exception_intrusion_check_false(self, mocked_intrusion, mock_boto_client, mock_logger_error): # Arrange self.setUp_mock_resources(mocked_intrusion, mock_boto_client) + self.setup_mock_sqs(self.mock_boto_client) + mock_logger_error.side_effect = None + mock_logger_error.return_value = None event = ValuesForTests.get_event() context = {} @@ -367,3 +379,75 @@ def test_send_message_multi_physical_delete(self, mock_boto_resource): self.assertTrue(result) self.assertEqual(mock_table.put_item.call_count, 3) self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3) + + @patch("boto3.resource") + def test_single_error_in_multi(self, mock_boto_resource): + # Arrange + mock_table = self.setup_mock_dynamodb(mock_boto_resource) + mock_table.put_item.side_effect = [ success_response, fail_response, success_response] + + records_config = [ + RecordConfig(EventName.CREATE, Operation.CREATE, "ok-id1", ActionFlag.CREATE), + RecordConfig(EventName.UPDATE, Operation.UPDATE, "fail-id1.2", ActionFlag.UPDATE), + RecordConfig(EventName.DELETE_PHYSICAL, Operation.DELETE_PHYSICAL, "ok-id1.3"), + ] + event = ValuesForTests.get_multi_record_event(records_config) + + # Act + result = handler(event, self.context) + + # Assert + self.assertFalse(result) + self.assertEqual(mock_table.put_item.call_count, 3) + self.assertEqual(self.mock_firehose_logger.send_log.call_count, 2) + self.assertEqual(self.mock_logger_warning.call_count, 1) + + @patch("boto3.resource") + def test_single_exception_in_multi(self, mock_boto_resource): + # Arrange + mock_table = self.setup_mock_dynamodb(mock_boto_resource) + # arrange so the 3rd record fails + mock_table.put_item.side_effect = [success_response, exception_response, success_response] + + records_config = [ + RecordConfig(EventName.CREATE, Operation.CREATE, "ok-id2.1", ActionFlag.CREATE), + RecordConfig(EventName.UPDATE, Operation.UPDATE, "exception-id2.2", ActionFlag.UPDATE), + RecordConfig(EventName.DELETE_PHYSICAL, Operation.DELETE_PHYSICAL, "ok-id2.3"), + ] + event = ValuesForTests.get_multi_record_event(records_config) + + # Act + result = handler(event, self.context) + + # Assert + self.assertFalse(result) + self.assertEqual(mock_table.put_item.call_count, 3) + self.assertEqual(self.mock_firehose_logger.send_log.call_count, 2) + self.assertEqual(self.mock_logger_exception.call_count, 1) + +@patch("delta.process_record") +@patch("boto3.resource") +def test_handler_calls_process_record_for_each_event(self, mock_boto_resource, mock_process_record): + # Arrange + mock_delta_table = MagicMock() + mock_boto_resource.return_value.Table.return_value = mock_delta_table + + # Create an event with 3 records + event = { + "Records": [ + { "a": "record1" }, + { "a": "record2" }, + { "a": "record3" } + ] + } + context = {} + + # Mock process_record to always return True + mock_process_record.return_value = True + + # Act + result = handler(event, context) + + # Assert + self.assertTrue(result) + self.assertEqual(mock_process_record.call_count, len(event["Records"])) From e2b89331dccfec25e00ec5b4afc810baaf8c0c2c Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Tue, 20 May 2025 15:15:29 +0100 Subject: [PATCH 2/8] Intrusion begone --- delta_backend/tests/test_delta.py | 39 ------------------------------- 1 file changed, 39 deletions(-) diff --git a/delta_backend/tests/test_delta.py b/delta_backend/tests/test_delta.py index 7fd4212de1..a03a89ec2c 100644 --- a/delta_backend/tests/test_delta.py +++ b/delta_backend/tests/test_delta.py @@ -200,45 +200,6 @@ def test_handler_success_delete_logical(self, mock_boto_resource): self.assertEqual(put_item_data["ImmsID"], imms_id) - @patch("boto3.resource") - @patch("boto3.client") - def test_handler_exception_intrusion_check(self, mock_boto_client, mock_boto_resource): - # Arrange - self.setUp_mock_resources(mock_boto_client, mock_boto_resource) - event = ValuesForTests.get_event() - expected_message = "Delta Lambda failure: Incorrect invocation of Lambda" - - # Act & Assert - - result = handler(event, self.context) - self.assertFalse(result) - - self.mock_firehose_logger.send_log.assert_called() - firehose_log = self.mock_firehose_logger.send_log.call_args[0][0] - firehose_log_event = firehose_log["event"] - operation_outcome = firehose_log_event["operation_outcome"] - self.assertEqual(operation_outcome["statusDesc"], "Exception") - self.assertEqual(operation_outcome["diagnostics"], expected_message) - self.mock_logger_exception.assert_called_once_with(expected_message) - - - @patch("logging.Logger.error") - @patch("boto3.resource") - @patch("delta.handler") - def test_handler_exception_intrusion_check_false(self, mocked_intrusion, mock_boto_client, mock_logger_error): - # Arrange - self.setUp_mock_resources(mocked_intrusion, mock_boto_client) - self.setup_mock_sqs(self.mock_boto_client) - mock_logger_error.side_effect = None - mock_logger_error.return_value = None - event = ValuesForTests.get_event() - context = {} - - # Act & Assert - response = handler(event, context) - - self.assertFalse(response) - @patch("delta.logger.info") def test_dps_record_skipped(self, mock_logger_info): event = ValuesForTests.get_event(supplier="DPSFULL") From ee865c0dcea5e9d42f0cdd13cdb18cc4da1b3438 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Wed, 21 May 2025 12:51:59 +0100 Subject: [PATCH 3/8] Firehose all cases --- delta_backend/src/delta.py | 48 ++++++------ delta_backend/tests/test_delta.py | 74 +++++++++++++++++-- .../tests/utils_for_converter_tests.py | 6 ++ 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/delta_backend/src/delta.py b/delta_backend/src/delta.py index a6e03c25b4..97592e4410 100644 --- a/delta_backend/src/delta.py +++ b/delta_backend/src/delta.py @@ -33,12 +33,19 @@ def get_vaccine_type(patientsk) -> str: parsed = [str.strip(str.lower(s)) for s in patientsk.split("#")] return parsed[0] +def send_firehose(log_data): + try: + firehose_log = {"event": log_data} + firehose_logger.send_log(firehose_log) + except Exception as e: + logger.error(f"Error sending log to Firehose: {e}") -def process_record(record, delta_table, log_data, firehose_log): +def process_record(record, delta_table, log_data): """ Processes a single record from the event. Args: + record (dict): The DynamoDB stream record to process. delta_table (boto3.Table): The DynamoDB table resource. log_data (dict): Log data for the current record. @@ -47,6 +54,7 @@ def process_record(record, delta_table, log_data, firehose_log): Returns: bool: True if the record was processed successfully, False otherwise. """ + ret = True try: start = time.time() approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"]) @@ -56,15 +64,19 @@ def process_record(record, delta_table, log_data, firehose_log): response = str() imms_id = str() operation = str() + operation_outcome = {} if record["eventName"] != EventName.DELETE_PHYSICAL: new_image = record["dynamodb"]["NewImage"] imms_id = new_image["PK"]["S"].split("#")[1] + operation_outcome["record"] = imms_id vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"]) supplier_system = new_image["SupplierSystem"]["S"] if supplier_system not in ("DPSFULL", "DPSREDUCED"): operation = new_image["Operation"]["S"] + operation_outcome["operation_type"] = operation + action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation resource_json = json.loads(new_image["Resource"]["S"]) FHIRConverter = Converter(json.dumps(resource_json)) @@ -86,20 +98,17 @@ def process_record(record, delta_table, log_data, firehose_log): } ) else: - operation_outcome = { - "statusCode": "200", - "statusDesc": "Record from DPS skipped", - } - log_data["operation_outcome"] = operation_outcome - firehose_log["event"] = log_data - firehose_logger.send_log(firehose_log) + operation_outcome["statusCode"] = "200" + operation_outcome["statusDesc"] = "Record from DPS skipped" logger.info(f"Record from DPS skipped for {imms_id}") - return True + return True, log_data else: operation = Operation.DELETE_PHYSICAL new_image = record["dynamodb"]["Keys"] logger.info(f"Record to delta: {new_image}") imms_id = new_image["PK"]["S"].split("#")[1] + operation_outcome["record"] = imms_id + operation_outcome["operation_type"] = operation response = delta_table.put_item( Item={ @@ -117,7 +126,6 @@ def process_record(record, delta_table, log_data, firehose_log): end = time.time() log_data["time_taken"] = f"{round(end - start, 5)}s" - operation_outcome = {"record": imms_id, "operation_type": operation} if response["ResponseMetadata"]["HTTPStatusCode"] == 200: if error_records: @@ -136,23 +144,21 @@ def process_record(record, delta_table, log_data, firehose_log): operation_outcome["statusCode"] = "500" operation_outcome["statusDesc"] = "Exception" logger.warning(log) - return False - - log_data["operation_outcome"] = operation_outcome - firehose_log["event"] = log_data - firehose_logger.send_log(firehose_log) - return True + ret = False except Exception as e: + operation_outcome["statusCode"] = "500" + operation_outcome["statusDesc"] = "Exception" logger.exception(f"Error processing record: {e}") - return False + ret = False + log_data["operation_outcome"] = operation_outcome + return ret, log_data def handler(event, context): ret = True logger.info("Starting Delta Handler") log_data = dict() - firehose_log = dict() log_data["function_name"] = "delta_sync" try: dynamodb = boto3.resource("dynamodb", region_name="eu-west-2") @@ -161,7 +167,8 @@ def handler(event, context): log_data["date_time"] = str(datetime.now()) # Process each record - result = process_record(record, delta_table, log_data, firehose_log) + result, log_data = process_record(record, delta_table, log_data) + send_firehose(log_data) if not result: ret = False @@ -175,6 +182,5 @@ def handler(event, context): logger.exception(operation_outcome["diagnostics"]) send_message(event) # Send failed records to DLQ log_data["operation_outcome"] = operation_outcome - firehose_log["event"] = log_data - firehose_logger.send_log(firehose_log) + send_firehose(log_data) return ret diff --git a/delta_backend/tests/test_delta.py b/delta_backend/tests/test_delta.py index a03a89ec2c..2b26ad3897 100644 --- a/delta_backend/tests/test_delta.py +++ b/delta_backend/tests/test_delta.py @@ -11,8 +11,8 @@ os.environ["DELTA_TABLE_NAME"] = "my_delta_table" os.environ["SOURCE"] = "my_source" -from delta import send_message, handler # Import after setting environment variables -from utils_for_converter_tests import ValuesForTests, RecordConfig +from delta import send_message, handler, process_record # Import after setting environment variables +from utils_for_converter_tests import ValuesForTests, RecordConfig, TestConfig success_response = {"ResponseMetadata": {"HTTPStatusCode": 200}} exception_response = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem") @@ -360,15 +360,15 @@ def test_single_error_in_multi(self, mock_boto_resource): # Assert self.assertFalse(result) self.assertEqual(mock_table.put_item.call_count, 3) - self.assertEqual(self.mock_firehose_logger.send_log.call_count, 2) + self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3) self.assertEqual(self.mock_logger_warning.call_count, 1) @patch("boto3.resource") def test_single_exception_in_multi(self, mock_boto_resource): # Arrange - mock_table = self.setup_mock_dynamodb(mock_boto_resource) + mock_delta_table = self.setup_mock_dynamodb(mock_boto_resource) # arrange so the 3rd record fails - mock_table.put_item.side_effect = [success_response, exception_response, success_response] + mock_delta_table.put_item.side_effect = [success_response, exception_response, success_response] records_config = [ RecordConfig(EventName.CREATE, Operation.CREATE, "ok-id2.1", ActionFlag.CREATE), @@ -382,10 +382,70 @@ def test_single_exception_in_multi(self, mock_boto_resource): # Assert self.assertFalse(result) - self.assertEqual(mock_table.put_item.call_count, 3) - self.assertEqual(self.mock_firehose_logger.send_log.call_count, 2) + self.assertEqual(mock_delta_table.put_item.call_count, len(records_config)) + self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config)) + + + @patch("boto3.resource") + def test_single_record_success(self, mock_boto_resource): + + # Arrange + mock_delta_table = self.setup_mock_dynamodb(mock_boto_resource) + mock_delta_table.put_item.return_value = success_response + test_configs = [ + RecordConfig(EventName.CREATE, Operation.CREATE, "ok-id.1", ActionFlag.CREATE), + RecordConfig(EventName.UPDATE, Operation.UPDATE, "ok-id.2", ActionFlag.UPDATE), + RecordConfig(EventName.DELETE_PHYSICAL, Operation.DELETE_PHYSICAL, "ok-id.3"), + ] + for config in test_configs: + record = ValuesForTests.get_event_record( + imms_id=config.imms_id, + event_name=config.event_name, + operation=config.operation, + supplier=config.supplier, + ) + log_data = {} + # Act + result, log_data = process_record(record, mock_delta_table, log_data) + + # Assert + self.assertEqual(result, True) + operation_outcome = log_data["operation_outcome"] + self.assertEqual(operation_outcome["record"], config.imms_id) + self.assertEqual(operation_outcome["operation_type"], config.operation) + self.assertEqual(operation_outcome["statusCode"], "200") + self.assertEqual(operation_outcome["statusDesc"], "Successfully synched into delta") + self.assertEqual(mock_delta_table.put_item.call_count, len(test_configs)) + self.assertEqual(self.mock_logger_exception.call_count, 0) + + @patch("boto3.resource") + def test_single_record_table_exception(self, mock_boto_resource): + + # Arrange + mock_delta_table = self.setup_mock_dynamodb(mock_boto_resource) + imms_id = "exception-id" + record = ValuesForTests.get_event_record( + imms_id, + event_name=EventName.UPDATE, + operation=Operation.UPDATE, + supplier="EMIS", + ) + mock_delta_table.put_item.return_value = exception_response + log_data = {} + # Act + result, log_data = process_record(record, mock_delta_table, log_data) + + # Assert + self.assertEqual(result, False) + operation_outcome = log_data["operation_outcome"] + self.assertEqual(operation_outcome["record"], imms_id) + self.assertEqual(operation_outcome["operation_type"], Operation.UPDATE) + self.assertEqual(operation_outcome["statusCode"], "500") + self.assertEqual(operation_outcome["statusDesc"], "Exception") + self.assertEqual(mock_delta_table.put_item.call_count, 1) self.assertEqual(self.mock_logger_exception.call_count, 1) + @patch("delta.process_record") @patch("boto3.resource") def test_handler_calls_process_record_for_each_event(self, mock_boto_resource, mock_process_record): diff --git a/delta_backend/tests/utils_for_converter_tests.py b/delta_backend/tests/utils_for_converter_tests.py index c5380646ad..84c8c9981e 100644 --- a/delta_backend/tests/utils_for_converter_tests.py +++ b/delta_backend/tests/utils_for_converter_tests.py @@ -12,6 +12,12 @@ def __init__(self, event_name, operation, imms_id, expected_action_flag=None, su self.imms_id = imms_id self.expected_action_flag = expected_action_flag +class TestConfig: + def __init__(self, record_config: RecordConfig, table_response: any): + self.record_config = record_config + self.table_response = table_response + + class ValuesForTests: MOCK_ENVIRONMENT_DICT = { From 72d6d51be1bfb785a434e48a4bb1c3c45af1e005 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Wed, 21 May 2025 12:57:04 +0100 Subject: [PATCH 4/8] tidy --- delta_backend/tests/test_delta.py | 2 +- delta_backend/tests/utils_for_converter_tests.py | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/delta_backend/tests/test_delta.py b/delta_backend/tests/test_delta.py index 2b26ad3897..53b4363583 100644 --- a/delta_backend/tests/test_delta.py +++ b/delta_backend/tests/test_delta.py @@ -12,7 +12,7 @@ os.environ["SOURCE"] = "my_source" from delta import send_message, handler, process_record # Import after setting environment variables -from utils_for_converter_tests import ValuesForTests, RecordConfig, TestConfig +from utils_for_converter_tests import ValuesForTests, RecordConfig success_response = {"ResponseMetadata": {"HTTPStatusCode": 200}} exception_response = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem") diff --git a/delta_backend/tests/utils_for_converter_tests.py b/delta_backend/tests/utils_for_converter_tests.py index 84c8c9981e..c5380646ad 100644 --- a/delta_backend/tests/utils_for_converter_tests.py +++ b/delta_backend/tests/utils_for_converter_tests.py @@ -12,12 +12,6 @@ def __init__(self, event_name, operation, imms_id, expected_action_flag=None, su self.imms_id = imms_id self.expected_action_flag = expected_action_flag -class TestConfig: - def __init__(self, record_config: RecordConfig, table_response: any): - self.record_config = record_config - self.table_response = table_response - - class ValuesForTests: MOCK_ENVIRONMENT_DICT = { From 51e24b90fb43564ab70ff2112999c4ed8868d5a0 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Wed, 21 May 2025 13:05:59 +0100 Subject: [PATCH 5/8] Skipped --- delta_backend/tests/test_delta.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/delta_backend/tests/test_delta.py b/delta_backend/tests/test_delta.py index 53b4363583..0ed34e6ddd 100644 --- a/delta_backend/tests/test_delta.py +++ b/delta_backend/tests/test_delta.py @@ -260,6 +260,28 @@ def test_send_message_multi_records_diverse(self, mock_boto_resource): self.assertEqual(mock_table.put_item.call_count, len(records_config)) self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config)) + @patch("boto3.resource") + def test_send_message_skipped_records_diverse(self, mock_boto_resource): + '''Check skipped records sent to firehose but not to DynamoDB''' + # Arrange + mock_table = self.setup_mock_dynamodb(mock_boto_resource) + + records_config = [ + RecordConfig(EventName.CREATE, Operation.CREATE, "id1", ActionFlag.CREATE), + RecordConfig(EventName.UPDATE, Operation.UPDATE, "id2", ActionFlag.UPDATE), + RecordConfig(EventName.CREATE, Operation.CREATE, "id-skip", ActionFlag.CREATE, "DPSFULL"), + RecordConfig(EventName.DELETE_PHYSICAL, Operation.DELETE_PHYSICAL, "id4"), + ] + event = ValuesForTests.get_multi_record_event(records_config) + + # Act + result = handler(event, self.context) + + # Assert + self.assertTrue(result) + self.assertEqual(mock_table.put_item.call_count, 3) + self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config)) + @patch("boto3.resource") def test_send_message_multi_create(self, mock_boto_resource): # Arrange From 310513766f6fcc328001e3c942ef292a2e7650f6 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Wed, 21 May 2025 13:15:31 +0100 Subject: [PATCH 6/8] Test put in iteration --- delta_backend/tests/test_delta.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/delta_backend/tests/test_delta.py b/delta_backend/tests/test_delta.py index 0ed34e6ddd..30a026ceb0 100644 --- a/delta_backend/tests/test_delta.py +++ b/delta_backend/tests/test_delta.py @@ -419,7 +419,9 @@ def test_single_record_success(self, mock_boto_resource): RecordConfig(EventName.UPDATE, Operation.UPDATE, "ok-id.2", ActionFlag.UPDATE), RecordConfig(EventName.DELETE_PHYSICAL, Operation.DELETE_PHYSICAL, "ok-id.3"), ] + test_index = 0 for config in test_configs: + test_index += 1 record = ValuesForTests.get_event_record( imms_id=config.imms_id, event_name=config.event_name, @@ -437,7 +439,8 @@ def test_single_record_success(self, mock_boto_resource): self.assertEqual(operation_outcome["operation_type"], config.operation) self.assertEqual(operation_outcome["statusCode"], "200") self.assertEqual(operation_outcome["statusDesc"], "Successfully synched into delta") - self.assertEqual(mock_delta_table.put_item.call_count, len(test_configs)) + self.assertEqual(mock_delta_table.put_item.call_count, test_index) + self.assertEqual(self.mock_logger_exception.call_count, 0) @patch("boto3.resource") From bf6c853b82f721e28fc4f9e6bb0f5caa92c6e803 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Wed, 21 May 2025 14:45:21 +0100 Subject: [PATCH 7/8] mock_aws --- delta_backend/tests/test_convert_to_flat_json.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/delta_backend/tests/test_convert_to_flat_json.py b/delta_backend/tests/test_convert_to_flat_json.py index 4eebd6c701..5a48842064 100644 --- a/delta_backend/tests/test_convert_to_flat_json.py +++ b/delta_backend/tests/test_convert_to_flat_json.py @@ -2,7 +2,7 @@ import unittest from copy import deepcopy from unittest.mock import patch, Mock -from moto import mock_dynamodb, mock_sqs +from moto import mock_aws from boto3 import resource as boto3_resource from tests.utils_for_converter_tests import ValuesForTests, ErrorValuesForTests from SchemaParser import SchemaParser @@ -22,8 +22,7 @@ @patch.dict("os.environ", MOCK_ENV_VARS, clear=True) -@mock_dynamodb -@mock_sqs +@mock_aws class TestConvertToFlatJson(unittest.TestCase): maxDiff = None def setUp(self): From a9f150584585a9a3abb106100e6bc2ccefa5fed2 Mon Sep 17 00:00:00 2001 From: nhsdevws Date: Thu, 22 May 2025 12:20:05 +0100 Subject: [PATCH 8/8] moto version --- .github/workflows/sonarcloud.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/sonarcloud.yml b/.github/workflows/sonarcloud.yml index 0b7c0c0727..836ee5ce12 100644 --- a/.github/workflows/sonarcloud.yml +++ b/.github/workflows/sonarcloud.yml @@ -64,7 +64,7 @@ jobs: PYTHONPATH: delta_backend/src:delta_backend/tests continue-on-error: true run: | - pip install poetry==1.8.4 moto==4.2.11 mypy-boto3-dynamodb==1.35.54 boto3==1.26.165 coverage botocore==1.29.165 jmespath==1.0.1 python-dateutil==2.9.0 urllib3==1.26.20 s3transfer==0.6.2 typing-extensions==4.12.2 + pip install poetry==1.8.4 moto==5.1.4 mypy-boto3-dynamodb==1.35.54 boto3==1.26.165 coverage botocore==1.29.165 jmespath==1.0.1 python-dateutil==2.9.0 urllib3==1.26.20 s3transfer==0.6.2 typing-extensions==4.12.2 poetry run coverage run --source=delta_backend -m unittest discover -s delta_backend || echo "delta tests failed" >> failed_tests.txt poetry run coverage xml -o sonarcloud-coverage-delta.xml