55import uuid
66from datetime import datetime
77import csv
8- from clients import logger , s3_client
8+ from clients import logger , s3_client , table
99from constants import ACK_BUCKET , FORWARDEDFILE_PREFIX
1010
1111
@@ -39,7 +39,6 @@ def generate_csv(file_name, fore_name, dose_amount, action_flag):
3939 else f"COVID19_Vaccinations_v5_YGM41_{ timestamp } .csv"
4040 )
4141 df .to_csv (file_name , index = False , sep = "|" , quoting = csv .QUOTE_MINIMAL )
42- # logger.info(f"Generated CSV file: {file_name}")
4342 return file_name
4443
4544
@@ -88,7 +87,6 @@ def upload_file_to_s3(file_name, bucket, prefix):
8887 key = f"{ prefix } { file_name } "
8988 with open (file_name , "rb" ) as f :
9089 s3_client .put_object (Bucket = bucket , Key = key , Body = f )
91- # logger.info(f"Uploaded file to s3://{bucket}/{key}")
9290 return key
9391
9492
@@ -105,7 +103,6 @@ def wait_for_ack_file(ack_prefix, input_file_name, timeout=120):
105103 for obj in response ["Contents" ]:
106104 key = obj ["Key" ]
107105 if search_pattern in key :
108- # logger.info(f"Ack file found: s3://{ACK_BUCKET}/{key}")
109106 return key
110107 time .sleep (5 )
111108 raise Exception (f"Ack file matching '{ search_pattern } ' not found in bucket { ACK_BUCKET } within { timeout } seconds." )
@@ -119,9 +116,12 @@ def get_file_content_from_s3(bucket, key):
119116
120117
121118def check_ack_file_content (content , response_code , operation_outcome ):
122- """Parse the acknowledgment (ACK) CSV file and verify that each row's 'HEADER_RESPONSE_CODE' column
123- matches the expected response code. If the response code is 'Fatal Error', it also validates
124- the 'OPERATION_OUTCOME' column."""
119+ """Parse the acknowledgment (ACK) CSV file and verify that:
120+ 1. 'HEADER_RESPONSE_CODE' matches the expected response code.
121+ 2. If 'HEADER_RESPONSE_CODE' is 'Fatal Error', check 'OPERATION_OUTCOME'.
122+ 3. If 'OPERATION_OUTCOME' is 'Ok', extract 'LOCAL_ID', convert it to IdentifierPK,
123+ fetch PK from DynamoDB, and ensure it matches 'IMMS_ID'."""
124+
125125 reader = csv .DictReader (content .splitlines (), delimiter = "|" )
126126 rows = list (reader )
127127 for i , row in enumerate (rows ):
@@ -136,4 +136,39 @@ def check_ack_file_content(content, response_code, operation_outcome):
136136 raise AssertionError (
137137 f"Row { i + 1 } : Expected RESPONSE '{ operation_outcome } ', but found '{ row ['OPERATION_OUTCOME' ]} '"
138138 )
139- # logger.info("All rows in the ack file have been verified successfully.")
139+
140+ if row ["HEADER_RESPONSE_CODE" ].strip () == "OK" :
141+ if "LOCAL_ID" not in row :
142+ raise Exception (f"Row { i + 1 } does not have a 'LOCAL_ID' column." )
143+
144+ # Extract LOCAL_ID and convert to IdentifierPK
145+ try :
146+ local_id , unique_id_uri = row ["LOCAL_ID" ].split ("^" )
147+ identifier_pk = f"{ unique_id_uri } #{ local_id } "
148+ except ValueError :
149+ raise Exception (f"Row { i + 1 } : Invalid LOCAL_ID format - { row ['LOCAL_ID' ]} " )
150+
151+ # Fetch PK from DynamoDB
152+ dynamo_pk = fetch_pk_from_dynamodb (identifier_pk )
153+ # Compare DynamoDB PK with IMMS_ID
154+ if dynamo_pk != row ["IMMS_ID" ]:
155+ raise AssertionError (
156+ f"Row { i + 1 } : Mismatch - DynamoDB PK '{ dynamo_pk } ' does not match ACK file IMMS_ID '{ row ['IMMS_ID' ]} '"
157+ )
158+
159+ def fetch_pk_from_dynamodb (identifier_pk ):
160+ """Fetch PK with IdentifierPK as the query key."""
161+ try :
162+ response = table .query (
163+ IndexName = "IdentifierGSI" ,
164+ KeyConditionExpression = "IdentifierPK = :identifier_pk" ,
165+ ExpressionAttributeValues = {":identifier_pk" : identifier_pk }
166+ )
167+ if "Items" in response and response ["Items" ]:
168+ return response ["Items" ][0 ]["PK" ] # Return the first matched PK
169+ else :
170+ return "NOT_FOUND"
171+
172+ except Exception as e :
173+ logger .error (f"Error fetching from DynamoDB: { e } " )
174+ return "ERROR"
0 commit comments