Skip to content

Commit c0df4a6

Browse files
committed
debug
1 parent 40e80f8 commit c0df4a6

File tree

2 files changed

+72
-56
lines changed

2 files changed

+72
-56
lines changed

batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
def _is_duplicate_file(self, file_key: str) -> bool:
    """Tell whether a file with this key has already been processed.

    Delegates the lookup to the batch audit repository.
    """
    audit_repo = self._batch_audit_repository
    return audit_repo.is_duplicate_file(file_key)
def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
    """Filter a batch-file-created event before forwarding it for processing.

    Three outcomes are possible:
      * Duplicate file: mark it NOT_PROCESSED, upload a failure ack, archive
        the source file and return (no error, so the next event is picked up).
      * An event for the same supplier/vaccine type is already processing or
        failed: raise EventAlreadyProcessingForSupplierAndVaccTypeError so the
        event is returned to the queue and retried later.
      * Otherwise: forward the event to the processing queue and mark the
        audit record as PROCESSING.

    Args:
        batch_file_created_event: event dict with at least "filename",
            "message_id", "supplier" and "vaccine_type" keys.

    Raises:
        EventAlreadyProcessingForSupplierAndVaccTypeError: when another event
            for the same supplier and vaccine type is in flight.
        Exception: any unexpected error is logged and re-raised.
    """
    try:
        filename = batch_file_created_event["filename"]
        message_id = batch_file_created_event["message_id"]
        supplier = batch_file_created_event["supplier"]
        vaccine_type = batch_file_created_event["vaccine_type"]

        # debug
        print("SAW ----------- apply_filter DEBUG -------------")
        # NOTE(review): these f-strings previously interpolated nothing
        # ("filename: (unknown)"); restored the {filename} placeholder to
        # match the %s-style logger calls below — confirm against history.
        print(f"filename: {filename} ")
        print(f"message_id: {message_id} ")
        print(f"supplier: {supplier} ")
        print(f"vaccine_type: {vaccine_type} ")

        print("apply_filter...checking for duplicate file...")
        if self._is_duplicate_file(filename):
            print("apply_filter...duplicate file found")
            # Mark as processed and return without error so next event will be picked up from queue
            logger.error("A duplicate file has already been processed. Filename: %s", filename)
            self._batch_audit_repository.update_status(
                message_id,
                f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.DUPLICATE}"
            )
            self._batch_file_repo.upload_failure_ack(batch_file_created_event)
            self._batch_file_repo.move_source_file_to_archive(filename)
            return

        print("apply_filter...check for event already processing for supplier and vacc type...")
        if self._batch_audit_repository.is_event_processing_or_failed_for_supplier_and_vacc_type(
            supplier,
            vaccine_type
        ):
            print("apply_filter...event already processing for supplier and vacc type found")
            # Raise error so event is returned to queue and retried again later
            logger.info("Batch event already processing for supplier and vacc type. Filename: %s", filename)
            raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
                                                                    f"{supplier} and vacc type: {vaccine_type}")

        print("apply_filter...forwarding file for processing...")
        self._queue_client.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps(batch_file_created_event),
            # Group by supplier/vaccine type so FIFO ordering is kept per pair
            MessageGroupId=f"{supplier}_{vaccine_type}"
        )
        print("apply_filter...updating status to processing...")
        self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)

        print("apply_filter...sending log to firehose...")
        successful_log_message = f"File forwarded for processing by ECS. Filename: {filename}"
        logger.info(successful_log_message)
        send_log_to_firehose({**batch_file_created_event, "message": successful_log_message})
        print("apply_filter...done")
    except Exception as ex:
        logger.error("Error in batch processor filter service: %s", str(ex))
        # Bare raise preserves the original traceback (fix: was `raise ex`)
        raise

e2e_batch/test_e2e_batch.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,31 @@ def poll_for_responses(tests: list[TestData], max_timeout=1200):
9393

9494
def validate_responses(tests: list[TestData]):
    """Validate the INF and BUS ACK responses produced for each test.

    Each test is expected to yield two responses (one INF ACK, one BUS ACK).
    A missing ACK file is logged as an error rather than raising, so the
    remaining tests are still validated; a final summary reports whether all
    expected responses were found.

    Args:
        tests: test cases whose ACK files should exist in ACK_BUCKET.
    """
    start_time = time.time()
    count = 0
    # Two expected responses per test: INF ACK + BUS ACK
    expected_count = len(tests) * 2
    for test in tests:
        logger.info(f"Validation for Test: {test.name} ")
        # Validate the INF ACK file
        if test.ack_keys[DestinationType.INF]:
            count += 1
            inf_ack_content = get_file_content_from_s3(ACK_BUCKET, test.ack_keys[DestinationType.INF])
            # Fix: content check moved inside this branch — previously it ran
            # unconditionally and raised NameError when the INF ACK was missing.
            check_ack_file_content(test.name, inf_ack_content, "Success", None, test.actions)
        else:
            logger.error(f"INF ACK file not found for test: {test.name}")

        # Validate the BUS ACK file
        if test.ack_keys[DestinationType.BUS]:
            count += 1
            # Fix: label was "- inf" although this validates the BUS ACK
            validate_row_count(f"{test.name} - bus", test.file_name, test.ack_keys[DestinationType.BUS])
            # check row after header
            bus_ack_content = get_file_content_from_s3(ACK_BUCKET, test.ack_keys[DestinationType.BUS])
            # sometimes OK and sometimes CREATE
            check_inf_file_content(f"{test.name} - bus", bus_ack_content, "OK", test.operation_outcome,
                                   test.getOperations())
        else:
            logger.error(f"BUS ACK file not found for test: {test.name}")
    if count == expected_count:
        logger.info(f"Responses validated. Time: {time.time() - start_time:.1f} seconds")
    else:
        logger.error(f"Only {count} of {expected_count} expected responses found. "
                     f"Time: {time.time() - start_time:.1f} seconds")

0 commit comments

Comments
 (0)