import csv
import os
import time
import uuid
from datetime import datetime, timezone

import boto3
import pandas as pd

from clients import logger, s3_client
from constants import ACK_BUCKET, FORWARDEDFILE_PREFIX
def generate_csv(file_name, fore_name, dose_amount, action_flag):
    """
    Generate a pipe-delimited CSV file with 2 rows and return its name.

    Row contents depend on ``action_flag``:
    - ``"CREATE"``: two rows with distinct ``UNIQUE_ID`` values, both ``ACTION_FLAG "NEW"``.
    - ``"UPDATE"``: a ``"NEW"`` row and an ``"UPDATE"`` row sharing one ``UNIQUE_ID``.
    - ``"DELETE"``: a ``"NEW"`` row and a ``"DELETE"`` row sharing one ``UNIQUE_ID``.
    - any other value: a header-only (empty) file is written.

    Args:
        file_name: Truthiness selects the filename template — truthy picks the
            "v4" name, falsy the "v5" name. The value itself is not used;
            the timestamped generated name is returned instead.
        fore_name: Value for the PERSON_FORENAME column.
        dose_amount: Value for the DOSE_AMOUNT column.
        action_flag: One of "CREATE", "UPDATE", "DELETE" (see above).

    Returns:
        The generated CSV file name (the file is written to the current directory).
    """
    data = []
    if action_flag == "CREATE":
        # Two independent records, each with its own GUID.
        for unique_id in (str(uuid.uuid4()), str(uuid.uuid4())):
            data.append(create_row(unique_id, fore_name, dose_amount, "NEW"))
    elif action_flag in ("UPDATE", "DELETE"):
        # A "NEW" record followed by an update/delete of that same record.
        unique_id = str(uuid.uuid4())
        data.append(create_row(unique_id, fore_name, dose_amount, "NEW"))
        data.append(create_row(unique_id, fore_name, dose_amount, action_flag))

    df = pd.DataFrame(data)
    # Millisecond-resolution UTC timestamp keeps generated names unique.
    # timezone.utc replaces the previous pytz.UTC, which was never imported
    # (NameError at runtime); the formatted output is identical.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")[:-3]
    # Use a separate local instead of clobbering the `file_name` parameter.
    generated_name = (
        f"COVID19_Vaccinations_v4_YGM41_{timestamp}.csv"
        if file_name
        else f"COVID19_Vaccinations_v5_YGM41_{timestamp}.csv"
    )
    df.to_csv(generated_name, index=False, sep="|", quoting=csv.QUOTE_MINIMAL)
    logger.info("Generated CSV file: %s", generated_name)
    return generated_name
6544
6645
def create_row(unique_id, fore_name, dose_amount, action_flag):
    """
    Build one immunisation CSV row as a dict of column name -> string value.

    Args:
        unique_id: Value for the UNIQUE_ID column.
        fore_name: Value for the PERSON_FORENAME column.
        dose_amount: Value for the DOSE_AMOUNT column.
        action_flag: Value for the ACTION_FLAG column ("NEW"/"UPDATE"/"DELETE").

    Returns:
        dict: A single row with fixed test-fixture values for every other column;
        DATE_AND_TIME and RECORDED_DATE are stamped at call time (UTC).
    """
    # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow();
    # the strftime output is byte-identical.
    now_utc = datetime.now(timezone.utc)
    return {
        "NHS_NUMBER": "9732928395",
        "PERSON_FORENAME": fore_name,
        "PERSON_SURNAME": "James",
        "PERSON_DOB": "20080217",
        "PERSON_GENDER_CODE": "0",
        "PERSON_POSTCODE": "WD25 0DZ",
        "DATE_AND_TIME": now_utc.strftime("%Y%m%dT%H%M%S"),
        "SITE_CODE": "RVVKC",
        "SITE_CODE_TYPE_URI": "https://fhir.nhs.uk/Id/ods-organization-code",
        "UNIQUE_ID": unique_id,
        "UNIQUE_ID_URI": "https://www.ravs.england.nhs.uk/",
        "ACTION_FLAG": action_flag,
        "PERFORMING_PROFESSIONAL_FORENAME": "PHYLIS",
        "PERFORMING_PROFESSIONAL_SURNAME": "James",
        "RECORDED_DATE": now_utc.strftime("%Y%m%d"),
        "PRIMARY_SOURCE": "TRUE",
        "VACCINATION_PROCEDURE_CODE": "956951000000104",
        "VACCINATION_PROCEDURE_TERM": "RSV vaccination in pregnancy (procedure)",
        "DOSE_SEQUENCE": "1",
        "VACCINE_PRODUCT_CODE": "42223111000001107",
        "VACCINE_PRODUCT_TERM": "Quadrivalent influenza vaccine (Sanofi Pasteur)",
        "VACCINE_MANUFACTURER": "Sanofi Pasteur",
        "BATCH_NUMBER": "BN92478105653",
        "EXPIRY_DATE": "20240915",
        "SITE_OF_VACCINATION_CODE": "368209003",
        "SITE_OF_VACCINATION_TERM": "Right arm",
        "ROUTE_OF_VACCINATION_CODE": "1210999013",
        "ROUTE_OF_VACCINATION_TERM": "Intradermal use",
        "DOSE_AMOUNT": dose_amount,
        "DOSE_UNIT_CODE": "2622896019",
        "DOSE_UNIT_TERM": "Inhalation - unit of product usage",
        "INDICATION_CODE": "1037351000000105",
        "LOCATION_CODE": "RJC02",
        "LOCATION_CODE_TYPE_URI": "https://fhir.nhs.uk/Id/ods-organization-code",
    }
