Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
763 changes: 325 additions & 438 deletions recordprocessor/poetry.lock

Large diffs are not rendered by default.

46 changes: 23 additions & 23 deletions recordprocessor/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@ readme = "README.md"
packages = [{include = "src"}]

[tool.poetry.dependencies]
python = "~3.11"
"fhir.resources" = "~7.0.2"
boto3 = "~1.38.42"
boto3-stubs-lite = {extras = ["dynamodb"], version = "~1.38.42"}
aws-lambda-typing = "~2.20.0"
moto = "^4"
requests = "~2.32.4"
responses = "~0.25.7"
pydantic = "~1.10.13"
pyjwt = "~2.10.1"
cryptography = "~42.0.4"
cffi = "~1.17.1"
jsonpath-ng = "^1.6.0"
simplejson = "^3.20.1"
structlog = "^24.1.0"
pandas = "^2.3.0"
freezegun = "^1.5.2"
coverage = "^7.9.1"
redis = "^6.2.0"
numpy = "~2.2.6"
python = "~3.11"
"fhir.resources" = "~7.0.2"
boto3 = "~1.38.42"
boto3-stubs-lite = { extras = ["dynamodb"], version = "~1.38.42" }
aws-lambda-typing = "~2.20.0"
requests = "~2.32.4"
responses = "~0.25.7"
pydantic = "~1.10.13"
pyjwt = "~2.10.1"
cryptography = "~42.0.4"
cffi = "~1.17.1"
jsonpath-ng = "^1.6.0"
simplejson = "^3.20.1"
structlog = "^24.1.0"
redis = "^5.1.1"
coverage = "^7.9.1"
freezegun = "^1.5.2"
fakeredis = "^2.30.1"

[build-system]
requires = ["poetry-core ~= 1.5.0"]
[tool.poetry.group.dev.dependencies]
moto = {extras = ["s3"], version = "^5.1.12"}

build-backend = "poetry.core.masonry.api"
[build-system]
requires = ["poetry-core >= 1.5.0"]
build-backend = "poetry.core.masonry.api"
98 changes: 75 additions & 23 deletions recordprocessor/src/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@
from send_to_kinesis import send_to_kinesis
from clients import logger
from file_level_validation import file_level_validation
from errors import NoOperationPermissions, InvalidHeaders
from errors import NoOperationPermissions, InvalidHeaders, InvalidEncoding
from utils_for_recordprocessor import get_csv_content_dict_reader
from typing import Optional


def process_csv_to_fhir(incoming_message_body: dict) -> None:
def process_csv_to_fhir(incoming_message_body: dict) -> int:
"""
For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
and documents the outcome for each row in the ack file.
Returns the number of rows processed. While this is not used by the handler, the number of rows
processed must be correct and therefore is returned for logging and test purposes.
"""
encoder = "utf-8" # default encoding
try:
# add encoder to the incoming message body
incoming_message_body["encoder"] = encoder
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
except (InvalidHeaders, NoOperationPermissions, Exception) as e: # pylint: disable=broad-exception-caught
logger.error(f"File level validation failed: {e}")
# If the file is invalid, processing should cease immediately
return
return 0

file_id = interim_message_body.get("message_id")
vaccine = interim_message_body.get("vaccine")
Expand All @@ -31,40 +39,84 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
csv_reader = interim_message_body.get("csv_dict_reader")

target_disease = map_target_disease(vaccine)

row_count = 0
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)
logger.info(f"process with encoder {encoder} from row {row_count+1}")
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
created_at_formatted_string, csv_reader, target_disease)

# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(target_disease, allowed_operations, row)
if err:
logger.warning(f"Processing Error: {err}.")
if isinstance(err, InvalidEncoding):
new_encoder = "cp1252"
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
encoder = new_encoder
# load alternative encoder
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
# re-read the file and skip processed rows
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
created_at_formatted_string, csv_reader, target_disease, row_count)
else:
logger.error(f"Row Processing error: {err}")
raise err

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}
return row_count

send_to_kinesis(supplier, outgoing_message_body, vaccine)

