Skip to content

Commit ac112fc

Browse files
committed
working WiP
1 parent b2959f3 commit ac112fc

File tree

6 files changed

+9932
-281
lines changed

6 files changed

+9932
-281
lines changed

recordprocessor/poetry.lock

Lines changed: 247 additions & 239 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

recordprocessor/src/batch_processor.py

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,18 @@
1010
from audit_table import update_audit_table_status
1111
from send_to_kinesis import send_to_kinesis
1212
from clients import logger
13-
from file_level_validation import file_level_validation
13+
from file_level_validation import file_level_validation, get_csv_content_dict_reader
1414
from errors import NoOperationPermissions, InvalidHeaders
1515

1616

17-
def process_csv_to_fhir(incoming_message_body: dict) -> None:
17+
def process_csv_to_fhir(incoming_message_body: dict, encoding="utf-8", start_row=0) -> None:
1818
"""
1919
For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
2020
and documents the outcome for each row in the ack file.
2121
"""
2222
try:
23-
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
23+
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body,
24+
encoding=encoding)
2425
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
2526
# If the file is invalid, processing should cease immediately
2627
return
@@ -36,27 +37,65 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
3637
target_disease = map_target_disease(vaccine)
3738

3839
row_count = 0
39-
for row in csv_reader:
40-
row_count += 1
41-
row_id = f"{file_id}^{row_count}"
42-
logger.info("MESSAGE ID : %s", row_id)
43-
44-
# Process the row to obtain the details needed for the message_body and ack file
45-
details_from_processing = process_row(target_disease, allowed_operations, row)
46-
47-
# Create the message body for sending
48-
outgoing_message_body = {
49-
"row_id": row_id,
50-
"file_key": file_key,
51-
"supplier": supplier,
52-
"vax_type": vaccine,
53-
"created_at_formatted_string": created_at_formatted_string,
54-
**details_from_processing,
55-
}
56-
57-
send_to_kinesis(supplier, outgoing_message_body, vaccine)
58-
59-
logger.info("Total rows processed: %s", row_count)
40+
try:
41+
for row in csv_reader:
42+
if row_count >= start_row:
43+
row_count += 1
44+
row_id = f"{file_id}^{row_count}"
45+
logger.info("MESSAGE ID : %s", row_id)
46+
# concat dict to string for logging
47+
# row_str = ", ".join(f"{v}" for k, v in row.items())
48+
# print(f"Processing row {row_count}: {row_str[:20]}")
49+
50+
details_from_processing = process_row(target_disease, allowed_operations, row)
51+
52+
# Create the message body for sending
53+
outgoing_message_body = {
54+
"row_id": row_id,
55+
"file_key": file_key,
56+
"supplier": supplier,
57+
"vax_type": vaccine,
58+
"created_at_formatted_string": created_at_formatted_string,
59+
**details_from_processing,
60+
}
61+
62+
send_to_kinesis(supplier, outgoing_message_body, vaccine)
63+
64+
logger.info("Total rows processed: %s", row_count)
65+
except Exception as error: # pylint: disable=broad-exception-caught
66+
# encoder = "latin-1"
67+
encoder = "cp1252"
68+
print(f"Error processing: {error}.")
69+
print(f"Encode error at row {row_count} with {encoding}. Switch to {encoder}")
70+
# if we are here, re-read the file with correct encoding and ignore the processed rows
71+
# if error.args[0] == "'utf-8' codec can't decode byte 0xe9 in position 2996: invalid continuation byte":
72+
# cp1252
73+
new_reader = get_csv_content_dict_reader(file_key, encoding=encoder)
74+
start_row = row_count
75+
row_count = 0
76+
for row in new_reader:
77+
row_count += 1
78+
if row_count > start_row:
79+
row_id = f"{file_id}^{row_count}"
80+
logger.info("MESSAGE ID : %s", row_id)
81+
original_representation = ", ".join(f"{v}" for k, v in row.items())
82+
if original_representation[:20] == "9473089333, DORTHY, ":
83+
print(f"Processing row {row_count}: {original_representation[:40]}")
84+
85+
details_from_processing = process_row(target_disease, allowed_operations, row)
86+
87+
outgoing_message_body = {
88+
"row_id": row_id,
89+
"file_key": file_key,
90+
"supplier": supplier,
91+
"vax_type": vaccine,
92+
"created_at_formatted_string": created_at_formatted_string,
93+
**details_from_processing,
94+
}
95+
96+
send_to_kinesis(supplier, outgoing_message_body, vaccine)
97+
98+
logger.info("Total rows processed: %s", row_count)
6099

