Skip to content

Commit 3fab57f

Browse files
authored
Merge branch 'master' into dependabot/pip/ack_backend/pip-minor-patch-2249d2c524
2 parents bd16665 + 24484d9 commit 3fab57f

File tree

10 files changed

+278
-51
lines changed

10 files changed

+278
-51
lines changed

package-lock.json

Lines changed: 22 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@
1212
"homepage": "https://github.com/NHSDigital/immunisation-fhir-api",
1313
"devDependencies": {
1414
"license-checker": "^25.0.1",
15-
"@redocly/cli": "^2.0.8"
15+
"@redocly/cli": "^2.1.0"
1616
}
1717
}

recordprocessor/src/batch_processor.py

Lines changed: 72 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,24 @@
1212
from clients import logger
1313
from file_level_validation import file_level_validation
1414
from errors import NoOperationPermissions, InvalidHeaders
15+
from utils_for_recordprocessor import get_csv_content_dict_reader
16+
from typing import Optional
1517

1618

17-
def process_csv_to_fhir(incoming_message_body: dict) -> None:
19+
def process_csv_to_fhir(incoming_message_body: dict) -> int:
1820
"""
1921
For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
2022
and documents the outcome for each row in the ack file.
23+
Returns the number of rows processed. While this is not used by the handler, the number of rows
24+
processed must be correct and therefore is returned for logging and test purposes.
2125
"""
26+
encoder = "utf-8" # default encoding
2227
try:
28+
incoming_message_body["encoder"] = encoder
2329
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
24-
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
25-
# If the file is invalid, processing should cease immediately
26-
return
30+
except (InvalidHeaders, NoOperationPermissions, Exception) as e: # pylint: disable=broad-exception-caught
31+
logger.error(f"File level validation failed: {e}") # If the file is invalid, processing should cease
32+
return 0
2733

2834
file_id = interim_message_body.get("message_id")
2935
vaccine = interim_message_body.get("vaccine")
@@ -36,40 +42,81 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
3642
target_disease = map_target_disease(vaccine)
3743

3844
row_count = 0
39-
for row in csv_reader:
40-
row_count += 1
41-
row_id = f"{file_id}^{row_count}"
42-
logger.info("MESSAGE ID : %s", row_id)
45+
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
46+
created_at_formatted_string, csv_reader, target_disease)
47+
48+
if err:
49+
if isinstance(err, UnicodeDecodeError):
50+
""" resolves encoding issue VED-754 """
51+
logger.warning(f"Encoding Error: {err}.")
52+
new_encoder = "cp1252"
53+
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
54+
encoder = new_encoder
55+
56+
# load alternative encoder
57+
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
58+
# re-read the file and skip processed rows
59+
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
60+
created_at_formatted_string, csv_reader, target_disease, row_count)
61+
else:
62+
logger.error(f"Row Processing error: {err}")
63+
raise err
4364

44-
# Process the row to obtain the details needed for the message_body and ack file
45-
details_from_processing = process_row(target_disease, allowed_operations, row)
46-
47-
# Create the message body for sending
48-
outgoing_message_body = {
49-
"row_id": row_id,
50-
"file_key": file_key,
51-
"supplier": supplier,
52-
"vax_type": vaccine,
53-
"created_at_formatted_string": created_at_formatted_string,
54-
**details_from_processing,
55-
}
56-
57-
send_to_kinesis(supplier, outgoing_message_body, vaccine)
65+
update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
66+
return row_count
5867

59-
logger.info("Total rows processed: %s", row_count)
6068

