Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 122 additions & 102 deletions delta_backend/src/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,139 +22,159 @@
def send_message(record):
# Create a message
message_body = record
# Use boto3 to interact with SQS
sqs_client = boto3.client("sqs")
try:
# Send the record to the queue
sqs_client = boto3.client("sqs")
sqs_client.send_message(QueueUrl=failure_queue_url, MessageBody=json.dumps(message_body))
logger.info("Record saved successfully to the DLQ")
except ClientError as e:
logger.error(f"Error sending record to DLQ: {e}")


def get_vaccine_type(patientsk) -> str:
parsed = [str.strip(str.lower(s)) for s in patientsk.split("#")]
return parsed[0]


def handler(event, context):
ret = True
logger.info("Starting Delta Handler")
log_data = dict()
firehose_log = dict()
operation_outcome = dict()
log_data["function_name"] = "delta_sync"
intrusion_check = True
def process_record(record, delta_table, log_data, firehose_log):
"""
Processes a single record from the event.

Args:
record (dict): The DynamoDB stream record to process.
delta_table (boto3.Table): The DynamoDB table resource.
log_data (dict): Log data for the current record.
firehose_log (dict): Firehose log data for the current record.

Returns:
bool: True if the record was processed successfully, False otherwise.
"""
try:
dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
delta_table = dynamodb.Table(delta_table_name)
start = time.time()
approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
expiry_time = approximate_creation_time + timedelta(days=30)
expiry_time_epoch = int(expiry_time.timestamp())
error_records = []
response = str()
imms_id = str()
operation = str()

# Converting ApproximateCreationDateTime directly to string will give Unix timestamp
# I am converting it to isofformat for filtering purpose. This can be changed accordingly
if record["eventName"] != EventName.DELETE_PHYSICAL:
new_image = record["dynamodb"]["NewImage"]
imms_id = new_image["PK"]["S"].split("#")[1]
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
supplier_system = new_image["SupplierSystem"]["S"]

if supplier_system not in ("DPSFULL", "DPSREDUCED"):
operation = new_image["Operation"]["S"]
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
resource_json = json.loads(new_image["Resource"]["S"])
FHIRConverter = Converter(json.dumps(resource_json))
flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON
error_records = FHIRConverter.getErrorRecords()
flat_json["ACTION_FLAG"] = action_flag

for record in event["Records"]:
start = time.time()
log_data["date_time"] = str(datetime.now())
intrusion_check = False
approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
expiry_time = approximate_creation_time + timedelta(days=30)
expiry_time_epoch = int(expiry_time.timestamp())
error_records = []
response = str()
imms_id = str()
operation = str()
if record["eventName"] != EventName.DELETE_PHYSICAL:
new_image = record["dynamodb"]["NewImage"]
imms_id = new_image["PK"]["S"].split("#")[1]
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
supplier_system = new_image["SupplierSystem"]["S"]
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
operation = new_image["Operation"]["S"]
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
resource_json = json.loads(new_image["Resource"]["S"])
FHIRConverter = Converter(json.dumps(resource_json))
flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON
error_records = FHIRConverter.getErrorRecords()
flat_json["ACTION_FLAG"] = action_flag
response = delta_table.put_item(
Item={
"PK": str(uuid.uuid4()),
"ImmsID": imms_id,
"Operation": operation,
"VaccineType": vaccine_type,
"SupplierSystem": supplier_system,
"DateTimeStamp": approximate_creation_time.isoformat(),
"Source": delta_source,
"Imms": flat_json,
"ExpiresAt": expiry_time_epoch,
}
)
else:
operation_outcome["statusCode"] = "200"
operation_outcome["statusDesc"] = "Record from DPS skipped"
log_data["operation_outcome"] = operation_outcome
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
logger.info(f"Record from DPS skipped for {imms_id}")
continue
else:
operation = Operation.DELETE_PHYSICAL
new_image = record["dynamodb"]["Keys"]
logger.info(f"Record to delta:{new_image}")
imms_id = new_image["PK"]["S"].split("#")[1]
response = delta_table.put_item(
Item={
"PK": str(uuid.uuid4()),
"ImmsID": imms_id,
"Operation": Operation.DELETE_PHYSICAL,
"VaccineType": "default",
"SupplierSystem": "default",
"Operation": operation,
"VaccineType": vaccine_type,
"SupplierSystem": supplier_system,
"DateTimeStamp": approximate_creation_time.isoformat(),
"Source": delta_source,
"Imms": "",
"Imms": flat_json,
"ExpiresAt": expiry_time_epoch,
}
)
end = time.time()
log_data["time_taken"] = f"{round(end - start, 5)}s"
operation_outcome = {"record": imms_id, "operation_type": operation}
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
if error_records:
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
operation_outcome["statusCode"] = "207"
operation_outcome["statusDesc"] = (
f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
)
else:
log = f"Record Successfully created for {imms_id}"
operation_outcome["statusCode"] = "200"
operation_outcome["statusDesc"] = "Successfully synched into delta"
log_data["operation_outcome"] = operation_outcome
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
logger.info(log)
else:
log = f"Record NOT created for {imms_id}"
operation_outcome["statusCode"] = "500"
operation_outcome["statusDesc"] = "Exception"
operation_outcome = {
"statusCode": "200",
"statusDesc": "Record from DPS skipped",
}
log_data["operation_outcome"] = operation_outcome
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
logger.info(log)
ret = False
logger.info(f"Record from DPS skipped for {imms_id}")
return True
else:
operation = Operation.DELETE_PHYSICAL
new_image = record["dynamodb"]["Keys"]
logger.info(f"Record to delta: {new_image}")
imms_id = new_image["PK"]["S"].split("#")[1]