61100
update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
62101

@@ -66,6 +105,7 @@ def main(event: str) -> None:
66105
logger.info("task started")
67106
start = time.time()
68107
try:
108+
# SAW - error thrown here when invalid character using windows-1252
69109
process_csv_to_fhir(incoming_message_body=json.loads(event))
70110
except Exception as error: # pylint: disable=broad-exception-caught
71111
logger.error("Error processing message: %s", error)

recordprocessor/src/file_level_validation.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)
6161

6262

6363
@file_level_validation_logging_decorator
64-
def file_level_validation(incoming_message_body: dict) -> dict:
64+
def file_level_validation(incoming_message_body: dict, encoding="utf-8") -> dict:
6565
"""
6666
Validates that the csv headers are correct and that the supplier has permission to perform at least one of
6767
the requested operations. Uploads the inf ack file and moves the source file to the processing folder.
@@ -70,22 +70,16 @@ def file_level_validation(incoming_message_body: dict) -> dict:
7070
to reflect the file has been processed and the filename lambda is invoked with the next file in the queue.
7171
"""
7272
try:
73-
logger.info("SAW> file_level_validation...1")
7473
message_id = incoming_message_body.get("message_id")
75-
logger.info("SAW> file_level_validation...2")
7674
vaccine = incoming_message_body.get("vaccine_type").upper()
77-
logger.info("SAW> file_level_validation...3")
7875
supplier = incoming_message_body.get("supplier").upper()
79-
logger.info("SAW> file_level_validation...4")
8076
file_key = incoming_message_body.get("filename")
81-
logger.info("SAW> file_level_validation...5")
8277
permission = incoming_message_body.get("permission")
83-
logger.info("SAW> file_level_validation...6")
8478
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
85-
logger.info("SAW> file_level_validation...7")
8679

8780
# Fetch the data
88-
csv_reader = get_csv_content_dict_reader(file_key)
81+
# SAW Conversion ERROR here
82+
csv_reader = get_csv_content_dict_reader(file_key, encoding)
8983
logger.info("SAW> file_level_validation...8")
9084

9185
validate_content_headers(csv_reader)

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import os
44
from csv import DictReader
55
from io import TextIOWrapper
6-
from clients import s3_client, logger
6+
from clients import s3_client
77

88

99
def get_environment() -> str:
@@ -13,20 +13,26 @@ def get_environment() -> str:
1313
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
1414

1515

16-
def get_csv_content_dict_reader(file_key: str, encoding="utf-8") -> DictReader:
    """Fetch *file_key* from the source bucket and return it as a pipe-delimited DictReader.

    :param file_key: S3 object key within the bucket named by SOURCE_BUCKET_NAME.
    :param encoding: text encoding used to decode the object body (default "utf-8").
    """
    s3_response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
    # newline="" so the csv module performs its own newline handling
    text_stream = TextIOWrapper(s3_response["Body"], encoding=encoding, newline="")
    return DictReader(text_stream, delimiter="|")
2822

2923

3024
def create_diagnostics_dictionary(error_type, status_code, error_message) -> dict:
    """Build the standard diagnostics payload for an error.

    :param error_type: classification of the error.
    :param status_code: HTTP-style status code associated with the error.
    :param error_message: human-readable description of the error.
    """
    diagnostics = {
        "error_type": error_type,
        "statusCode": status_code,
        "error_message": error_message,
    }
    return diagnostics
27+
28+
29+
def fix_cp1252_text(text: str, encoder: str, decoder: str) -> str:
    """Re-interpret *text* by encoding it with *encoder* and decoding the bytes with *decoder*.

    Intended to repair mojibake, e.g. text that was originally cp1252 but was
    decoded with a different codec.

    NOTE(review): errors="replace" is lossy on both legs — any character that
    cannot be encoded, or byte that cannot be decoded, is silently replaced
    with a substitution character. Confirm this is acceptable for the pipeline.

    :param text: the (possibly mis-decoded) input string.
    :param encoder: codec used to turn *text* back into raw bytes.
    :param decoder: codec used to reinterpret those bytes as text.
    :return: the re-decoded string.
    """
    raw_bytes = text.encode(encoder, errors="replace")
    return raw_bytes.decode(decoder, errors="replace")
