Skip to content

Commit 4c0d853

Browse files
authored
VED-294: Fix potential duplication in the delta. (#575)
* VED-294: Fix potential duplication in the delta during blue / green switchover. * VED-294: Increase timeout to handle processing a full batch of 100 records. * VED-294: Split handler into multiple functions for readability. * VED-294: Tidy up. * VED-294: Use correct log method. Fix assertions. * VED-294: Send messages with any failures to the DLQ. Add test assertions. * VED-294: Address review comment.
1 parent 7306775 commit 4c0d853

File tree

4 files changed

+220
-146
lines changed

4 files changed

+220
-146
lines changed

delta_backend/src/delta.py

Lines changed: 156 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import decimal
2-
3-
import boto3
42
import json
3+
import logging
54
import os
65
import time
7-
from datetime import datetime, timedelta
8-
import uuid
9-
import logging
6+
from datetime import datetime, timedelta, UTC
7+
from unittest import case
8+
9+
import boto3
10+
from boto3.dynamodb.conditions import Attr
1011
from botocore.exceptions import ClientError
11-
from log_firehose import FirehoseLogger
12-
from converter import Converter
12+
1313
from common.mappings import ActionFlag, Operation, EventName
14+
from converter import Converter
15+
from log_firehose import FirehoseLogger
1416

1517
failure_queue_url = os.environ["AWS_SQS_QUEUE_URL"]
1618
delta_table_name = os.environ["DELTA_TABLE_NAME"]
@@ -47,8 +49,8 @@ def get_sqs_client():
4749
try:
4850
logger.info("Initializing SQS Client")
4951
sqs_client = boto3.client("sqs", region_name)
50-
except Exception as e:
51-
logger.error(f"Error initializing SQS Client: {e}")
52+
except Exception:
53+
logger.exception("Error initializing SQS Client")
5254
sqs_client = None
5355
return sqs_client
5456

@@ -59,136 +61,169 @@ def send_message(record, queue_url=failure_queue_url):
5961
# Send the record to the queue
6062
get_sqs_client().send_message(QueueUrl=queue_url, MessageBody=json.dumps(message_body))
6163
logger.info("Record saved successfully to the DLQ")
62-
except Exception as e:
63-
logger.error(f"Error sending record to DLQ: {e}")
64+
except Exception:
65+
logger.exception("Error sending record to DLQ")
66+
67+
def get_vaccine_type(patient_sort_key: str) -> str:
68+
vaccine_type = patient_sort_key.split("#")[0]
69+
return str.strip(str.lower(vaccine_type))
70+
71+
def get_imms_id(primary_key: str) -> str:
72+
return primary_key.split("#")[1]
6473

65-
def get_vaccine_type(patientsk) -> str:
66-
parsed = [str.strip(str.lower(s)) for s in patientsk.split("#")]
67-
return parsed[0]
74+
def get_creation_and_expiry_times(creation_timestamp: float) -> (str, int):
75+
creation_datetime = datetime.fromtimestamp(creation_timestamp, UTC)
76+
expiry_datetime = creation_datetime + timedelta(days=30)
77+
expiry_timestamp = int(expiry_datetime.timestamp())
78+
return creation_datetime.isoformat(), expiry_timestamp
6879

6980
def send_firehose(log_data):
7081
try:
7182
firehose_log = {"event": log_data}
7283
firehose_logger.send_log(firehose_log)
84+
except Exception:
85+
logger.exception("Error sending log to Firehose")
86+
87+
def handle_dynamodb_response(response, error_records):
88+
match response:
89+
case {"ResponseMetadata": {"HTTPStatusCode": 200}} if error_records:
90+
logger.warning(f"Partial success: successfully synced into delta, but issues found within record: {json.dumps(error_records)}")
91+
return True, {"statusCode": "207", "statusDesc": "Partial success: successfully synced into delta, but issues found within record", "diagnostics": error_records}
92+
case {"ResponseMetadata": {"HTTPStatusCode": 200}}:
93+
logger.info("Successfully synched into delta")
94+
return True, {"statusCode": "200", "statusDesc": "Successfully synched into delta"}
95+
case _:
96+
logger.error(f"Failure response from DynamoDB: {response}")
97+
return False, {"statusCode": "500", "statusDesc": "Failure response from DynamoDB", "diagnostics": response}
98+
99+
def handle_exception_response(response):
100+
match response:
101+
case ClientError(response={"Error": {"Code": "ConditionalCheckFailedException"}}):
102+
logger.info("Skipped record already present in delta")
103+
return True, {"statusCode": "200", "statusDesc": "Skipped record already present in delta"}
104+
case _:
105+
logger.exception("Exception during processing")
106+
return False, {"statusCode": "500", "statusDesc": "Exception", "diagnostics": response}
107+
108+
def process_remove(record):
109+
event_id = record["eventID"]
110+
primary_key = record["dynamodb"]["Keys"]["PK"]["S"]
111+
imms_id = get_imms_id(primary_key)
112+
operation = Operation.DELETE_PHYSICAL
113+
creation_timestamp = record["dynamodb"]["ApproximateCreationDateTime"]
114+
creation_datetime_str, expiry_timestamp = get_creation_and_expiry_times(creation_timestamp)
115+
operation_outcome = {"operation_type": operation, "record": imms_id}
116+
try:
117+
response = get_delta_table().put_item(
118+
Item={
119+
"PK": event_id,
120+
"ImmsID": imms_id,
121+
"Operation": operation,
122+
"VaccineType": "default",
123+
"SupplierSystem": "default",
124+
"DateTimeStamp": creation_datetime_str,
125+
"Source": delta_source,
126+
"Imms": "",
127+
"ExpiresAt": expiry_timestamp,
128+
},
129+
ConditionExpression=Attr("PK").not_exists(),
130+
)
131+
success, extra_log_fields = handle_dynamodb_response(response, None)
132+
operation_outcome.update(extra_log_fields)
133+
return success, operation_outcome
73134
except Exception as e:
74-
logger.error(f"Error sending log to Firehose: {e}")
135+
success, extra_log_fields = handle_exception_response(e)
136+
operation_outcome.update(extra_log_fields)
137+
return success, operation_outcome
138+
139+
def process_skip(record):
140+
primary_key = record["dynamodb"]["NewImage"]["PK"]["S"]
141+
imms_id = get_imms_id(primary_key)
142+
logger.info("Record from DPS skipped")
143+
return True, {"record": imms_id, "statusCode": "200", "statusDesc": "Record from DPS skipped"}
144+
145+
def process_create_update_delete(record):
146+
event_id = record["eventID"]
147+
new_image = record["dynamodb"]["NewImage"]
148+
primary_key = new_image["PK"]["S"]
149+
imms_id = get_imms_id(primary_key)
150+
operation = new_image["Operation"]["S"]
151+
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
152+
supplier_system = new_image["SupplierSystem"]["S"]
153+
creation_timestamp = record["dynamodb"]["ApproximateCreationDateTime"]
154+
creation_datetime_str, expiry_timestamp = get_creation_and_expiry_times(creation_timestamp)
155+
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
156+
resource_json = json.loads(new_image["Resource"]["S"], parse_float=decimal.Decimal)
157+
fhir_converter = Converter(resource_json, action_flag=action_flag)
158+
flat_json = fhir_converter.run_conversion()
159+
error_records = fhir_converter.get_error_records()
160+
operation_outcome = {"record": imms_id, "operation_type": operation}
75161

76-
def process_record(record, log_data):
77-
ret = True
78162
try:
79-
start = time.time()
80-
operation_outcome = {}
81-
error_records = []
82-
response = str()
83-
imms_id = str()
84-
operation = str()
85-
approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
86-
expiry_time = approximate_creation_time + timedelta(days=30)
87-
expiry_time_epoch = int(expiry_time.timestamp())
88-
delta_table = get_delta_table()
89-
90-
if record["eventName"] != EventName.DELETE_PHYSICAL:
91-
new_image = record["dynamodb"]["NewImage"]
92-
imms_id = new_image["PK"]["S"].split("#")[1]
93-
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
94-
supplier_system = new_image["SupplierSystem"]["S"]
95-
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
96-
operation = new_image["Operation"]["S"]
97-
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
98-
resource_json = json.loads(new_image["Resource"]["S"], parse_float=decimal.Decimal)
99-
FHIRConverter = Converter(resource_json, action_flag=action_flag)
100-
flat_json = FHIRConverter.run_conversion()
101-
error_records = FHIRConverter.get_error_records()
102-
response = delta_table.put_item(
103-
Item={
104-
"PK": str(uuid.uuid4()),
105-
"ImmsID": imms_id,
106-
"Operation": operation,
107-
"VaccineType": vaccine_type,
108-
"SupplierSystem": supplier_system,
109-
"DateTimeStamp": approximate_creation_time.isoformat(),
110-
"Source": delta_source,
111-
"Imms": flat_json,
112-
"ExpiresAt": expiry_time_epoch,
113-
}
114-
)
115-
else:
116-
operation_outcome["statusCode"] = "200"
117-
operation_outcome["statusDesc"] = "Record from DPS skipped"
118-
log_data["operation_outcome"] = operation_outcome
119-
logger.info(f"Record from DPS skipped for {imms_id}")
120-
return True, log_data
121-
else:
122-
operation = Operation.DELETE_PHYSICAL
123-
new_image = record["dynamodb"]["Keys"]
124-
logger.info(f"Record to delta:{new_image}")
125-
imms_id = new_image["PK"]["S"].split("#")[1]
126-
response = delta_table.put_item(
127-
Item={
128-
"PK": str(uuid.uuid4()),
129-
"ImmsID": imms_id,
130-
"Operation": operation,
131-
"VaccineType": "default",
132-
"SupplierSystem": "default",
133-
"DateTimeStamp": approximate_creation_time.isoformat(),
134-
"Source": delta_source,
135-
"Imms": "",
136-
"ExpiresAt": expiry_time_epoch,
137-
}
138-
)
139-
end = time.time()
140-
log_data["time_taken"] = f"{round(end - start, 5)}s"
141-
operation_outcome = {"record": imms_id, "operation_type": operation}
142-
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
143-
if error_records:
144-
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
145-
operation_outcome["statusCode"] = "207"
146-
operation_outcome["statusDesc"] = (
147-
f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
148-
)
149-
else:
150-
log = f"Record Successfully created for {imms_id}"
151-
operation_outcome["statusCode"] = "200"
152-
operation_outcome["statusDesc"] = "Successfully synched into delta"
153-
logger.info(log)
154-
else:
155-
log = f"Record NOT created for {imms_id}"
156-
operation_outcome["statusCode"] = "500"
157-
operation_outcome["statusDesc"] = "Exception"
158-
logger.warning(log)
159-
ret = False
163+
response = get_delta_table().put_item(
164+
Item={
165+
"PK": event_id,
166+
"ImmsID": imms_id,
167+
"Operation": operation,
168+
"VaccineType": vaccine_type,
169+
"SupplierSystem": supplier_system,
170+
"DateTimeStamp": creation_datetime_str,
171+
"Source": delta_source,
172+
"Imms": flat_json,
173+
"ExpiresAt": expiry_timestamp,
174+
},
175+
ConditionExpression=Attr("PK").not_exists(),
176+
)
177+
success, extra_log_fields = handle_dynamodb_response(response, error_records)
178+
operation_outcome.update(extra_log_fields)
179+
return success, operation_outcome
160180
except Exception as e:
161-
operation_outcome["statusCode"] = "500"
162-
operation_outcome["statusDesc"] = "Exception"
163-
logger.exception(f"Error processing record: {e}")
164-
ret = False
181+
success, extra_log_fields = handle_exception_response(e)
182+
operation_outcome.update(extra_log_fields)
183+
return success, operation_outcome
184+
185+
def process_record(record):
186+
try:
187+
if record["eventName"] == EventName.DELETE_PHYSICAL:
188+
return process_remove(record)
189+
190+
supplier_system = record["dynamodb"]["NewImage"]["SupplierSystem"]["S"]
191+
if supplier_system in ("DPSFULL", "DPSREDUCED"):
192+
return process_skip(record)
165193

166-
log_data["operation_outcome"] = operation_outcome
167-
return ret, log_data
194+
return process_create_update_delete(record)
195+
except Exception as e:
196+
logger.exception("Exception during processing")
197+
return False, {"statusCode": "500", "statusDesc": "Exception", "diagnostics": e}
168198

169-
def handler(event, context):
170-
ret = True
199+
def handler(event, _context):
200+
overall_success = True
171201
logger.info("Starting Delta Handler")
172-
log_data = dict()
173-
operation_outcome = dict()
174-
log_data["function_name"] = "delta_sync"
175202
try:
176203
for record in event["Records"]:
177-
log_data["date_time"] = str(datetime.now())
178-
result, log_data = process_record(record, log_data)
179-
send_firehose(log_data)
180-
if not result:
181-
ret = False
182-
183-
except Exception as e:
184-
ret = False
204+
datetime_str = datetime.now().isoformat()
205+
start = time.time()
206+
success, operation_outcome = process_record(record)
207+
overall_success = overall_success and success
208+
end = time.time()
209+
send_firehose({
210+
"function_name": "delta_sync",
211+
"operation_outcome": operation_outcome,
212+
"date_time": datetime_str,
213+
"time_taken": f"{round(end - start, 5)}s"
214+
})
215+
except Exception:
216+
overall_success = False
185217
operation_outcome = {
186218
"statusCode": "500",
187219
"statusDesc": "Exception",
188-
"diagnostics": f"Delta Lambda failure: Incorrect invocation of Lambda"
220+
"diagnostics": "Delta Lambda failure: Incorrect invocation of Lambda"
189221
}
190222
logger.exception(operation_outcome["diagnostics"])
191223
send_message(event) # Send failed records to DLQ
192-
log_data["operation_outcome"] = operation_outcome
193-
send_firehose(log_data)
194-
return ret
224+
send_firehose({"function_name": "delta_sync", "operation_outcome": operation_outcome})
225+
226+
if not overall_success:
227+
send_message(event)
228+
229+
return overall_success

0 commit comments

Comments
 (0)