logger.info("Total rows processed: %s", row_count)
def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
                 csv_reader, target_disease,
                 total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
    """
    Processes the rows of csv_reader, sending one message to kinesis for each row processed.

    The reader is always iterated from its first row. Rows whose 1-based position is not greater than
    the incoming total_rows_processed_count are skipped, so a caller that re-opens the file (for example
    with a different encoding) can resume after the rows that have already been sent.

    Returns a tuple of (total rows processed, error):
    - total rows processed includes the count passed in via total_rows_processed_count.
    - error is an InvalidEncoding instance when a UTF-8 decode failure stopped the iteration,
      otherwise None.

    Any other exception is logged and re-raised unchanged.
    """
    # Reasons reported by the UTF-8 decoder (UnicodeDecodeError.reason). Bytes from a single-byte
    # encoding can trigger any of these, not only "invalid continuation byte", so all of them
    # must be reported to the caller as an encoding problem.
    utf8_decode_failure_reasons = (
        "invalid continuation byte",
        "invalid start byte",
        "unexpected end of data",
    )
    start_row = total_rows_processed_count
    row_count = 0  # position of the last row successfully read; used when logging a failure
    try:
        for row_count, row in enumerate(csv_reader, start=1):
            if row_count <= start_row:
                # Row was already processed in an earlier pass over the file
                continue

            row_id = f"{file_id}^{row_count}"
            logger.info("MESSAGE ID : %s", row_id)
            # Log progress every 1000 rows and the first 10 rows after a restart
            if total_rows_processed_count % 1000 == 0:
                logger.info("Process: %s", total_rows_processed_count + 1)
            if start_row > 0 and row_count <= start_row + 10:
                logger.info("Restarted Process (log up to first 10): %s", total_rows_processed_count + 1)

            # Process the row to obtain the details needed for the message_body and ack file
            details_from_processing = process_row(target_disease, allowed_operations, row)
            # Create the message body for sending
            outgoing_message_body = {
                "row_id": row_id,
                "file_key": file_key,
                "supplier": supplier,
                "vax_type": vaccine,
                "created_at_formatted_string": created_at_formatted_string,
                **details_from_processing,
            }
            send_to_kinesis(supplier, outgoing_message_body, vaccine)
            total_rows_processed_count += 1
        logger.info("Total rows processed: %s", total_rows_processed_count)
    except Exception as error:  # pylint: disable=broad-exception-caught
        logger.error("Error processing row %s: %s", row_count, error)
        reason = getattr(error, "reason", None)
        if isinstance(reason, str) and reason in utf8_decode_failure_reasons:
            # Hand the problem back instead of raising so the caller can retry with another encoding.
            # capitalize() keeps the existing "Invalid continuation byte" message for that reason.
            return total_rows_processed_count, InvalidEncoding(reason.capitalize())
        # Bare raise preserves the original traceback
        raise
    return total_rows_processed_count, None


def main(event: str) -> None:
"""Process each row of the file"""
logger.info("task started")
start = time.time()
n_rows_processed = 0
try:
process_csv_to_fhir(incoming_message_body=json.loads(event))
n_rows_processed = process_csv_to_fhir(incoming_message_body=json.loads(event))
except Exception as error: # pylint: disable=broad-exception-caught
logger.error("Error processing message: %s", error)
end = time.time()
logger.info("Total rows processed: %s", n_rows_processed)
logger.info("Total time for completion: %ss", round(end - start, 5))


Expand Down
4 changes: 4 additions & 0 deletions recordprocessor/src/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ class InvalidHeaders(Exception):
"""A custom exception for when the file headers are invalid."""


class InvalidEncoding(Exception):
    """A custom exception for when the file encoding is invalid.

    Used to signal that the file content could not be decoded with the encoding in use,
    so that the caller can decide whether to retry with a different encoding.
    """


