Skip to content

Commit 7c5a363

Browse files
authored
Hotfix/ved 754 processor task cp1252 encoding (#796)
* Process row count * comment - replace InvalidEncoding with UnicodeDecodeError
1 parent 8217693 commit 7c5a363

File tree

10 files changed

+599
-489
lines changed

10 files changed

+599
-489
lines changed

recordprocessor/poetry.lock

Lines changed: 325 additions & 438 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

recordprocessor/pyproject.toml

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,28 @@ readme = "README.md"
77
packages = [{include = "src"}]
88

99
[tool.poetry.dependencies]
10-
python = "~3.11"
11-
"fhir.resources" = "~7.0.2"
12-
boto3 = "~1.38.42"
13-
boto3-stubs-lite = {extras = ["dynamodb"], version = "~1.38.42"}
14-
aws-lambda-typing = "~2.20.0"
15-
moto = "^4"
16-
requests = "~2.32.4"
17-
responses = "~0.25.7"
18-
pydantic = "~1.10.13"
19-
pyjwt = "~2.10.1"
20-
cryptography = "~42.0.4"
21-
cffi = "~1.17.1"
22-
jsonpath-ng = "^1.6.0"
23-
simplejson = "^3.20.1"
24-
structlog = "^24.1.0"
25-
pandas = "^2.3.0"
26-
freezegun = "^1.5.2"
27-
coverage = "^7.9.1"
28-
redis = "^6.2.0"
29-
numpy = "~2.2.6"
10+
python = "~3.11"
11+
"fhir.resources" = "~7.0.2"
12+
boto3 = "~1.38.42"
13+
boto3-stubs-lite = { extras = ["dynamodb"], version = "~1.38.42" }
14+
aws-lambda-typing = "~2.20.0"
15+
requests = "~2.32.4"
16+
responses = "~0.25.7"
17+
pydantic = "~1.10.13"
18+
pyjwt = "~2.10.1"
19+
cryptography = "~42.0.4"
20+
cffi = "~1.17.1"
21+
jsonpath-ng = "^1.6.0"
22+
simplejson = "^3.20.1"
23+
structlog = "^24.1.0"
24+
redis = "^5.1.1"
25+
coverage = "^7.9.1"
26+
freezegun = "^1.5.2"
27+
fakeredis = "^2.30.1"
3028

31-
[build-system]
32-
requires = ["poetry-core ~= 1.5.0"]
29+
[tool.poetry.group.dev.dependencies]
30+
moto = {extras = ["s3"], version = "^5.1.12"}
3331

34-
build-backend = "poetry.core.masonry.api"
32+
[build-system]
33+
requires = ["poetry-core >= 1.5.0"]
34+
build-backend = "poetry.core.masonry.api"

recordprocessor/src/batch_processor.py

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,25 @@
99
from clients import logger
1010
from file_level_validation import file_level_validation
1111
from errors import NoOperationPermissions, InvalidHeaders
12+
from utils_for_recordprocessor import get_csv_content_dict_reader
13+
from typing import Optional
1214

1315

14-
def process_csv_to_fhir(incoming_message_body: dict) -> None:
16+
def process_csv_to_fhir(incoming_message_body: dict) -> int:
1517
"""
1618
For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
1719
and documents the outcome for each row in the ack file.
20+
Returns the number of rows processed. While this is not used by the handler, the number of rows
21+
processed must be correct and therefore is returned for logging and test purposes.
1822
"""
23+
encoder = "utf-8" # default encoding
1924
try:
25+
incoming_message_body["encoder"] = encoder
2026
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
21-
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
27+
except (InvalidHeaders, NoOperationPermissions, Exception) as e: # pylint: disable=broad-exception-caught
28+
logger.error(f"File level validation failed: {e}")
2229
# If the file is invalid, processing should cease immediately
23-
return
30+
return 0
2431

2532
file_id = interim_message_body.get("message_id")
2633
vaccine = interim_message_body.get("vaccine")
@@ -31,40 +38,81 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
3138
csv_reader = interim_message_body.get("csv_dict_reader")
3239

3340
target_disease = map_target_disease(vaccine)
34-
3541
row_count = 0
36-
for row in csv_reader:
37-
row_count += 1
38-
row_id = f"{file_id}^{row_count}"
39-
logger.info("MESSAGE ID : %s", row_id)
42+
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
43+
created_at_formatted_string, csv_reader, target_disease)
44+
45+
if err:
46+
if isinstance(err, UnicodeDecodeError):
47+
""" resolves encoding issue VED-754 """
48+
logger.warning(f"Encoding Error: {err}.")
49+
new_encoder = "cp1252"
50+
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
51+
encoder = new_encoder
52+
# load alternative encoder
53+
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
54+
# re-read the file and skip processed rows
55+
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
56+
created_at_formatted_string, csv_reader, target_disease, row_count)
57+
else:
58+
logger.error(f"Row Processing error: {err}")
59+
raise err
60+
61+
return row_count
62+
4063