61-
update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
69+
# Process the row to obtain the details needed for the message_body and ack file
70+
def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
                 csv_reader, target_disease,
                 total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
    """
    Processes the rows of csv_reader that have not been handled yet and sends each one to kinesis.

    Rows are numbered from 1 in the order they are read. The first total_rows_processed_count rows
    are read but skipped, so a caller can pass a fresh reader over the same file and carry on from
    where an earlier pass stopped.

    Returns a (count, error) tuple. count is total_rows_processed_count plus the number of rows
    handled by this call. error is the UnicodeDecodeError that interrupted the loop, or None when
    the reader was exhausted.
    """
    rows_already_done = total_rows_processed_count
    # Stays at the number of the last row that was read successfully if the reader itself raises
    row_count = 0
    try:
        for row_count, row in enumerate(csv_reader, start=1):
            if row_count <= rows_already_done:
                # Already handled by an earlier pass over the file
                continue

            row_id = f"{file_id}^{row_count}"
            logger.info("MESSAGE ID : %s", row_id)

            # Progress marker every 1000 rows, plus the first 10 rows after a restart
            if total_rows_processed_count % 1000 == 0:
                logger.info(f"Process: {total_rows_processed_count+1}")
            if rows_already_done > 0 and row_count <= rows_already_done + 10:
                logger.info(f"Restarted Process (log up to first 10): {total_rows_processed_count+1}")

            # Per-row details needed for the message body and the ack file
            details_from_processing = process_row(target_disease, allowed_operations, row)
            send_to_kinesis(
                supplier,
                {
                    "row_id": row_id,
                    "file_key": file_key,
                    "supplier": supplier,
                    "vax_type": vaccine,
                    "created_at_formatted_string": created_at_formatted_string,
                    **details_from_processing,
                },
                vaccine,
            )
            total_rows_processed_count += 1
    except UnicodeDecodeError as error:
        # Handed back rather than raised so the caller can decide whether to retry
        logger.error("Error processing row %s: %s", row_count, error)
        return total_rows_processed_count, error

    return total_rows_processed_count, None
62107

63108

64109
def main(event: str) -> None:
    """
    Entry point for one task: parses the JSON event, processes the file it describes, and logs
    the number of rows processed and the elapsed time.

    Any exception raised during processing is logged and not re-raised, so the summary log lines
    are always written.
    """
    logger.info("task started")
    started_at = time.time()
    # Reported as 0 if processing fails before a count is returned
    n_rows_processed = 0
    try:
        message_body = json.loads(event)
        n_rows_processed = process_csv_to_fhir(incoming_message_body=message_body)
    except Exception as error:  # pylint: disable=broad-exception-caught
        logger.error("Error processing message: %s", error)
    finished_at = time.time()
    elapsed_seconds = round(finished_at - started_at, 5)
    logger.info("Total rows processed: %s", n_rows_processed)
    logger.info("Total time for completion: %ss", elapsed_seconds)
74121

75122

recordprocessor/src/file_level_validation.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,17 @@ def file_level_validation(incoming_message_body: dict) -> dict:
7676
file_key = incoming_message_body.get("filename")
7777
permission = incoming_message_body.get("permission")
7878
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
79+
encoder = incoming_message_body.get("encoder", "utf-8")
7980

8081
# Fetch the data
81-
csv_reader = get_csv_content_dict_reader(file_key)
82+
try:
83+
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
84+
validate_content_headers(csv_reader)
85+
except UnicodeDecodeError as e:
86+
logger.warning("Invalid Encoding detected: %s", e)
87+
# retry with cp1252 encoding
88+
csv_reader = get_csv_content_dict_reader(file_key, encoder="cp1252")
89+
validate_content_headers(csv_reader)
8290

8391
validate_content_headers(csv_reader)
8492

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ def get_environment() -> str:
1313
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
1414

1515

16-
def get_csv_content_dict_reader(file_key: str) -> DictReader:
16+
def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
    """
    Returns a pipe-delimited DictReader over the object stored under file_key in the bucket named
    by the SOURCE_BUCKET_NAME environment variable.

    The object body is decoded with the given encoder as it is read, so a decoding error may only
    surface while the returned reader is being iterated.
    """
    source_bucket = os.getenv("SOURCE_BUCKET_NAME")
    s3_object = s3_client.get_object(Bucket=source_bucket, Key=file_key)
    # newline="" leaves line-ending handling to the csv module
    decoded_stream = TextIOWrapper(s3_object["Body"], encoding=encoder, newline="")
    return DictReader(decoded_stream, delimiter="|")
2222

2323

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
2+
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
3+
"9473084064"|"DORTHY"|"DICEMBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0802"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4+
"9473087667"|"SHAYE"|"PUHAR"|"20010903"|"0"|"NE39 2HB"|"20250730T18191400"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0803"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170431005"|"Administration of booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"MSBDJRRNHLWPFZQYDFGM"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684007"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
5+
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
6+
"9473089333"|"HELAH"|"JUDD"|"20010908"|"2"|"NE47 9HQ"|"20250730T13504900"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0805"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170433008"|"Administration of second dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"1"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"SNOJRTGYMZLPMGDNRBVD"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684009"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
7+
""|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0806"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
8+
"0"|"DORTHY"|"DIC�MBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0807"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"

0 commit comments

Comments
 (0)