84+
85+
def upload_file_to_s3(file_name, bucket, prefix):
    """
    Upload the given local file to s3://<bucket>/<prefix><file_name>.

    Args:
        file_name: Path of the local file to upload (also used as the key suffix).
        bucket: Destination S3 bucket name.
        prefix: Key prefix prepended verbatim to `file_name`.

    Returns:
        The full S3 object key the file was uploaded under.
    """
    key = f"{prefix}{file_name}"
    with open(file_name, "rb") as f:
        s3_client.put_object(Bucket=bucket, Key=key, Body=f)
    # Restored from commented-out code; lazy %-args avoid f-string formatting cost.
    logger.info("Uploaded file to s3://%s/%s", bucket, key)
    return key
7493
7594
def wait_for_ack_file(ack_prefix, input_file_name, timeout=120):
    """
    Poll ACK_BUCKET until an ack file whose key contains the input file name appears.

    Args:
        ack_prefix: Key prefix to search under; falls back to FORWARDEDFILE_PREFIX
            when falsy.
        input_file_name: Name of the uploaded input file (a trailing ".csv" is
            stripped before matching).
        timeout: Maximum number of seconds to keep polling (5 s between polls).

    Returns:
        The S3 key of the first matching ack file.

    Raises:
        Exception: If no matching ack file appears within `timeout` seconds.
    """
    filename_without_ext = input_file_name[:-4] if input_file_name.endswith(".csv") else input_file_name
    # Resolve the effective prefix once, instead of re-evaluating the fallback
    # on every poll iteration.
    prefix = ack_prefix if ack_prefix else FORWARDEDFILE_PREFIX
    search_pattern = f"{prefix}{filename_without_ext}"
    start_time = time.time()
    while time.time() - start_time < timeout:
        response = s3_client.list_objects_v2(Bucket=ACK_BUCKET, Prefix=prefix)
        for obj in response.get("Contents", []):
            key = obj["Key"]
            if search_pattern in key:
                logger.info("Ack file found: s3://%s/%s", ACK_BUCKET, key)
                return key
        time.sleep(5)
    raise Exception(f"Ack file matching '{search_pattern}' not found in bucket {ACK_BUCKET} within {timeout} seconds.")
def check_ack_file_content(content, response_code):
    """
    Verify every row of a pipe-delimited ack file has the expected response code.

    Args:
        content: Raw text of the ack file; the first line is the header row.
        response_code: Expected value of the HEADER_RESPONSE_CODE column
            (compared after stripping surrounding whitespace).

    Raises:
        Exception: If a row lacks the HEADER_RESPONSE_CODE column.
        AssertionError: If a row's HEADER_RESPONSE_CODE differs from `response_code`.
    """
    parsed_rows = list(csv.DictReader(content.splitlines(), delimiter="|"))
    for row_number, row in enumerate(parsed_rows, start=1):
        if "HEADER_RESPONSE_CODE" not in row:
            raise Exception(f"Row {row_number} does not have a 'HEADER_RESPONSE_CODE' column.")
        actual = row["HEADER_RESPONSE_CODE"]
        if actual.strip() != response_code:
            raise AssertionError(
                f"Row {row_number}: Expected RESPONSE '{response_code}', but found '{actual}'"
            )
0 commit comments