except Exception as e:
operation_outcome["statusCode"] = "500"
operation_outcome["statusDesc"] = "Exception"
if intrusion_check:
operation_outcome["diagnostics"] = "Incorrect invocation of Lambda"
logger.exception("Incorrect invocation of Lambda")
response = delta_table.put_item(
Item={
"PK": str(uuid.uuid4()),
"ImmsID": imms_id,
"Operation": Operation.DELETE_PHYSICAL,
"VaccineType": "default",
"SupplierSystem": "default",
"DateTimeStamp": approximate_creation_time.isoformat(),
"Source": delta_source,
"Imms": "",
"ExpiresAt": expiry_time_epoch,
}
)

end = time.time()
log_data["time_taken"] = f"{round(end - start, 5)}s"
operation_outcome = {"record": imms_id, "operation_type": operation}

if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
if error_records:
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
operation_outcome["statusCode"] = "207"
operation_outcome["statusDesc"] = (
f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
)
else:
log = f"Record Successfully created for {imms_id}"
operation_outcome["statusCode"] = "200"
operation_outcome["statusDesc"] = "Successfully synched into delta"
logger.info(log)
else:
operation_outcome["diagnostics"] = f"Delta Lambda failure: {e}"
logger.exception(f"Delta Lambda failure: {e}")
send_message(event) # Send failed records to DLQ
log = f"Record NOT created for {imms_id}"
operation_outcome["statusCode"] = "500"
operation_outcome["statusDesc"] = "Exception"
logger.warning(log)
return False

log_data["operation_outcome"] = operation_outcome
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
return True

except Exception as e:
logger.exception(f"Error processing record: {e}")
return False


def handler(event, context):
ret = True
logger.info("Starting Delta Handler")
log_data = dict()
firehose_log = dict()
log_data["function_name"] = "delta_sync"
try:
dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
delta_table = dynamodb.Table(delta_table_name)
for record in event["Records"]:
log_data["date_time"] = str(datetime.now())

# Process each record
result = process_record(record, delta_table, log_data, firehose_log)
if not result:
ret = False

except Exception as e:
ret = False
operation_outcome = {
"statusCode": "500",
"statusDesc": "Exception",
"diagnostics": f"Delta Lambda failure: Incorrect invocation of Lambda"
}
logger.exception(operation_outcome["diagnostics"])
send_message(event) # Send failed records to DLQ
log_data["operation_outcome"] = operation_outcome
firehose_log["event"] = log_data
firehose_logger.send_log(firehose_log)
return ret
6 changes: 3 additions & 3 deletions delta_backend/src/log_firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def __init__(
self.delivery_stream_name = stream_name

def send_log(self, log_message):
log_to_splunk = log_message
logger.info(f"Log sent to Firehose for save: {log_to_splunk}")
encoded_log_data = json.dumps(log_to_splunk).encode("utf-8")
try:
log_to_splunk = log_message
logger.info(f"Log sent to Firehose for save: {log_to_splunk}")
encoded_log_data = json.dumps(log_to_splunk).encode("utf-8")
response = self.firehose_client.put_record(
DeliveryStreamName=self.delivery_stream_name,
Record={"Data": encoded_log_data},
Expand Down
Loading
Loading