Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions delta_backend/src/common/mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
Define enums for event names, operations, and action flags.

# case eventName operation actionFlag
----------------- --------- --------- ----------
create INSERT CREATE NEW
update MODIFY UPDATE UPDATE
logically delete MODIFY DELETE DELETE
physically delete REMOVE REMOVE N/A
"""

class EventName():
CREATE = "INSERT"
UPDATE = "MODIFY"
DELETE_LOGICAL = "MODIFY"
DELETE_PHYSICAL = "REMOVE"

class Operation():
CREATE = "CREATE"
UPDATE = "UPDATE"
DELETE_LOGICAL = "DELETE"
DELETE_PHYSICAL = "REMOVE"

class ActionFlag():
CREATE = "NEW"
UPDATE = "UPDATE"
DELETE_LOGICAL = "DELETE"
21 changes: 10 additions & 11 deletions delta_backend/src/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from botocore.exceptions import ClientError
from log_firehose import FirehoseLogger
from Converter import Converter
from common.mappings import ActionFlag, Operation, EventName

failure_queue_url = os.environ["AWS_SQS_QUEUE_URL"]
delta_table_name = os.environ["DELTA_TABLE_NAME"]
Expand Down Expand Up @@ -37,6 +38,7 @@ def get_vaccine_type(patientsk) -> str:


def handler(event, context):
ret = True
logger.info("Starting Delta Handler")
log_data = dict()
firehose_log = dict()
Expand All @@ -61,14 +63,14 @@ def handler(event, context):
response = str()
imms_id = str()
operation = str()
if record["eventName"] != "REMOVE":
if record["eventName"] != EventName.DELETE_PHYSICAL:
new_image = record["dynamodb"]["NewImage"]
imms_id = new_image["PK"]["S"].split("#")[1]
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
supplier_system = new_image["SupplierSystem"]["S"]
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
operation = new_image["Operation"]["S"]
action_flag = "NEW" if operation == "CREATE" else operation
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
resource_json = json.loads(new_image["Resource"]["S"])
FHIRConverter = Converter(json.dumps(resource_json))
flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON
Expand All @@ -94,17 +96,17 @@ def handler(event, context):
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
logger.info(f"Record from DPS skipped for {imms_id}")
return {"statusCode": 200, "body": f"Record from DPS skipped for {imms_id}"}
continue
else:
operation = "REMOVE"
operation = Operation.DELETE_PHYSICAL
new_image = record["dynamodb"]["Keys"]
logger.info(f"Record to delta:{new_image}")
imms_id = new_image["PK"]["S"].split("#")[1]
response = delta_table.put_item(
Item={
"PK": str(uuid.uuid4()),
"ImmsID": imms_id,
"Operation": "REMOVE",
"Operation": Operation.DELETE_PHYSICAL,
"VaccineType": "default",
"SupplierSystem": "default",
"DateTimeStamp": approximate_creation_time.isoformat(),
Expand All @@ -131,7 +133,6 @@ def handler(event, context):
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
logger.info(log)
return {"statusCode": 200, "body": "Records processed successfully"}
else:
log = f"Record NOT created for {imms_id}"
operation_outcome["statusCode"] = "500"
Expand All @@ -140,7 +141,7 @@ def handler(event, context):
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
logger.info(log)
return {"statusCode": 500, "body": "Records not processed successfully"}
ret = False

except Exception as e:
operation_outcome["statusCode"] = "500"
Expand All @@ -155,7 +156,5 @@ def handler(event, context):
log_data["operation_outcome"] = operation_outcome
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
return {
"statusCode": 500,
"body": "Records not processed",
}
ret = False
return ret
17 changes: 11 additions & 6 deletions delta_backend/tests/test_convert_to_flat_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from SchemaParser import SchemaParser
from Converter import Converter
from ConversionChecker import ConversionChecker, RecordError
from common.mappings import ActionFlag, Operation, EventName
import ExceptionMessages

MOCK_ENV_VARS = {
Expand Down Expand Up @@ -100,7 +101,7 @@ def tearDown(self):
self.mock_firehose_logger.stop()

@staticmethod
def get_event(event_name="INSERT", operation="operation", supplier="EMIS"):
def get_event(event_name=EventName.CREATE, operation="operation", supplier="EMIS"):
"""Returns test event data."""
return ValuesForTests.get_event(event_name, operation, supplier)

Expand All @@ -110,8 +111,7 @@ def assert_dynamodb_record(self, operation_flag, action_flag, items, expected_va
Ignores dynamically generated fields like PK, DateTimeStamp, and ExpiresAt.
Ensures that the 'Imms' field matches exactly.
"""
self.assertEqual(response["statusCode"], 200)
self.assertEqual(response["body"], "Records processed successfully")
self.assertTrue(response)

filtered_items = [
{k: v for k, v in item.items() if k not in ["PK", "DateTimeStamp", "ExpiresAt"]}
Expand All @@ -126,6 +126,11 @@ def assert_dynamodb_record(self, operation_flag, action_flag, items, expected_va
self.assertIsInstance(imms_data, dict)
self.assertGreater(len(imms_data), 0)

for key, expected_value in expected_values.items():
self.assertIn(key, filtered_items[0], f"{key} is missing")
if (filtered_items[0][key] != expected_value):
print (f"{key} mismatch {filtered_items[0][key]} != {expected_value}")

# Check Imms JSON structure matches exactly
self.assertEqual(imms_data, expected_imms, "Imms data does not match expected JSON structure")

Expand Down Expand Up @@ -167,9 +172,9 @@ def test_fhir_converter_json_error_scenario(self):
def test_handler_imms_convert_to_flat_json(self):
"""Test that the Imms field contains the correct flat JSON data for CREATE, UPDATE, and DELETE operations."""
expected_action_flags = [
{"Operation": "CREATE", "EXPECTED_ACTION_FLAG": "NEW"},
{"Operation": "UPDATE", "EXPECTED_ACTION_FLAG": "UPDATE"},
{"Operation": "DELETE", "EXPECTED_ACTION_FLAG": "DELETE"},
{"Operation": Operation.CREATE, "EXPECTED_ACTION_FLAG": ActionFlag.CREATE},
{"Operation": Operation.UPDATE, "EXPECTED_ACTION_FLAG": ActionFlag.UPDATE},
{"Operation": Operation.DELETE_LOGICAL, "EXPECTED_ACTION_FLAG": ActionFlag.DELETE_LOGICAL},
]

for test_case in expected_action_flags:
Expand Down
Loading
Loading