Skip to content

Commit 714a2eb

Browse files
committed
mock firehose. Mock process record
1 parent 3676bf5 commit 714a2eb

File tree

3 files changed

+227
-121
lines changed

3 files changed

+227
-121
lines changed

delta_backend/src/delta.py

Lines changed: 126 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
def send_message(record):
2323
# Create a message
2424
message_body = record
25-
# Use boto3 to interact with SQS
26-
sqs_client = boto3.client("sqs")
2725
try:
26+
# Use boto3 to interact with SQS
27+
sqs_client = boto3.client("sqs")
2828
# Send the record to the queue
2929
sqs_client.send_message(QueueUrl=failure_queue_url, MessageBody=json.dumps(message_body))
3030
logger.info("Record saved successfully to the DLQ")
@@ -37,124 +37,149 @@ def get_vaccine_type(patientsk) -> str:
3737
return parsed[0]
3838

3939

40-
def handler(event, context):
41-
ret = True
42-
logger.info("Starting Delta Handler")
43-
log_data = dict()
44-
firehose_log = dict()
45-
operation_outcome = dict()
46-
log_data["function_name"] = "delta_sync"
47-
intrusion_check = True
40+
def process_record(record, delta_table, log_data, firehose_log):
41+
"""
42+
Processes a single record from the event.
43+
44+
Args:
45+
record (dict): The DynamoDB stream record to process.
46+
delta_table (boto3.Table): The DynamoDB table resource.
47+
log_data (dict): Log data for the current record.
48+
firehose_log (dict): Firehose log data for the current record.
49+
50+
Returns:
51+
bool: True if the record was processed successfully, False otherwise.
52+
"""
4853
try:
49-
dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
50-
delta_table = dynamodb.Table(delta_table_name)
54+
start = time.time()
55+
approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
56+
expiry_time = approximate_creation_time + timedelta(days=30)
57+
expiry_time_epoch = int(expiry_time.timestamp())
58+
error_records = []
59+
response = str()
60+
imms_id = str()
61+
operation = str()
5162

52-
# Converting ApproximateCreationDateTime directly to string will give Unix timestamp
53-
# I am converting it to isofformat for filtering purpose. This can be changed accordingly
63+
if record["eventName"] != EventName.DELETE_PHYSICAL:
64+
new_image = record["dynamodb"]["NewImage"]
65+
imms_id = new_image["PK"]["S"].split("#")[1]
66+
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
67+
supplier_system = new_image["SupplierSystem"]["S"]
68+
69+
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
70+
operation = new_image["Operation"]["S"]
71+
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
72+
resource_json = json.loads(new_image["Resource"]["S"])
73+
FHIRConverter = Converter(json.dumps(resource_json))
74+
flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON
75+
error_records = FHIRConverter.getErrorRecords()
76+
flat_json["ACTION_FLAG"] = action_flag
5477

55-
for record in event["Records"]:
56-
start = time.time()
57-
log_data["date_time"] = str(datetime.now())
58-
intrusion_check = False
59-
approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
60-
expiry_time = approximate_creation_time + timedelta(days=30)
61-
expiry_time_epoch = int(expiry_time.timestamp())
62-
error_records = []
63-
response = str()
64-
imms_id = str()
65-
operation = str()
66-
if record["eventName"] != EventName.DELETE_PHYSICAL:
67-
new_image = record["dynamodb"]["NewImage"]
68-
imms_id = new_image["PK"]["S"].split("#")[1]
69-
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
70-
supplier_system = new_image["SupplierSystem"]["S"]
71-
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
72-
operation = new_image["Operation"]["S"]
73-
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
74-
resource_json = json.loads(new_image["Resource"]["S"])
75-
FHIRConverter = Converter(json.dumps(resource_json))
76-
flat_json = FHIRConverter.runConversion(resource_json) # Get the flat JSON
77-
error_records = FHIRConverter.getErrorRecords()
78-
flat_json["ACTION_FLAG"] = action_flag
79-
response = delta_table.put_item(
80-
Item={
81-
"PK": str(uuid.uuid4()),
82-
"ImmsID": imms_id,
83-
"Operation": operation,
84-
"VaccineType": vaccine_type,
85-
"SupplierSystem": supplier_system,
86-
"DateTimeStamp": approximate_creation_time.isoformat(),
87-
"Source": delta_source,
88-
"Imms": flat_json,
89-
"ExpiresAt": expiry_time_epoch,
90-
}
91-
)
92-
else:
93-
operation_outcome["statusCode"] = "200"
94-
operation_outcome["statusDesc"] = "Record from DPS skipped"
95-
log_data["operation_outcome"] = operation_outcome
96-
firehose_log["event"] = log_data
97-
firehose_logger.send_log(firehose_log)
98-
logger.info(f"Record from DPS skipped for {imms_id}")
99-
continue
100-
else:
101-
operation = Operation.DELETE_PHYSICAL
102-
new_image = record["dynamodb"]["Keys"]
103-
logger.info(f"Record to delta:{new_image}")
104-
imms_id = new_image["PK"]["S"].split("#")[1]
10578
response = delta_table.put_item(
10679
Item={
10780
"PK": str(uuid.uuid4()),
10881
"ImmsID": imms_id,
109-
"Operation": Operation.DELETE_PHYSICAL,
110-
"VaccineType": "default",
111-
"SupplierSystem": "default",
82+
"Operation": operation,
83+
"VaccineType": vaccine_type,
84+
"SupplierSystem": supplier_system,
11285
"DateTimeStamp": approximate_creation_time.isoformat(),
11386
"Source": delta_source,
114-
"Imms": "",
87+
"Imms": flat_json,
11588
"ExpiresAt": expiry_time_epoch,
11689
}
11790
)
118-
end = time.time()
119-
log_data["time_taken"] = f"{round(end - start, 5)}s"
120-
operation_outcome = {"record": imms_id, "operation_type": operation}
121-
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
122-
if error_records:
123-
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
124-
operation_outcome["statusCode"] = "207"
125-
operation_outcome["statusDesc"] = (
126-
f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
127-
)
128-
else:
129-
log = f"Record Successfully created for {imms_id}"
130-
operation_outcome["statusCode"] = "200"
131-
operation_outcome["statusDesc"] = "Successfully synched into delta"
132-
log_data["operation_outcome"] = operation_outcome
133-
firehose_log["event"] = log_data
134-
firehose_logger.send_log(firehose_log)
135-
logger.info(log)
13691
else:
137-
log = f"Record NOT created for {imms_id}"
138-
operation_outcome["statusCode"] = "500"
139-
operation_outcome["statusDesc"] = "Exception"
92+
operation_outcome = {
93+
"statusCode": "200",
94+
"statusDesc": "Record from DPS skipped",
95+
}
14096
log_data["operation_outcome"] = operation_outcome
14197
firehose_log["event"] = log_data
14298
firehose_logger.send_log(firehose_log)
143-
logger.info(log)
144-
ret = False
99+
logger.info(f"Record from DPS skipped for {imms_id}")
100+
return True
101+
else:
102+
operation = Operation.DELETE_PHYSICAL
103+
new_image = record["dynamodb"]["Keys"]
104+
logger.info(f"Record to delta: {new_image}")
105+
imms_id = new_image["PK"]["S"].split("#")[1]
145106

