77import logging
88from botocore .exceptions import ClientError
99from log_firehose import FirehoseLogger
10- from Converter import Converter
11- from helpers .delta_data import DeltaData
12- from helpers .mappings import OperationName , EventName
10+ from helpers .db_processor import DbProcessor
11+ from helpers .mappings import OperationName , EventName , ActionFlag
12+ from helpers .record_processor import RecordProcessor
13+ from helpers .sqs_utils import send_message
1314
1415failure_queue_url = os .environ ["AWS_SQS_QUEUE_URL" ]
1516delta_table_name = os .environ ["DELTA_TABLE_NAME" ]
1819logger = logging .getLogger ()
1920logger .setLevel ("INFO" )
2021firehose_logger = FirehoseLogger ()
21- delta_data = DeltaData (delta_table_name , delta_source )
2222
2323
24- def send_message (record ):
25- # Create a message
26- message_body = record
27- # Use boto3 to interact with SQS
28- sqs_client = boto3 .client ("sqs" )
29- try :
30- # Send the record to the queue
31- print (f"Sending record to DLQ: { message_body } " )
32- sqs_client .send_message (QueueUrl = failure_queue_url , MessageBody = json .dumps (message_body ))
33- logger .info ("Record saved successfully to the DLQ" )
34- except ClientError as e :
35- logger .error (f"Error sending record to DLQ: { e } " )
24+ # def send_message(record):
25+ # # Create a message
26+ # message_body = record
27+ # # Use boto3 to interact with SQS
28+ # sqs_client = boto3.client("sqs")
29+ # try:
30+ # # Send the record to the queue
31+ # print(f"Sending record to DLQ: {message_body}")
32+ # sqs_client.send_message(QueueUrl=failure_queue_url, MessageBody=json.dumps(message_body))
33+ # logger.info("Record saved successfully to the DLQ")
34+ # except ClientError as e:
35+ # logger.error(f"Error sending record to DLQ: {e}")
3636
37+ def get_vaccine_type (patientsk ) -> str :
38+ parsed = [str .strip (str .lower (s )) for s in patientsk .split ("#" )]
39+ return parsed [0 ]
3740
3841def handler (event , context ):
3942 logger .info ("Starting Delta Handler" )
@@ -45,81 +48,17 @@ def handler(event, context):
4548 try :
4649 dynamodb = boto3 .resource ("dynamodb" , region_name = "eu-west-2" )
4750 delta_table = dynamodb .Table (delta_table_name )
48-
49- # Converting ApproximateCreationDateTime directly to string will give Unix timestamp
50- # I am converting it to isofformat for filtering purpose. This can be changed accordingly
51-
51+ db_processor = DbProcessor (delta_table , delta_source , logger )
52+ record_processor = RecordProcessor (delta_table ,
53+ delta_source ,
54+ log_data ,
55+ db_processor ,
56+ firehose_logger ,
57+ firehose_log ,
58+ logger )
5259 for record in event ["Records" ]:
53- start = time .time ()
54- log_data ["date_time" ] = str (datetime .now ())
55- intrusion_check = False
56- approximate_creation_time = datetime .utcfromtimestamp (record ["dynamodb" ]["ApproximateCreationDateTime" ])
57- expiry_time = approximate_creation_time + timedelta (days = 30 )
58- expiry_time_epoch = int (expiry_time .timestamp ())
59- error_records = []
60- response = str ()
61- imms_id = str ()
62- operation = str ()
63- if record ["eventName" ] != EventName .DELETE_PHYSICAL :
64- new_image = record ["dynamodb" ]["NewImage" ]
65- imms_id = new_image ["PK" ]["S" ].split ("#" )[1 ]
66- supplier_system = new_image ["SupplierSystem" ]["S" ]
67- if supplier_system not in ("DPSFULL" , "DPSREDUCED" ):
68- response , error_records = delta_data .write_to_db (new_image , imms_id , approximate_creation_time , expiry_time_epoch )
69- else :
70- operation_outcome ["statusCode" ] = "200"
71- operation_outcome ["statusDesc" ] = "Record from DPS skipped"
72- log_data ["operation_outcome" ] = operation_outcome
73- firehose_log ["event" ] = log_data
74- firehose_logger .send_log (firehose_log )
75- logger .info (f"Record from DPS skipped for { imms_id } " )
76- return {"statusCode" : 200 , "body" : f"Record from DPS skipped for { imms_id } " }
77- else :
78- operation = OperationName .DELETE_PHYSICAL
79- new_image = record ["dynamodb" ]["Keys" ]
80- logger .info (f"Record to delta:{ new_image } " )
81- imms_id = new_image ["PK" ]["S" ].split ("#" )[1 ]
82- response = delta_table .put_item (
83- Item = {
84- "PK" : str (uuid .uuid4 ()),
85- "ImmsID" : imms_id ,
86- "Operation" : OperationName .DELETE_PHYSICAL ,
87- "VaccineType" : "default" ,
88- "SupplierSystem" : "default" ,
89- "DateTimeStamp" : approximate_creation_time .isoformat (),
90- "Source" : delta_source ,
91- "Imms" : "" ,
92- "ExpiresAt" : expiry_time_epoch ,
93- }
94- )
95- end = time .time ()
96- log_data ["time_taken" ] = f"{ round (end - start , 5 )} s"
97- operation_outcome = {"record" : imms_id , "operation_type" : operation }
98- if response ["ResponseMetadata" ]["HTTPStatusCode" ] == 200 :
99- if error_records :
100- log = f"Partial success: successfully synced into delta, but issues found within record { imms_id } "
101- operation_outcome ["statusCode" ] = "207"
102- operation_outcome ["statusDesc" ] = (
103- f"Partial success: successfully synced into delta, but issues found within record { json .dumps (error_records )} "
104- )
105- else :
106- log = f"Record Successfully created for { imms_id } "
107- operation_outcome ["statusCode" ] = "200"
108- operation_outcome ["statusDesc" ] = "Successfully synched into delta"
109- log_data ["operation_outcome" ] = operation_outcome
110- firehose_log ["event" ] = log_data
111- firehose_logger .send_log (firehose_log )
112- logger .info (log )
113- return {"statusCode" : 200 , "body" : "Records processed successfully" }
114- else :
115- log = f"Record NOT created for { imms_id } "
116- operation_outcome ["statusCode" ] = "500"
117- operation_outcome ["statusDesc" ] = "Exception"
118- log_data ["operation_outcome" ] = operation_outcome
119- firehose_log ["event" ] = log_data
120- firehose_logger .send_log (firehose_log )
121- logger .info (log )
122- return {"statusCode" : 500 , "body" : "Records not processed successfully" }
60+ record_processor .process_record (record )
61+
12362
12463 except Exception as e :
12564 operation_outcome ["statusCode" ] = "500"
@@ -138,3 +77,4 @@ def handler(event, context):
13877 "statusCode" : 500 ,
13978 "body" : "Records not processed" ,
14079 }
80+
0 commit comments