33+
34+
35+
def dict_decode(source_dict: dict, decoder: str) -> dict:
    """Return a copy of *source_dict* with every ``bytes`` value decoded using *decoder*.

    Non-bytes values (including existing ``str``) are passed through unchanged;
    keys are never modified.

    :param source_dict: dictionary whose bytes values should be decoded.
    :param decoder: codec name passed to ``bytes.decode`` (e.g. "cp1252").
    :return: a new dictionary with all-string (or original non-bytes) values.
    """
    return {
        key: value.decode(decoder) if isinstance(value, bytes) else value
        for key, value in source_dict.items()
    }
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import unittest
2+
import os
3+
from io import BytesIO
4+
from unittest.mock import patch
5+
from utils_for_recordprocessor import dict_decode
6+
7+
with patch("logging_decorator.file_level_validation_logging_decorator", lambda f: f):
8+
# from file_level_validation import file_level_validation
9+
from batch_processor import process_csv_to_fhir
10+
11+
12+
class TestProcessCsvToFhir(unittest.TestCase):
    """Tests for process_csv_to_fhir, focused on handling cp1252 (non-UTF-8) input."""

    def setUp(self):
        # Silence logging and stub every external collaborator. patch(...).start()
        # registers with patch.stopall(), so individual patcher handles are not kept.
        self.mock_logger_info = patch("logging.Logger.info").start()
        self.mock_update_audit_table_status = patch("batch_processor.update_audit_table_status").start()
        self.mock_send_to_kinesis = patch("batch_processor.send_to_kinesis").start()
        self.mock_map_target_disease = patch("batch_processor.map_target_disease").start()
        self.mock_s3_get_object = patch("utils_for_recordprocessor.s3_client.get_object").start()
        self.mock_make_and_upload_ack_file = patch("file_level_validation.make_and_upload_ack_file").start()
        self.mock_move_file = patch("file_level_validation.move_file").start()
        self.mock_get_permitted_operations = patch("file_level_validation.get_permitted_operations").start()

    def tearDown(self):
        patch.stopall()

    @staticmethod
    def _load_csv_with_cp1252_field() -> bytes:
        """Read the fixture and inject a cp1252-encoded accented field into its last row.

        0xE9 ("é" in cp1252) is an invalid start byte sequence in UTF-8, so this
        forces the decode-error path in the code under test.
        """
        test_csv_path = os.path.join(os.path.dirname(__file__), "test_data", "windows-1252-accented-e.csv")
        with open(test_csv_path, "rb") as file_handle:
            lines = file_handle.readlines()
        for index in reversed(range(len(lines))):
            fields = lines[index].strip().split(b"|")
            fields[2] = b"D\xe9cembre"
            lines[index] = b"|".join(fields) + b"\n"
            break
        return b"".join(lines)

    def test_process_csv_to_fhir_success(self):
        """The file should be fully processed even when it contains cp1252 bytes.

        The previous version wrapped the whole test in try/except and printed the
        exception, so the test could never fail; any error must now surface.
        """
        data = self._load_csv_with_cp1252_field()
        # Two reads are expected: the initial utf-8 attempt and the cp1252 retry.
        self.mock_s3_get_object.side_effect = [{"Body": BytesIO(data)}, {"Body": BytesIO(data)}]
        self.mock_map_target_disease.return_value = "RSV"
        self.mock_get_permitted_operations.return_value = {"CREATE", "UPDATE", "DELETE"}

        message_body = {
            "message_id": "file123",
            "vaccine_type": "covid",
            "supplier": "test-supplier",
            "filename": "file-key-1",
            "permission": ["COVID.R", "COVID.U", "COVID.D"],
            "allowed_operations": ["CREATE", "UPDATE", "DELETE"],
            "created_at_formatted_string": "2024-09-05T12:00:00Z",
        }

        process_csv_to_fhir(message_body)

        # Rows were forwarded and the file was marked as preprocessed.
        self.mock_send_to_kinesis.assert_called()
        self.mock_update_audit_table_status.assert_called()

    def test_fix_cp1252(self):
        """dict_decode turns cp1252 bytes values into str and leaves str values alone."""
        test_dict = {"date": b"D\xe9cembre", "name": "Test Name"}
        utf8_dict = dict_decode(test_dict, "cp1252")
        self.assertEqual(utf8_dict["date"], "Décembre")
        self.assertEqual(utf8_dict["name"], "Test Name")

0 commit comments

Comments
 (0)