33import pandas as pd
44import uuid
55from datetime import datetime , timezone
6- import csv
76from clients import logger , s3_client , table
87from constants import ACK_BUCKET , FORWARDEDFILE_PREFIX
98
@@ -104,7 +103,9 @@ def wait_for_ack_file(ack_prefix, input_file_name, timeout=120):
104103 if search_pattern in key :
105104 return key
106105 time .sleep (5 )
107- raise Exception (f"Ack file matching '{ search_pattern } ' not found in bucket { ACK_BUCKET } within { timeout } seconds." )
106+ raise AssertionError (
107+ f"Ack file matching '{ search_pattern } ' not found in bucket { ACK_BUCKET } within { timeout } seconds."
108+ )
108109
109110
110111def get_file_content_from_s3 (bucket , key ):
@@ -120,12 +121,12 @@ def check_ack_file_content(content, response_code, operation_outcome):
120121 2. If 'HEADER_RESPONSE_CODE' is 'Fatal Error', check 'OPERATION_OUTCOME'.
121122 3. If 'OPERATION_OUTCOME' is 'Ok', extract 'LOCAL_ID', convert it to IdentifierPK,
122123 fetch PK from DynamoDB, and ensure it matches 'IMMS_ID'."""
123-
124+
124125 reader = csv .DictReader (content .splitlines (), delimiter = "|" )
125126 rows = list (reader )
126127 for i , row in enumerate (rows ):
127128 if "HEADER_RESPONSE_CODE" not in row :
128- raise Exception (f"Row { i + 1 } does not have a 'HEADER_RESPONSE_CODE' column." )
129+ raise AssertionError (f"Row { i + 1 } does not have a 'HEADER_RESPONSE_CODE' column." )
129130 if row ["HEADER_RESPONSE_CODE" ].strip () != response_code :
130131 raise AssertionError (
131132 f"Row { i + 1 } : Expected RESPONSE '{ response_code } ', but found '{ row ['HEADER_RESPONSE_CODE' ]} '"
@@ -138,30 +139,31 @@ def check_ack_file_content(content, response_code, operation_outcome):
138139
139140 if row ["HEADER_RESPONSE_CODE" ].strip () == "OK" :
140141 if "LOCAL_ID" not in row :
141- raise Exception (f"Row { i + 1 } does not have a 'LOCAL_ID' column." )
142-
142+ raise AssertionError (f"Row { i + 1 } does not have a 'LOCAL_ID' column." )
143+
143144 # Extract LOCAL_ID and convert to IdentifierPK
144145 try :
145146 local_id , unique_id_uri = row ["LOCAL_ID" ].split ("^" )
146147 identifier_pk = f"{ unique_id_uri } #{ local_id } "
147148 except ValueError :
148- raise Exception (f"Row { i + 1 } : Invalid LOCAL_ID format - { row ['LOCAL_ID' ]} " )
149-
149+ raise AssertionError (f"Row { i + 1 } : Invalid LOCAL_ID format - { row ['LOCAL_ID' ]} " )
150+
150151 # Fetch PK from DynamoDB
151152 dynamo_pk = fetch_pk_from_dynamodb (identifier_pk )
152153 # Compare DynamoDB PK with IMMS_ID
153154 if dynamo_pk != row ["IMMS_ID" ]:
154155 raise AssertionError (
155- f"Row { i + 1 } : Mismatch - DynamoDB PK '{ dynamo_pk } ' does not match ACK file IMMS_ID '{ row ['IMMS_ID' ]} '"
156+ f"Row { i + 1 } : Mismatch - DynamoDB PK '{ dynamo_pk } ' does not match ACK file IMMS_ID '{ row ['IMMS_ID' ]} '"
156157 )
157158
159+
158160def fetch_pk_from_dynamodb (identifier_pk ):
159161 """Fetch PK with IdentifierPK as the query key."""
160162 try :
161163 response = table .query (
162- IndexName = "IdentifierGSI" ,
164+ IndexName = "IdentifierGSI" ,
163165 KeyConditionExpression = "IdentifierPK = :identifier_pk" ,
164- ExpressionAttributeValues = {":identifier_pk" : identifier_pk }
166+ ExpressionAttributeValues = {":identifier_pk" : identifier_pk },
165167 )
166168 if "Items" in response and response ["Items" ]:
167169 return response ["Items" ][0 ]["PK" ] # Return the first matched PK
@@ -170,4 +172,4 @@ def fetch_pk_from_dynamodb(identifier_pk):
170172
171173 except Exception as e :
172174 logger .error (f"Error fetching from DynamoDB: { e } " )
173- return "ERROR"
175+ return "ERROR"
0 commit comments