Skip to content

Commit be40a91

Browse files
committed
debug
1 parent b04882f commit be40a91

File tree

5 files changed

+156
-66
lines changed

5 files changed

+156
-66
lines changed

batch_processor_filter/src/batch_audit_repository.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,19 @@ def is_event_processing_or_failed_for_supplier_and_vacc_type(self, supplier: str
4242
return False
4343

4444
def update_status(self, message_id: str, updated_status: str) -> None:
45-
self._batch_audit_table.update_item(
46-
Key={AuditTableKeys.MESSAGE_ID: message_id},
47-
UpdateExpression="SET #status = :status",
48-
ExpressionAttributeNames={"#status": "status"},
49-
ExpressionAttributeValues={":status": updated_status},
50-
ConditionExpression="attribute_exists(message_id)"
51-
)
45+
try:
46+
print(f"Updating status for message_id: {message_id} to {updated_status}")
47+
print(f"Table name: {AUDIT_TABLE_NAME}")
48+
print(f"Region: {REGION_NAME}")
49+
print(f"Batch audit table: {self._batch_audit_table.table_name}")
50+
self._batch_audit_table.update_item(
51+
Key={AuditTableKeys.MESSAGE_ID: message_id},
52+
UpdateExpression="SET #status = :status",
53+
ExpressionAttributeNames={"#status": "status"},
54+
ExpressionAttributeValues={":status": updated_status},
55+
ConditionExpression="attribute_exists(message_id)"
56+
)
57+
print(f"Successfully updated status for message_id: {message_id} to {updated_status}")
58+
except Exception as e:
59+
print(f"Error updating status for message_id: {message_id} to {updated_status}: {e}")
60+
raise

batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
3232
supplier = batch_file_created_event["supplier"]
3333
vaccine_type = batch_file_created_event["vaccine_type"]
3434

35+
# debug
36+
print("SAW ----------- DEBUG -------------")
37+
print(f"filename: {filename} ")
38+
print(f"message_id: {message_id} ")
39+
print(f"supplier: {supplier} ")
40+
print(f"vaccine_type: {vaccine_type} ")
41+
print("SAW ----------- END DEBUG -------------")
42+
3543
if self._is_duplicate_file(filename):
3644
# Mark as processed and return without error so next event will be picked up from queue
3745
logger.error("A duplicate file has already been processed. Filename: %s", filename)

e2e_batch/constants.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,51 @@
1818
PERMISSIONS_CONFIG_FILE_KEY = "permissions_config.json"
1919

2020

21-
def create_row(unique_id, dose_amount, action_flag, header, inject_char=None):
21+
class EventName():
22+
CREATE = "INSERT"
23+
UPDATE = "MODIFY"
24+
DELETE_LOGICAL = "MODIFY"
25+
DELETE_PHYSICAL = "REMOVE"
26+
27+
28+
class Operation():
29+
CREATE = "CREATE"
30+
UPDATE = "UPDATE"
31+
DELETE_LOGICAL = "DELETE"
32+
DELETE_PHYSICAL = "REMOVE"
33+
34+
35+
class ActionFlag():
36+
CREATE = "NEW"
37+
UPDATE = "UPDATE"
38+
DELETE_LOGICAL = "DELETE"
39+
40+
41+
class ActionSequence():
42+
def __init__(self, desc: str, actions: list[ActionFlag]):
43+
self.actions = actions
44+
self.description = desc
45+
46+
47+
class ActionSet():
48+
CREATE = ActionSequence("Create", [ActionFlag.CREATE])
49+
UPDATE = ActionSequence("Update", [ActionFlag.CREATE, ActionFlag.UPDATE])
50+
DELETE = ActionSequence("Delete", [ActionFlag.CREATE, ActionFlag.UPDATE, ActionFlag.DELETE_LOGICAL])
51+
REINSTATE = ActionSequence("Reinstate", [ActionFlag.CREATE, ActionFlag.DELETE_LOGICAL, ActionFlag.UPDATE])
52+
53+
54+
def get_operation(action_flag: ActionFlag) -> Operation:
55+
if action_flag == ActionFlag.CREATE:
56+
return Operation.CREATE
57+
elif action_flag == ActionFlag.UPDATE:
58+
return Operation.UPDATE
59+
elif action_flag == ActionFlag.DELETE_LOGICAL:
60+
return Operation.DELETE_LOGICAL
61+
else:
62+
raise ValueError(f"Unknown ActionFlag: {action_flag}")
63+
64+
65+
def create_row(unique_id, dose_amount, action_flag: ActionFlag, header, inject_char=None):
2266
"""Helper function to create a single row with the specified UNIQUE_ID and ACTION_FLAG."""
2367

2468
name = "James" if not inject_char else b'Jam\xe9s'

e2e_batch/test_e2e_batch.py

Lines changed: 61 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
import time
33
from utils import (
44
upload_file_to_s3,
5-
# get_file_content_from_s3,
6-
# check_ack_file_content,
7-
# check_inf_file_content,
8-
# validate_row_count,
5+
get_file_content_from_s3,
6+
check_ack_file_content,
7+
check_inf_file_content,
8+
validate_row_count,
99
generate_csv_files,
1010
# OpMsgs,
1111
TestData,
12-
# DestinationType,
12+
DestinationType,
1313
# BusRowResult,
1414
cleanup
1515
)
@@ -19,8 +19,11 @@
1919
from constants import (
2020
SOURCE_BUCKET,
2121
INPUT_PREFIX,
22-
# ACK_BUCKET,
23-
environment
22+
ACK_BUCKET,
23+
environment,
24+
# ActionFlag,
25+
# Operation,
26+
ActionSet,
2427
)
2528

2629
NEW = "NEW"
@@ -32,15 +35,15 @@ class TestE2EBatch(unittest.TestCase):
3235

3336
def setUp(self):
3437
self.seed_datas = [
35-
TestData("Create", "V0V8L", ["CREATE"]),
36-
TestData("Update", "8HK48", ["CREATE", UPDATE]),
37-
# TestData("Delete", "8HA94", [NEW, UPDATE, DELETE]),
38+
TestData("V0V8L", ActionSet.CREATE),
39+
TestData("8HK48", ActionSet.UPDATE),
40+
TestData("8HA94", ActionSet.DELETE),
3841
# TestData("Reinstate", "X26", [NEW, DELETE, UPDATE]),
3942
# TestData("Update no Create", "YGM41", [UPDATE], expected=BusRowResult.FATAL_ERROR,
4043
# operation_outcome=OpMsgs.IMM_NOT_EXIST),
4144
# TestData("Delete no Create", "YGJ", [DELETE], expected=BusRowResult.FATAL_ERROR,
4245
# operation_outcome=OpMsgs.IMM_NOT_EXIST),
43-
# TestData("Create with extended ascii characters in name", "YGA", [NEW], inject_char=True),
46+
TestData("YGA", ActionSet.CREATE, inject_char=True, name="Create with 1252 char"),
4447
]
4548

4649
def tearDown(self):
@@ -55,41 +58,54 @@ def test_create_success(self):
5558

5659
tests: list[TestData] = generate_csv_files(self.seed_datas)
5760

58-
for test in tests:
59-
logger.info(f"Upload for Test: {test.name} ")
60-
key = upload_file_to_s3(test.file_name, SOURCE_BUCKET, INPUT_PREFIX)
61-
test.key = key
61+
send_files(tests)
6262

63-
logger.info(f"Uploaded all files. Time: {time.time() - start_time:.1f} seconds")
64-
logger.info("Waiting while processing...")
65-
# dictionary of file name to track whether inf and bus acks have been received
66-
start_time = time.time()
67-
# while there are still pending files, poll for acks and forwarded files
68-
pending = True
69-
while pending and (time.time() - start_time) < max_timeout:
70-
pending = False
71-
for test in tests:
72-
pending = test.poll_destination(pending)
73-
if pending:
74-
print(".", end="")
75-
time.sleep(5)
76-
77-
logger.info(f"Files Processed. Time: {time.time() - start_time:.1f} seconds")
78-
79-
# Now validate all files have been processed correctly
80-
for test in tests:
81-
logger.info(f"Validation for Test: {test.name} ")
82-
# Validate the ACK file
83-
# inf_ack_content = get_file_content_from_s3(ACK_BUCKET, test.ack_keys[DestinationType.INF])
63+
poll_for_responses(tests, max_timeout)
64+
65+
validate_responses(tests)
8466

85-
# check_ack_file_content(test.name, inf_ack_content, "Success", None, test.actions)
86-
# validate_row_count(f"{test.name} - inf", test.file_name, test.ack_keys[DestinationType.BUS])
87-
# check row after header
88-
# bus_ack_content = get_file_content_from_s3(ACK_BUCKET, test.ack_keys[DestinationType.BUS])
89-
# loop through each line in the bus ack content
67+
logger.info(f"Tests Completed. Time: {time.time() - start_time:.1f} seconds")
9068

91-
# sometimes OK and sometimes CREATE
92-
# check_inf_file_content(f"{test.name} - bus", bus_ack_content, "OK", test.operation_outcome,
93-
# test.actions)
9469

95-
logger.info(f"Completed all validations. Total time: {time.time() - start_time:.1f} seconds")
70+
def send_files(tests: list[TestData]):
71+
start_time = time.time()
72+
for test in tests:
73+
logger.info(f"Upload for Test: {test.name} ")
74+
key = upload_file_to_s3(test.file_name, SOURCE_BUCKET, INPUT_PREFIX)
75+
test.key = key
76+
logger.info(f"Files uploaded. Time: {time.time() - start_time:.1f} seconds")
77+
78+
79+
def poll_for_responses(tests: list[TestData], max_timeout=1200):
80+
logger.info("Waiting while processing...")
81+
start_time = time.time()
82+
# while there are still pending files, poll for acks and forwarded files
83+
pending = True
84+
while pending and (time.time() - start_time) < max_timeout:
85+
pending = False
86+
for test in tests:
87+
pending = test.poll_destination(pending)
88+
if pending:
89+
print(".", end="")
90+
time.sleep(5)
91+
logger.info(f"Files processed. Time: {time.time() - start_time:.1f} seconds")
92+
93+
94+
def validate_responses(tests: list[TestData]):
95+
start_time = time.time()
96+
for test in tests:
97+
logger.info(f"Validation for Test: {test.name} ")
98+
# Validate the ACK file
99+
inf_ack_content = get_file_content_from_s3(ACK_BUCKET, test.ack_keys[DestinationType.INF])
100+
101+
check_ack_file_content(test.name, inf_ack_content, "Success", None, test.actions)
102+
validate_row_count(f"{test.name} - inf", test.file_name, test.ack_keys[DestinationType.BUS])
103+
# check row after header
104+
bus_ack_content = get_file_content_from_s3(ACK_BUCKET, test.ack_keys[DestinationType.BUS])
105+
# loop through each line in the bus ack content
106+
107+
# sometimes OK and sometimes CREATE
108+
check_inf_file_content(f"{test.name} - bus", bus_ack_content, "OK", test.operation_outcome,
109+
test.getOperations())
110+
111+
logger.info(f"Responses validated. Time: {time.time() - start_time:.1f} seconds")

e2e_batch/utils.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
PERMISSIONS_CONFIG_FILE_KEY,
2626
INPUT_PREFIX,
2727
HEADER_RESPONSE_CODE_COLUMN,
28+
ActionFlag,
29+
Operation,
30+
get_operation,
31+
ActionSequence,
2832
)
2933

