Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/sonarcloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
PYTHONPATH: delta_backend/src:delta_backend/tests
continue-on-error: true
run: |
pip install poetry==1.8.4 moto==4.2.11 mypy-boto3-dynamodb==1.35.54 boto3==1.26.165 coverage botocore==1.29.165 jmespath==1.0.1 python-dateutil==2.9.0 urllib3==1.26.20 s3transfer==0.6.2 typing-extensions==4.12.2
pip install poetry==1.8.4 moto==5.1.4 mypy-boto3-dynamodb==1.35.54 boto3==1.26.165 coverage botocore==1.29.165 jmespath==1.0.1 python-dateutil==2.9.0 urllib3==1.26.20 s3transfer==0.6.2 typing-extensions==4.12.2
poetry run coverage run --source=delta_backend -m unittest discover -s delta_backend || echo "delta tests failed" >> failed_tests.txt
poetry run coverage xml -o sonarcloud-coverage-delta.xml

Expand Down
236 changes: 131 additions & 105 deletions delta_backend/src/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,139 +22,165 @@
def send_message(record):
    """Send a failed record to the failure (DLQ) SQS queue.

    SQS errors are logged and swallowed so that DLQ delivery never masks
    the original processing failure.

    Args:
        record: JSON-serialisable payload to enqueue on the failure queue.
    """
    message_body = record
    # Create the SQS client once (the diffed original created it twice).
    sqs_client = boto3.client("sqs")
    try:
        # Send the record to the queue
        sqs_client.send_message(QueueUrl=failure_queue_url, MessageBody=json.dumps(message_body))
        logger.info("Record saved successfully to the DLQ")
    except ClientError as e:
        logger.error(f"Error sending record to DLQ: {e}")


def get_vaccine_type(patientsk) -> str:
    """Return the vaccine type encoded in *patientsk*: the first '#'-separated
    segment, lower-cased and stripped of surrounding whitespace."""
    first_segment = patientsk.split("#")[0]
    return first_segment.lower().strip()

def send_firehose(log_data):
    """Best-effort shipping of *log_data* to Firehose inside an {"event": ...}
    envelope; failures are logged, never raised."""
    try:
        firehose_logger.send_log({"event": log_data})
    except Exception as e:
        logger.error(f"Error sending log to Firehose: {e}")

def process_record(record, delta_table, log_data):
    """
    Processes a single record from the event.

    Args:
        record (dict): The DynamoDB stream record to process.
        delta_table (boto3.Table): The DynamoDB table resource.
        log_data (dict): Log data for the current record (mutated in place).

    Returns:
        tuple[bool, dict]: (True if the record was processed successfully,
        False otherwise) and the updated log data with "operation_outcome" set.
    """
    ret = True
    operation_outcome = dict()
    try:
        start = time.time()
        # ApproximateCreationDateTime converted straight to string would be a Unix
        # timestamp; convert to isoformat for filtering purposes instead.
        approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
        expiry_time = approximate_creation_time + timedelta(days=30)
        expiry_time_epoch = int(expiry_time.timestamp())
        error_records = []
        response = str()
        imms_id = str()
        operation = str()

        if record["eventName"] != EventName.DELETE_PHYSICAL:
            # CREATE/UPDATE/logical-delete events carry the full NewImage.
            new_image = record["dynamodb"]["NewImage"]
            imms_id = new_image["PK"]["S"].split("#")[1]
            operation_outcome["record"] = imms_id
            vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
            supplier_system = new_image["SupplierSystem"]["S"]

            if supplier_system not in ("DPSFULL", "DPSREDUCED"):
                operation = new_image["Operation"]["S"]
                operation_outcome["operation_type"] = operation

                action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
                resource_json = json.loads(new_image["Resource"]["S"])
                FHIRConverter = Converter(json.dumps(resource_json))
                flat_json = FHIRConverter.runConversion(resource_json)  # Get the flat JSON
                error_records = FHIRConverter.getErrorRecords()
                flat_json["ACTION_FLAG"] = action_flag

                response = delta_table.put_item(
                    Item={
                        "PK": str(uuid.uuid4()),
                        "ImmsID": imms_id,
                        "Operation": operation,
                        "VaccineType": vaccine_type,
                        "SupplierSystem": supplier_system,
                        "DateTimeStamp": approximate_creation_time.isoformat(),
                        "Source": delta_source,
                        "Imms": flat_json,
                        "ExpiresAt": expiry_time_epoch,
                    }
                )
            else:
                # Records originating from DPS are deliberately skipped.
                operation_outcome["statusCode"] = "200"
                operation_outcome["statusDesc"] = "Record from DPS skipped"
                logger.info(f"Record from DPS skipped for {imms_id}")
                # Set the outcome before the early return so the caller's
                # firehose log does not carry a stale outcome.
                log_data["operation_outcome"] = operation_outcome
                return True, log_data
        else:
            # Physical deletes only carry the Keys image.
            operation = Operation.DELETE_PHYSICAL
            new_image = record["dynamodb"]["Keys"]
            logger.info(f"Record to delta: {new_image}")
            imms_id = new_image["PK"]["S"].split("#")[1]
            operation_outcome["record"] = imms_id
            operation_outcome["operation_type"] = operation

            response = delta_table.put_item(
                Item={
                    "PK": str(uuid.uuid4()),
                    "ImmsID": imms_id,
                    "Operation": Operation.DELETE_PHYSICAL,
                    "VaccineType": "default",
                    "SupplierSystem": "default",
                    "DateTimeStamp": approximate_creation_time.isoformat(),
                    "Source": delta_source,
                    "Imms": "",
                    "ExpiresAt": expiry_time_epoch,
                }
            )

        end = time.time()
        log_data["time_taken"] = f"{round(end - start, 5)}s"

        if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
            if error_records:
                log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
                operation_outcome["statusCode"] = "207"
                operation_outcome["statusDesc"] = (
                    f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
                )
            else:
                log = f"Record Successfully created for {imms_id}"
                operation_outcome["statusCode"] = "200"
                operation_outcome["statusDesc"] = "Successfully synched into delta"
            logger.info(log)
        else:
            log = f"Record NOT created for {imms_id}"
            operation_outcome["statusCode"] = "500"
            operation_outcome["statusDesc"] = "Exception"
            logger.warning(log)
            ret = False

    except Exception as e:
        operation_outcome["statusCode"] = "500"
        operation_outcome["statusDesc"] = "Exception"
        operation_outcome["diagnostics"] = f"Delta Lambda failure: {e}"
        # NOTE(review): DLQ the failing record (not the whole event) — the
        # whole-event DLQ path lives in handler(). TODO confirm payload shape.
        send_message(record)
        logger.exception(f"Error processing record: {e}")
        ret = False

    log_data["operation_outcome"] = operation_outcome
    return ret, log_data