146-
except Exception as e:
147-
operation_outcome["statusCode"] = "500"
148-
operation_outcome["statusDesc"] = "Exception"
149-
if intrusion_check:
150-
operation_outcome["diagnostics"] = "Incorrect invocation of Lambda"
151-
logger.exception("Incorrect invocation of Lambda")
107+
response = delta_table.put_item(
108+
Item={
109+
"PK": str(uuid.uuid4()),
110+
"ImmsID": imms_id,
111+
"Operation": Operation.DELETE_PHYSICAL,
112+
"VaccineType": "default",
113+
"SupplierSystem": "default",
114+
"DateTimeStamp": approximate_creation_time.isoformat(),
115+
"Source": delta_source,
116+
"Imms": "",
117+
"ExpiresAt": expiry_time_epoch,
118+
}
119+
)
120+
121+
end = time.time()
122+
log_data["time_taken"] = f"{round(end - start, 5)}s"
123+
operation_outcome = {"record": imms_id, "operation_type": operation}
124+
125+
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
126+
if error_records:
127+
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
128+
operation_outcome["statusCode"] = "207"
129+
operation_outcome["statusDesc"] = (
130+
f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
131+
)
132+
else:
133+
log = f"Record Successfully created for {imms_id}"
134+
operation_outcome["statusCode"] = "200"
135+
operation_outcome["statusDesc"] = "Successfully synched into delta"
136+
logger.info(log)
152137
else:
153-
operation_outcome["diagnostics"] = f"Delta Lambda failure: {e}"
154-
logger.exception(f"Delta Lambda failure: {e}")
155-
send_message(event) # Send failed records to DLQ
138+
log = f"Record NOT created for {imms_id}"
139+
operation_outcome["statusCode"] = "500"
140+
operation_outcome["statusDesc"] = "Exception"
141+
logger.warning(log)
142+
return False
143+
156144
log_data["operation_outcome"] = operation_outcome
157145
firehose_log["event"] = log_data
158146
firehose_logger.send_log(firehose_log)
147+
return True
148+
149+
except Exception as e:
150+
logger.exception(f"Error processing record: {e}")
151+
return False
152+
153+
154+
def handler(event, context):
155+
ret = True
156+
logger.info("Starting Delta Handler")
157+
log_data = dict()
158+
firehose_log = dict()
159+
log_data["function_name"] = "delta_sync"
160+
try:
161+
dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
162+
delta_table = dynamodb.Table(delta_table_name)
163+
for record in event["Records"]:
164+
log_data["date_time"] = str(datetime.now())
165+
166+
# Process each record
167+
result = process_record(record, delta_table, log_data, firehose_log)
168+
if not result:
169+
ret = False
170+
171+
except Exception as e:
159172
ret = False
160-
return ret
173+
operation_outcome = {
174+
"statusCode": "500",
175+
"statusDesc": "Exception",
176+
"diagnostics": f"Delta Lambda failure: Incorrect invocation of Lambda"
177+
}
178+
logger.exception(operation_outcome["diagnostics"])
179+
send_message(event) # Send failed records to DLQ
180+
log_data["operation_outcome"] = operation_outcome
181+
firehose_log["event"] = log_data
182+
random_id = str(uuid.uuid4())
183+
firehose_logger.send_log(firehose_log)
184+
finally:
185+
return ret

delta_backend/src/log_firehose.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ def __init__(
1919
self.delivery_stream_name = stream_name
2020

2121
def send_log(self, log_message):
22-
log_to_splunk = log_message
23-
logger.info(f"Log sent to Firehose for save: {log_to_splunk}")
24-
encoded_log_data = json.dumps(log_to_splunk).encode("utf-8")
2522
try:
23+
log_to_splunk = log_message
24+
logger.info(f"Log sent to Firehose for save: {log_to_splunk}")
25+
encoded_log_data = json.dumps(log_to_splunk).encode("utf-8")
2626
response = self.firehose_client.put_record(
2727
DeliveryStreamName=self.delivery_stream_name,
2828
Record={"Data": encoded_log_data},

0 commit comments

Comments
 (0)