3034
ods_vaccines = {
@@ -59,17 +63,19 @@ class OpMsgs:
5963

6064
class TestData:
6165

62-
def __init__(self, name,
66+
def __init__(self,
6367
vax_ods,
64-
actions: list, header="NHS_NUMBER",
68+
action_sequence: ActionSequence, header="NHS_NUMBER",
6569
expected: BusRowResult = BusRowResult.SUCCESS,
6670
dose_amount=0.5,
6771
inject_char=False,
6872
version=5,
73+
name=None,
6974
operation_outcome: OpMsgs = None):
70-
self.name = name # name of the test
75+
suffix = f" - {name}" if name else ""
76+
self.name = f"{action_sequence.description}{suffix}" # description of the test
77+
self.action_sequence = action_sequence # ActionSequence object containing description and list of actions
7178
self.dose_amount = dose_amount # dose amount to use in the csv rows
72-
self.actions = actions # list of actions to include in the csv rows
7379
self.vax = ods_vaccines[vax_ods][0] # Use the first vaccine for the ods
7480
self.ods = vax_ods # ods code to use in the csv rows
7581
self.expected = expected # expected result for each row in the bus ack file
@@ -80,6 +86,12 @@ def __init__(self, name,
8086
self.ack_keys = {DestinationType.INF: None, DestinationType.BUS: None} # s3 key prefixes of the ack files
8187
self.operation_outcome = operation_outcome # expected operation outcome message in the bus ack file
8288

89+
def getOperations(self) -> list[Operation]:
90+
operations = []
91+
for action in self.actions:
92+
operations.append(get_operation(action))
93+
return operations
94+
8395
def poll_destination(self, pending: bool) -> bool:
8496
# loop through keys in test (inf and bus)
8597
for ack_key in self.ack_keys.keys():
@@ -92,7 +104,7 @@ def poll_destination(self, pending: bool) -> bool:
92104
return pending
93105

94106

95-
def generate_csv(dose_amount, action_flag, headers="NHS_NUMBER", same_id=False, version="4",
107+
def generate_csv(dose_amount, action_flag: ActionFlag, headers="NHS_NUMBER", same_id=False, version="4",
96108
vax_type="RSV", ods="YGM41"):
97109
"""
98110
Generate a CSV file with 2 or 3 rows depending on the action_flag.
@@ -116,7 +128,7 @@ def generate_csv(dose_amount, action_flag, headers="NHS_NUMBER", same_id=False,
116128

117129
data = []
118130

119-
if action_flag == "CREATE":
131+
if action_flag == ActionFlag.CREATE:
120132
if same_id:
121133

122134
unique_id = str(uuid.uuid4())
@@ -349,7 +361,7 @@ def validate_fatal_error(desc, row, index, expected_outcome):
349361
)
350362

351363

352-
def validate_ok_response(desc, row, index, operation_requested):
364+
def validate_ok_response(desc, row, index, operation_requested: Operation):
353365
"""
354366
Validate the LOCAL_ID format and verify that the DynamoDB primary key (PK)
355367
and operation match the expected values for OK responses.
@@ -519,7 +531,7 @@ def generate_csv_with_ordered_100000_rows(vax_type, ods):
519531
# Generate remaining 99,700 rows as CREATE operations
520532
create_data = [
521533
create_row(
522-
unique_id=str(uuid.uuid4()), action_flag="NEW", dose_amount="0.3", header="NHS_NUMBER"
534+
unique_id=str(uuid.uuid4()), action_flag=ActionFlag.CREATE, dose_amount="0.3", header="NHS_NUMBER"
523535
)
524536
for _ in range(total_rows - special_row_count)
525537
]
@@ -567,12 +579,12 @@ def get_file_name(vax_type, ods, version="5"):
567579
def generate_csv_files(seed_data_list: list[TestData]) -> list[TestData]:
568580
"""Generate CSV files based on a list of TestData instances."""
569581
for seed_data in seed_data_list:
570-
file_name = (generate_csv_file(seed_data, actions=seed_data.actions))
582+
file_name = (generate_csv_file(seed_data, actions=seed_data.action_sequence.actions))
571583
seed_data.file_name = file_name
572584
return seed_data_list
573585

574586

575-
def generate_csv_file(seed: TestData, actions: str) -> str:
587+
def generate_csv_file(seed: TestData, actions: list[ActionFlag]) -> str:
576588

577589
data = []
578590
for action in actions:
@@ -618,7 +630,8 @@ def cleanup(data_list: list[TestData]):
618630
logger.warning(f"s3 delete fail {ACK_BUCKET}: {ack_key}")
619631

620632

621-
def check_inf_file_content(desc, content, response_code, operation_outcome, operation_requested):
633+
def check_inf_file_content(desc, content, response_code, operation_outcome,
634+
operations_requested: list[Operation]):
622635

623636
reader = csv.DictReader(content.splitlines(), delimiter="|")
624637
rows = list(reader)
@@ -629,7 +642,7 @@ def check_inf_file_content(desc, content, response_code, operation_outcome, oper
629642

630643
first_row = rows[0]
631644
validate_header_response_code(desc, first_row, 0, "OK")
632-
validate_ok_response(desc, first_row, 0, operation_requested)
645+
validate_ok_response(desc, first_row, 0, operations_requested[0])
633646

634647
second_row = rows[1]
635648
validate_header_response_code(desc, second_row, 1, "Fatal Error")
@@ -650,6 +663,6 @@ def check_inf_file_content(desc, content, response_code, operation_outcome, oper
650663
f"but got '{row_OPERATION_OUTCOME}'"
651664
)
652665
if row_HEADER_RESPONSE_CODE == "OK":
653-
validate_ok_response(desc, row, i, operation_requested[i])
666+
validate_ok_response(desc, row, i, operations_requested[i])
654667
elif row_HEADER_RESPONSE_CODE == "Fatal Error":
655668
validate_fatal_error(desc, row, i, operation_outcome)

0 commit comments

Comments
 (0)