
Commit bea32b8

Needs firehose count test
1 parent 857ea87 commit bea32b8

5 files changed (+128 −93 lines)

recordprocessor/src/batch_processor.py

Lines changed: 60 additions & 47 deletions
@@ -7,8 +7,8 @@
 from mappings import map_target_disease
 from send_to_kinesis import send_to_kinesis
 from clients import logger
-from file_level_validation import file_level_validation
-from errors import NoOperationPermissions, InvalidHeaders
+from file_level_validation import file_level_validation, validate_content_headers
+from errors import NoOperationPermissions, InvalidHeaders, InvalidEncoding
 from utils_for_recordprocessor import get_csv_content_dict_reader


@@ -17,11 +17,21 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
     For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
     and documents the outcome for each row in the ack file.
     """
+    encoder = "utf-8"  # default encoding
     try:
-        interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
+        interim_message_body = file_level_validation(incoming_message_body=incoming_message_body, encoder=encoder)
+    except InvalidEncoding as error:
+        logger.warning("Invalid Encoding detected in process_csv_to_fhir: %s", error)
+        # retry with cp1252 encoding
+        encoder = "cp1252"
+        try:
+            interim_message_body = file_level_validation(incoming_message_body=incoming_message_body, encoder=encoder)
+        except Exception as error:  # pylint: disable=broad-exception-caught
+            logger.error("Error in file_level_validation with %s encoding: %s", encoder, error)
+            return 0
     except (InvalidHeaders, NoOperationPermissions, Exception):  # pylint: disable=broad-exception-caught
         # If the file is invalid, processing should cease immediately
-        return
+        return 0

     file_id = interim_message_body.get("message_id")
     vaccine = interim_message_body.get("vaccine")
@@ -32,29 +42,28 @@
     csv_reader = interim_message_body.get("csv_dict_reader")

     target_disease = map_target_disease(vaccine)
-    print("process csv to fhir")
     row_count = 0
-    encoder = "utf-8"  # default encoding
-    try:
-        row_count = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
-                                 created_at_formatted_string, csv_reader, target_disease)
-    except Exception as error:  # pylint: disable=broad-exception-caught
-        new_encoder = "cp1252"
-        print(f"Error processing: {error}.")
+    row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
+                                  created_at_formatted_string, csv_reader, target_disease)
+    if err:
+        print(f"Error processing: {err}.")
         # check if it's a decode error, i.e. error.args[0] begins with "'utf-8' codec can't decode byte"
-        if error.reason == "invalid continuation byte":
+        if getattr(err, "reason", None) == "invalid continuation byte":
+            new_encoder = "cp1252"
             print(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
             # print(f"Detected decode error: {error.reason}")
             encoder = new_encoder
             # if we are here, re-read the file with the alternative encoding and skip the already-processed rows
-            row_count = process_rows_retry(file_id, vaccine, supplier, file_key,
-                                           allowed_operations, created_at_formatted_string,
-                                           encoder, row_count)
+            csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
+            validate_content_headers(csv_reader)
+            row_count, _ = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
+                                        created_at_formatted_string, csv_reader, target_disease, row_count)
         else:
-            logger.error(f"Non-decode error: {error}. Cannot retry. Call someone.")
-            raise error from error
+            logger.error(f"Non-decode error: {err}. Cannot retry. Call someone.")
+            raise err

     logger.info("Total rows processed: %s", row_count)
+    return row_count


 def process_rows_retry(file_id, vaccine, supplier, file_key, allowed_operations,
@@ -81,35 +90,39 @@ def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, creat
     print("process_rows...")
     row_count = 0
     start_row = total_rows_processed_count
-    for row in csv_reader:
-
-        row_count += 1
-        if row_count > start_row:
-            row_id = f"{file_id}^{row_count}"
-            logger.info("MESSAGE ID : %s", row_id)
-
-            # convert dict to string and print first 20 chars
-            if (total_rows_processed_count % 1000 == 0):
-                print(f"Process: {total_rows_processed_count}")
-            if (total_rows_processed_count > 19995):
-                print(f"Process: {total_rows_processed_count} - {row['PERSON_SURNAME']}")
-
-            # Process the row to obtain the details needed for the message_body and ack file
-            details_from_processing = process_row(target_disease, allowed_operations, row)
-
-            # Create the message body for sending
-            outgoing_message_body = {
-                "row_id": row_id,
-                "file_key": file_key,
-                "supplier": supplier,
-                "vax_type": vaccine,
-                "created_at_formatted_string": created_at_formatted_string,
-                **details_from_processing,
-            }
-
-            send_to_kinesis(supplier, outgoing_message_body, vaccine)
-            total_rows_processed_count += 1
-            logger.info("Total rows processed: %s", total_rows_processed_count)
+    try:
+        for row in csv_reader:
+
+            row_count += 1
+            if row_count > start_row:
+                row_id = f"{file_id}^{row_count}"
+                logger.info("MESSAGE ID : %s", row_id)
+
+                # print progress every 1000 rows, and per row near the end of a large file
+                if (total_rows_processed_count % 1000 == 0):
+                    print(f"Process: {total_rows_processed_count}")
+                if (total_rows_processed_count > 19995):
+                    print(f"Process: {total_rows_processed_count} - {row['PERSON_SURNAME']}")
+
+                # Process the row to obtain the details needed for the message_body and ack file
+                details_from_processing = process_row(target_disease, allowed_operations, row)
+
+                # Create the message body for sending
+                outgoing_message_body = {
+                    "row_id": row_id,
+                    "file_key": file_key,
+                    "supplier": supplier,
+                    "vax_type": vaccine,
+                    "created_at_formatted_string": created_at_formatted_string,
+                    **details_from_processing,
+                }
+
+                send_to_kinesis(supplier, outgoing_message_body, vaccine)
+                total_rows_processed_count += 1
+                logger.info("Total rows processed: %s", total_rows_processed_count)
+    except Exception as error:  # pylint: disable=broad-exception-caught
+        logger.error("Error processing row %s: %s", row_count, error)
+        return total_rows_processed_count, error
-    return total_rows_processed_count
+    return total_rows_processed_count, None

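The retry logic in both `process_csv_to_fhir` and `file_level_validation` keys on `UnicodeDecodeError.reason`. A minimal standalone sketch (not part of the commit) of the stdlib behaviour being relied on, using the same bytes as the tests:

    # cp1252 bytes such as 0xE9 ("é") are not valid utf-8, so decoding fails
    raw = b"D\xe9cembre"  # "Décembre" encoded as cp1252

    try:
        raw.decode("utf-8")
    except UnicodeDecodeError as error:
        # 0xE9 opens a multi-byte utf-8 sequence, but the next byte ("c", 0x63)
        # is not a continuation byte, hence this .reason value
        print(error.reason)  # invalid continuation byte

    print(raw.decode("cp1252"))  # Décembre

Decoding the same bytes as cp1252 succeeds, which is why the fallback encoder recovers the file.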
recordprocessor/src/errors.py

Lines changed: 4 additions & 0 deletions
@@ -9,5 +9,9 @@ class InvalidHeaders(Exception):
     """A custom exception for when the file headers are invalid."""


+class InvalidEncoding(Exception):
+    """A custom exception for when the file encoding is invalid."""
+
+
 class UnhandledAuditTableError(Exception):
     """A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""

recordprocessor/src/file_level_validation.py

Lines changed: 6 additions & 4 deletions
@@ -5,7 +5,7 @@
 from clients import logger, s3_client
 from make_and_upload_ack_file import make_and_upload_ack_file
 from utils_for_recordprocessor import get_csv_content_dict_reader, invoke_filename_lambda
-from errors import InvalidHeaders, NoOperationPermissions
+from errors import InvalidHeaders, NoOperationPermissions, InvalidEncoding
 from logging_decorator import file_level_validation_logging_decorator
 from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
 from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, Permission
@@ -61,7 +61,7 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)


 @file_level_validation_logging_decorator
-def file_level_validation(incoming_message_body: dict) -> dict:
+def file_level_validation(incoming_message_body: dict, encoder: str) -> dict:
     """
     Validates that the csv headers are correct and that the supplier has permission to perform at least one of
     the requested operations. Uploads the inf ack file and moves the source file to the processing folder.
@@ -78,7 +78,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:
     created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")

     # Fetch the data
-    csv_reader = get_csv_content_dict_reader(file_key)
+    csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)

     validate_content_headers(csv_reader)

@@ -98,8 +98,10 @@ def file_level_validation(incoming_message_body: dict) -> dict:
             "created_at_formatted_string": created_at_formatted_string,
             "csv_dict_reader": csv_reader,
         }
-
     except (InvalidHeaders, NoOperationPermissions, Exception) as error:
+        if getattr(error, "reason", None) == "invalid continuation byte" and encoder == "utf-8":
+            # propagate as InvalidEncoding to trigger a retry with cp1252 encoding
+            raise InvalidEncoding(f"File encoding {encoder} is invalid.") from error
         logger.error("Error in file_level_validation: %s", error)

         # NOTE: The Exception may occur before the file_id, file_key and created_at_formatted_string are assigned
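A note on the guard above: of the exceptions caught here, only `UnicodeDecodeError` defines `.reason`, so a bare `error.reason` access would itself raise `AttributeError` for `InvalidHeaders` or `NoOperationPermissions`; hence the `getattr` form. A small stdlib-only illustration (not project code; `FakeInvalidHeaders` is a hypothetical stand-in):

    class FakeInvalidHeaders(Exception):
        """Hypothetical stand-in for the project's InvalidHeaders."""

    decode_error = UnicodeDecodeError("utf-8", b"D\xe9c", 1, 2, "invalid continuation byte")
    for error in (FakeInvalidHeaders("bad headers"), decode_error):
        print(type(error).__name__, getattr(error, "reason", None))
    # FakeInvalidHeaders None
    # UnicodeDecodeError invalid continuation byte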

recordprocessor/tests/test_batch_processor.py

Lines changed: 50 additions & 42 deletions
@@ -2,36 +2,26 @@
 import os
 from io import BytesIO
 from unittest.mock import patch
-# from utils_for_recordprocessor import dict_decode

 with patch("logging_decorator.file_level_validation_logging_decorator", lambda f: f):
-    # from file_level_validation import file_level_validation
     from batch_processor import process_csv_to_fhir


+def create_patch(target: str):
+    patcher = patch(target)
+    return patcher.start()
+
+
 class TestProcessCsvToFhir(unittest.TestCase):

     def setUp(self):
-        self.logger_info_patcher = patch("logging.Logger.info")
-        self.mock_logger_info = self.logger_info_patcher.start()
-        # self.update_audit_table_status_patcher = patch("batch_processor.update_audit_table_status")
-        # self.mock_update_audit_table_status = self.update_audit_table_status_patcher.start()
-        self.send_to_kinesis_patcher = patch("batch_processor.send_to_kinesis")
-        self.mock_send_to_kinesis = self.send_to_kinesis_patcher.start()
-        self.map_target_disease_patcher = patch("batch_processor.map_target_disease")
-        self.mock_map_target_disease = self.map_target_disease_patcher.start()
-        self.s3_get_object_patcher = patch("utils_for_recordprocessor.s3_client.get_object")
-        self.mock_s3_get_object = self.s3_get_object_patcher.start()
-        self.make_and_move_patcher = patch("file_level_validation.make_and_upload_ack_file")
-        self.mock_make_and_move = self.make_and_move_patcher.start()
-        self.make_and_move_patcher = patch("file_level_validation.move_file")
-        self.mock_move_file = self.make_and_move_patcher.start()
-        # get_permitted_operations
-        self.get_permitted_operations_patcher = patch("file_level_validation.get_permitted_operations")
-        self.mock_get_permitted_operations = self.get_permitted_operations_patcher.start()
-
-        # self.validate_content_headers_patcher = patch("file_level_validation.validate_content_headers")
-        # self.mock_validate_content_headers = self.validate_content_headers_patcher.start()
+        self.mock_logger_info = create_patch("logging.Logger.info")
+        self.mock_send_to_kinesis = create_patch("batch_processor.send_to_kinesis")
+        self.mock_map_target_disease = create_patch("batch_processor.map_target_disease")
+        self.mock_s3_get_object = create_patch("utils_for_recordprocessor.s3_client.get_object")
+        self.mock_make_and_move = create_patch("file_level_validation.make_and_upload_ack_file")
+        self.mock_move_file = create_patch("file_level_validation.move_file")
+        self.mock_get_permitted_operations = create_patch("file_level_validation.get_permitted_operations")

     def tearDown(self):
         patch.stopall()
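`create_patch` starts each patcher without keeping a reference to it, which is safe here because `patch.stopall()` in `tearDown` stops every patch that was activated via `start()`. A minimal standalone illustration of that pairing (not project code):

    import os
    import unittest
    from unittest.mock import patch


    def create_patch(target: str):
        # start() registers the patcher so patch.stopall() can find it later
        return patch(target).start()


    class TestStopAllPairing(unittest.TestCase):
        def setUp(self):
            self.mock_getcwd = create_patch("os.getcwd")

        def tearDown(self):
            patch.stopall()  # stops every patch started with start()

        def test_patched(self):
            self.mock_getcwd.return_value = "/fake"
            self.assertEqual(os.getcwd(), "/fake")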
@@ -64,24 +54,48 @@ def insert_cp1252_at_end(self, data: list[bytes], new_text: bytes, field: int) -
             print(f"replace field: {fields[field]}")
             fields[field] = new_text
             print(f"replaced field: {fields[field]}")
-
             # Reconstruct the line
             data[i] = b"|".join(fields) + b"\n"
             break
         return data

-    def test_process_csv_to_fhir_success(self):
-        # Setup mocks
-        print("test_process_csv_to_fhir_success")
+    def test_process_large_file_with_cp1252(self):
+        """Test processing a large file with cp1252 encoding."""
         try:
+            n_rows = 20000
             data = self.create_test_data_from_file("test-batch-data.csv")
-            data = self.expand_test_data(data, 20_000)
+            data = self.expand_test_data(data, n_rows)
             data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
+            self.mock_s3_get_object.side_effect = [{"Body": BytesIO(b"".join(data))},
+                                                   {"Body": BytesIO(b"".join(data))}]
+            self.mock_map_target_disease.return_value = "RSV"
+
+            message_body = {
+                "message_id": "file123",
+                "vaccine_type": "covid",
+                "supplier": "test-supplier",
+                "filename": "file-key-1",
+                "permission": ["COVID.R", "COVID.U", "COVID.D"],
+                "allowed_operations": ["CREATE", "UPDATE", "DELETE"],
+                "created_at_formatted_string": "2024-09-05T12:00:00Z"
+            }
+            self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}

-            # Read CSV from test_csv_path as utf-8
-            ret1 = {"Body": BytesIO(b"".join(data))}
-            ret2 = {"Body": BytesIO(b"".join(data))}
-            self.mock_s3_get_object.side_effect = [ret1, ret2]
+            n_rows_processed = process_csv_to_fhir(message_body)
+            self.assertEqual(n_rows_processed, n_rows)
+        except Exception as e:
+            self.fail(f"Exception during test: {e}")
+
+    def test_process_small_file_with_cp1252(self):
+        """Test processing a small file with cp1252 encoding."""
+        try:
+            data = self.create_test_data_from_file("test-batch-data-cp1252.csv")
+            n_rows = len(data) - 1  # Exclude header
+            self.mock_s3_get_object.side_effect = [{"Body": BytesIO(b"".join(data))},
+                                                   {"Body": BytesIO(b"".join(data))}]
             self.mock_map_target_disease.return_value = "RSV"

             message_body = {
@@ -93,26 +107,18 @@ def test_process_csv_to_fhir_success(self):
                 "allowed_operations": ["CREATE", "UPDATE", "DELETE"],
                 "created_at_formatted_string": "2024-09-05T12:00:00Z"
             }
-            # self.mock_file_level_validation.return_value = message_body
             self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}

             self.mock_map_target_disease.return_value = "RSV"

-            process_csv_to_fhir(message_body)
+            n_rows_processed = process_csv_to_fhir(message_body)
+            self.assertEqual(n_rows_processed, n_rows)
         except Exception as e:
-            print(f"Exception during test: {e}")
+            self.fail(f"Exception during test: {e}")

     def test_fix_cp1252(self):
         # create a cp1252 string that contains an accented E
-        source_text = b'D\xe9cembre'
-        test_dict = {
-            "date": source_text,
-            "name": "Test Name"}
-        utf8_dict = dict_decode(test_dict, "cp1252")
-        self.assertEqual(utf8_dict["date"], "Décembre")
-        self.assertEqual(utf8_dict["name"], "Test Name")
-
-    def dict_decode(self):
+        # not a unit test of project code as such; it checks our assumptions about encoding
         source_text = b'D\xe9cembre'
         test_dict = {
             "date": source_text,
@@ -133,3 +139,5 @@ def dict_decode(input_dict: dict, encoding: str) -> dict:
         else:
             decoded_dict[key] = value
     return decoded_dict
+
+# @TODO TEST to check correct number of messages sent to firehose with encode error
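For the test flagged in that TODO and in the commit message, one plausible shape, reusing the helpers and mocks already in this file, is to count calls on the kinesis mock after forcing an encode error; the test name, row count, and expectations here are assumptions, a sketch rather than the final test (a firehose/ack-file count would follow the same pattern):

    def test_kinesis_message_count_with_encode_error(self):
        """Sketch: each row should be sent exactly once, despite the retry."""
        n_rows = 20000
        data = self.create_test_data_from_file("test-batch-data.csv")
        data = self.expand_test_data(data, n_rows)
        data = self.insert_cp1252_at_end(data, b'D\xe9cembre', 2)
        # one Body per read: the utf-8 pass, then the cp1252 retry
        self.mock_s3_get_object.side_effect = [{"Body": BytesIO(b"".join(data))},
                                               {"Body": BytesIO(b"".join(data))}]
        self.mock_map_target_disease.return_value = "RSV"
        self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}

        message_body = {
            "message_id": "file123",
            "vaccine_type": "covid",
            "supplier": "test-supplier",
            "filename": "file-key-1",
            "permission": ["COVID.R", "COVID.U", "COVID.D"],
            "allowed_operations": ["CREATE", "UPDATE", "DELETE"],
            "created_at_formatted_string": "2024-09-05T12:00:00Z"
        }

        n_rows_processed = process_csv_to_fhir(message_body)
        self.assertEqual(n_rows_processed, n_rows)
        # no row should be skipped or double-sent across the encoding retry
        self.assertEqual(self.mock_send_to_kinesis.call_count, n_rows)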