def handler(event, context):
    """Lambda entry point: sync each DynamoDB stream record into the delta table.

    Each record is processed independently via process_record and its outcome
    is shipped to Firehose; one bad record does not stop the rest.

    Args:
        event (dict): DynamoDB stream event containing a "Records" list.
        context: Lambda context object (unused).

    Returns:
        bool: True if every record was processed successfully, False otherwise.
    """
    ret = True
    logger.info("Starting Delta Handler")
    log_data = dict()
    log_data["function_name"] = "delta_sync"
    try:
        dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
        delta_table = dynamodb.Table(delta_table_name)
        for record in event["Records"]:
            log_data["date_time"] = str(datetime.now())

            # Process each record and log its outcome regardless of success.
            result, log_data = process_record(record, delta_table, log_data)
            send_firehose(log_data)
            if not result:
                ret = False

    except Exception:
        # Reaching here means the event itself was unusable (e.g. no "Records"
        # key); per-record failures are already handled inside process_record.
        ret = False
        operation_outcome = {
            "statusCode": "500",
            "statusDesc": "Exception",
            # f-prefix removed: the string has no placeholders.
            "diagnostics": "Delta Lambda failure: Incorrect invocation of Lambda"
        }
        logger.exception(operation_outcome["diagnostics"])
        send_message(event)  # Send failed records to DLQ
        log_data["operation_outcome"] = operation_outcome
        send_firehose(log_data)
    return ret
6 changes: 3 additions & 3 deletions delta_backend/src/log_firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def __init__(
self.delivery_stream_name = stream_name

def send_log(self, log_message):
log_to_splunk = log_message
logger.info(f"Log sent to Firehose for save: {log_to_splunk}")
encoded_log_data = json.dumps(log_to_splunk).encode("utf-8")
try:
log_to_splunk = log_message
logger.info(f"Log sent to Firehose for save: {log_to_splunk}")
encoded_log_data = json.dumps(log_to_splunk).encode("utf-8")
response = self.firehose_client.put_record(
DeliveryStreamName=self.delivery_stream_name,
Record={"Data": encoded_log_data},
Expand Down
5 changes: 2 additions & 3 deletions delta_backend/tests/test_convert_to_flat_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from copy import deepcopy
from unittest.mock import patch, Mock
from moto import mock_dynamodb, mock_sqs
from moto import mock_aws
from boto3 import resource as boto3_resource
from tests.utils_for_converter_tests import ValuesForTests, ErrorValuesForTests
from SchemaParser import SchemaParser
Expand All @@ -22,8 +22,7 @@


@patch.dict("os.environ", MOCK_ENV_VARS, clear=True)
@mock_dynamodb
@mock_sqs
@mock_aws
class TestConvertToFlatJson(unittest.TestCase):
maxDiff = None
def setUp(self):
Expand Down
Loading
Loading