41-
# Process the row to obtain the details needed for the message_body and ack file
42-
details_from_processing = process_row(target_disease, allowed_operations, row)
64+
def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
65+
csv_reader, target_disease,
66+
total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
67+
"""
68+
Processes each row in the csv_reader, skipping the first total_rows_processed_count rows (those already handled before a restart).
69+
"""
70+
row_count = 0
71+
start_row = total_rows_processed_count
72+
try:
73+
for row in csv_reader:
74+
row_count += 1
75+
if row_count > start_row:
76+
row_id = f"{file_id}^{row_count}"
77+
logger.info("MESSAGE ID : %s", row_id)
78+
# Log progress every 1000 rows and the first 10 rows after a restart
79+
if total_rows_processed_count % 1000 == 0:
80+
logger.info(f"Process: {total_rows_processed_count+1}")
81+
if start_row > 0 and row_count <= start_row+10:
82+
logger.info(f"Restarted Process (log up to first 10): {total_rows_processed_count+1}")
4383

44-
# Create the message body for sending
45-
outgoing_message_body = {
46-
"row_id": row_id,
47-
"file_key": file_key,
48-
"supplier": supplier,
49-
"vax_type": vaccine,
50-
"created_at_formatted_string": created_at_formatted_string,
51-
**details_from_processing,
52-
}
84+
# Process the row to obtain the details needed for the message_body and ack file
85+
details_from_processing = process_row(target_disease, allowed_operations, row)
86+
# Create the message body for sending
87+
outgoing_message_body = {
88+
"row_id": row_id,
89+
"file_key": file_key,
90+
"supplier": supplier,
91+
"vax_type": vaccine,
92+
"created_at_formatted_string": created_at_formatted_string,
93+
**details_from_processing,
94+
}
95+
send_to_kinesis(supplier, outgoing_message_body, vaccine)
96+
total_rows_processed_count += 1
5397

54-
send_to_kinesis(supplier, outgoing_message_body, vaccine)
98+
except UnicodeDecodeError as error: # pylint: disable=broad-exception-caught
99+
logger.error("Error processing row %s: %s", row_count, error)
100+
return total_rows_processed_count, error
55101

56-
logger.info("Total rows processed: %s", row_count)
102+
return total_rows_processed_count, None
57103

58104

59105
def main(event: str) -> None:
60106
"""Process each row of the file"""
61107
logger.info("task started")
62108
start = time.time()
109+
n_rows_processed = 0
63110
try:
64-
process_csv_to_fhir(incoming_message_body=json.loads(event))
111+
n_rows_processed = process_csv_to_fhir(incoming_message_body=json.loads(event))
65112
except Exception as error: # pylint: disable=broad-exception-caught
66113
logger.error("Error processing message: %s", error)
67114
end = time.time()
115+
logger.info("Total rows processed: %s", n_rows_processed)
68116
logger.info("Total time for completion: %ss", round(end - start, 5))
69117

70118

recordprocessor/src/file_level_validation.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,17 @@ def file_level_validation(incoming_message_body: dict) -> dict:
7676
file_key = incoming_message_body.get("filename")
7777
permission = incoming_message_body.get("permission")
7878
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
79+
encoder = incoming_message_body.get("encoder", "utf-8")
7980

8081
# Fetch the data
81-
csv_reader = get_csv_content_dict_reader(file_key)
82-
83-
validate_content_headers(csv_reader)
82+
try:
83+
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
84+
validate_content_headers(csv_reader)
85+
except UnicodeDecodeError as e:
86+
logger.warning("Invalid Encoding detected: %s", e)
87+
# retry with cp1252 encoding
88+
csv_reader = get_csv_content_dict_reader(file_key, encoder="cp1252")
89+
validate_content_headers(csv_reader)
8490

8591
# Validate has permission to perform at least one of the requested actions
8692
allowed_operations_set = get_permitted_operations(supplier, vaccine, permission)
@@ -98,7 +104,6 @@ def file_level_validation(incoming_message_body: dict) -> dict:
98104
"created_at_formatted_string": created_at_formatted_string,
99105
"csv_dict_reader": csv_reader,
100106
}
101-
102107
except (InvalidHeaders, NoOperationPermissions, Exception) as error:
103108
logger.error("Error in file_level_validation: %s", error)
104109

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ def get_environment() -> str:
1515
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
1616

1717

18-
def get_csv_content_dict_reader(file_key: str) -> DictReader:
18+
def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
1919
"""Returns the requested file contents from the source bucket in the form of a DictReader"""
2020
response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
2121
binary_io = response["Body"]
22-
text_io = TextIOWrapper(binary_io, encoding="utf-8", newline="")
22+
text_io = TextIOWrapper(binary_io, encoding=encoder, newline="")
2323
return DictReader(text_io, delimiter="|")
2424

2525

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
2+
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
3+
"9473084064"|"DORTHY"|"DICEMBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0802"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4+
"9473087667"|"SHAYE"|"PUHAR"|"20010903"|"0"|"NE39 2HB"|"20250730T18191400"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0803"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170431005"|"Administration of booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"MSBDJRRNHLWPFZQYDFGM"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684007"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
5+
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
6+
"9473089333"|"HELAH"|"JUDD"|"20010908"|"2"|"NE47 9HQ"|"20250730T13504900"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0805"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170433008"|"Administration of second dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"1"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"SNOJRTGYMZLPMGDNRBVD"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684009"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
7+
""|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0806"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
8+
"0"|"DORTHY"|"DIC�MBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0807"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
2+
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
3+
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
4+
"9473088733"|"TANESHA"|"SIEGELER"|"20010131"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"

0 commit comments

Comments
 (0)