class UnhandledAuditTableError(Exception):
"""A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""
22 changes: 17 additions & 5 deletions recordprocessor/src/file_level_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from clients import logger, s3_client
from make_and_upload_ack_file import make_and_upload_ack_file
from utils_for_recordprocessor import get_csv_content_dict_reader, invoke_filename_lambda
from errors import InvalidHeaders, NoOperationPermissions
from errors import InvalidHeaders, NoOperationPermissions, InvalidEncoding
from logging_decorator import file_level_validation_logging_decorator
from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, Permission
Expand Down Expand Up @@ -76,11 +76,20 @@ def file_level_validation(incoming_message_body: dict) -> dict:
file_key = incoming_message_body.get("filename")
permission = incoming_message_body.get("permission")
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
encoder = incoming_message_body.get("encoder", "utf-8")

# Fetch the data
csv_reader = get_csv_content_dict_reader(file_key)

validate_content_headers(csv_reader)
try:
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
validate_content_headers(csv_reader)
except Exception as e:
if hasattr(e, 'reason') and e.reason == "invalid continuation byte" and encoder == "utf-8":
logger.warning("Invalid Encoding detected: %s", e)
# retry with cp1252 encoding
csv_reader = get_csv_content_dict_reader(file_key, encoder="cp1252")
validate_content_headers(csv_reader)
else:
raise

# Validate has permission to perform at least one of the requested actions
allowed_operations_set = get_permitted_operations(supplier, vaccine, permission)
Expand All @@ -98,8 +107,11 @@ def file_level_validation(incoming_message_body: dict) -> dict:
"created_at_formatted_string": created_at_formatted_string,
"csv_dict_reader": csv_reader,
}

except (InvalidHeaders, NoOperationPermissions, Exception) as error:
reason = getattr(error, 'reason', None)
if reason is not None:
if reason == "invalid continuation byte" and encoder == "utf-8":
raise InvalidEncoding(f"Error File encoding {encoder} is invalid.")
logger.error("Error in file_level_validation: %s", error)

# NOTE: The Exception may occur before the file_id, file_key and created_at_formatted_string are assigned
Expand Down
4 changes: 2 additions & 2 deletions recordprocessor/src/utils_for_recordprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ def get_environment() -> str:
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"


def get_csv_content_dict_reader(file_key: str) -> DictReader:
def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
"""Returns the requested file contents from the source bucket in the form of a DictReader"""
response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
binary_io = response["Body"]
text_io = TextIOWrapper(binary_io, encoding="utf-8", newline="")
text_io = TextIOWrapper(binary_io, encoding=encoder, newline="")
return DictReader(text_io, delimiter="|")


Expand Down
8 changes: 8 additions & 0 deletions recordprocessor/tests/test_data/test-batch-data-cp1252.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
NHS_NUMBER|PERSON_FORENAME|PERSON_SURNAME|PERSON_DOB|PERSON_GENDER_CODE|PERSON_POSTCODE|DATE_AND_TIME|SITE_CODE|SITE_CODE_TYPE_URI|UNIQUE_ID|UNIQUE_ID_URI|ACTION_FLAG|PERFORMING_PROFESSIONAL_FORENAME|PERFORMING_PROFESSIONAL_SURNAME|RECORDED_DATE|PRIMARY_SOURCE|VACCINATION_PROCEDURE_CODE|VACCINATION_PROCEDURE_TERM|DOSE_SEQUENCE|VACCINE_PRODUCT_CODE|VACCINE_PRODUCT_TERM|VACCINE_MANUFACTURER|BATCH_NUMBER|EXPIRY_DATE|SITE_OF_VACCINATION_CODE|SITE_OF_VACCINATION_TERM|ROUTE_OF_VACCINATION_CODE|ROUTE_OF_VACCINATION_TERM|DOSE_AMOUNT|DOSE_UNIT_CODE|DOSE_UNIT_TERM|INDICATION_CODE|LOCATION_CODE|LOCATION_CODE_TYPE_URI
"9473081898"|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0801"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
"9473084064"|"DORTHY"|"DICEMBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0802"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
"9473087667"|"SHAYE"|"PUHAR"|"20010903"|"0"|"NE39 2HB"|"20250730T18191400"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0803"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170431005"|"Administration of booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"MSBDJRRNHLWPFZQYDFGM"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684007"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
"9473088736"|"TANESHA"|"SIEGELER"|"20010130"|"2"|"NE46 1EA"|"20250730T14452700"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0804"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170432003"|"Administration of pre-school booster dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens"|"2"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"YRJNNROADZHBJHBNCHNG"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684008"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
"9473089333"|"HELAH"|"JUDD"|"20010908"|"2"|"NE47 9HQ"|"20250730T13504900"|"B23002"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0805"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"170433008"|"Administration of second dose of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"1"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"SNOJRTGYMZLPMGDNRBVD"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684009"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
""|"DELYTH"|"PRESCOT"|"20030731"|"2"|"NE23 1NW"|"20250730T11525000"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0806"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"308081000000105"|"Measles mumps and rubella vaccination - first dose (procedure)"|"1"|"34925111000001104"|"Priorix vaccine powder and solvent for solution for injection 0.5ml pre-filled syringes (GlaxoSmithKline UK Ltd)"|"GlaxoSmithKline UK Ltd"|"NIQUTMBYAHKSGIERGSCB"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684005"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
"0"|"DORTHY"|"DICÉMBRE"|"20020403"|"2"|"NE29 6BA"|"20250730T11304900"|"RJ1"|"https://fhir.nhs.uk/Id/ods-organization-code"|"MMR_CDFDPS-1330325--27-0807"|"HTTPS://ravsTest-123"|"new"|""|""|"20250730"|"FALSE"|"38598009"|"Administration of vaccine product containing only Measles morbillivirus and Mumps orthorubulavirus and Rubella virus antigens (procedure)"|"3"|"13968211000001108"|"M-M-RVAXPRO vaccine powder and solvent for suspension for injection 0.5ml pre-filled syringes (Merck Sharp & Dohme (UK) Ltd)"|"Merck Sharp & Dohme (UK) Ltd"|"HJIXUMURNAOVKRKIECZH"|"20230519"|"368209003"|"Right arm"|"78421000"|"Intramuscular"|"0.5"|"3318611000001103"|"Pre-filled disposable injection"|"443684006"|"X99999"|"https://fhir.nhs.uk/Id/ods-organization-code"
Loading
Loading