Skip to content

Commit c0c1628

Browse files
committed
tests work
1 parent 4103ce8 commit c0c1628

File tree

13 files changed

+385
-363
lines changed

13 files changed

+385
-363
lines changed

ack_backend/src/ack_processor.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from logging_decorators import ack_lambda_handler_logging_decorator
55
from update_ack_file import update_ack_file
66
from convert_message_to_ack_row import convert_message_to_ack_row
7-
from clients import logger
87

98

109
@ack_lambda_handler_logging_decorator
@@ -14,7 +13,6 @@ def lambda_handler(event, context):
1413
For each record: each message in the array of messages is converted to an ack row,
1514
then all of the ack rows for that array of messages are uploaded to the ack file in one go.
1615
"""
17-
logger.info("SAW DEBUG ack_processor.lambda_handler")
1816
if not event.get("Records"):
1917
raise ValueError("Error in ack_processor_lambda_handler: No records found in the event")
2018

@@ -25,10 +23,8 @@ def lambda_handler(event, context):
2523
ack_data_rows = []
2624

2725
for i, record in enumerate(event["Records"]):
28-
logger.info(f"SAW DEBUG record {i}")
2926
try:
3027
incoming_message_body = json.loads(record["body"])
31-
logger.info(f"SAW DEBUG incoming_message_body {i}: {incoming_message_body}")
3228
except Exception as body_json_error:
3329
raise ValueError("Could not load incoming message body") from body_json_error
3430

@@ -44,8 +40,6 @@ def lambda_handler(event, context):
4440
created_at_formatted_string = incoming_message_body[0].get("created_at_formatted_string")
4541

4642
for message in incoming_message_body:
47-
msg = message if message else 'EMPTY MESSAGE'
48-
logger.info(f"SAW DEBUG message {i}: {msg}")
4943
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
5044

5145
update_ack_file(file_key, message_id, supplier, vaccine_type, created_at_formatted_string, ack_data_rows)

ack_backend/src/convert_message_to_ack_row.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from typing import Union
44
from logging_decorators import convert_message_to_ack_row_logging_decorator
55
from update_ack_file import create_ack_data
6-
from clients import logger
76

87
def get_error_message_for_ack_file(message_diagnostics) -> Union[None, str]:
98
"""Determines and returns the error message to be displayed in the ack file"""
@@ -26,8 +25,6 @@ def convert_message_to_ack_row(message, created_at_formatted_string):
2625
A value error is raised if the file_key or created_at_formatted_string for the message do not match the
2726
expected values.
2827
"""
29-
logger.info("SAW DEBUG convert_message_to_ack_row")
30-
logger.info(f"SAW DEBUG convert_message_to_ack_row: {message}")
3128
diagnostics = message.get("diagnostics")
3229
return create_ack_data(
3330
created_at_formatted_string=created_at_formatted_string,

ack_backend/src/update_ack_file.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ def create_ack_data(
2020
) -> dict:
2121
"""Returns a dictionary containing the ack headers as keys, along with the relevant values."""
2222
# Pack multi-line diagnostics down to single line (because Imms API diagnostics may be multi-line)
23-
logger.info("SAW DEBUG create_ack_data")
24-
logger.info(f"SAW DEBUG create_ack_data: {diagnostics}")
2523
diagnostics = (
2624
" ".join(diagnostics.replace("\r", " ").replace("\n", " ").replace("\t", " ").replace("\xa0", " ").split())
2725
if diagnostics is not None

batch_processor_filter/src/batch_audit_repository.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,10 @@ def is_event_processing_or_failed_for_supplier_and_vacc_type(self, supplier: str
4242
return False
4343

4444
def update_status(self, message_id: str, updated_status: str) -> None:
45-
try:
46-
print(f"Updating status for message_id: {message_id} to {updated_status}")
47-
print(f"Table name: {AUDIT_TABLE_NAME}")
48-
print(f"Region: {REGION_NAME}")
49-
print(f"Batch audit table: {self._batch_audit_table.table_name}")
50-
self._batch_audit_table.update_item(
51-
Key={AuditTableKeys.MESSAGE_ID: message_id},
52-
UpdateExpression="SET #status = :status",
53-
ExpressionAttributeNames={"#status": "status"},
54-
ExpressionAttributeValues={":status": updated_status},
55-
ConditionExpression="attribute_exists(message_id)"
56-
)
57-
print(f"Successfully updated status for message_id: {message_id} to {updated_status}")
58-
except Exception as e:
59-
print(f"Error updating status for message_id: {message_id} to {updated_status}: {e}")
60-
raise
45+
self._batch_audit_table.update_item(
46+
Key={AuditTableKeys.MESSAGE_ID: message_id},
47+
UpdateExpression="SET #status = :status",
48+
ExpressionAttributeNames={"#status": "status"},
49+
ExpressionAttributeValues={":status": updated_status},
50+
ConditionExpression="attribute_exists(message_id)"
51+
)

batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,57 +27,40 @@ def _is_duplicate_file(self, file_key: str) -> bool:
2727
return self._batch_audit_repository.is_duplicate_file(file_key)
2828

2929
def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
30-
try:
31-
filename = batch_file_created_event["filename"]
32-
message_id = batch_file_created_event["message_id"]
33-
supplier = batch_file_created_event["supplier"]
34-
vaccine_type = batch_file_created_event["vaccine_type"]
30+
filename = batch_file_created_event["filename"]
31+
message_id = batch_file_created_event["message_id"]
32+
supplier = batch_file_created_event["supplier"]
33+
vaccine_type = batch_file_created_event["vaccine_type"]
3534

36-
# debug
37-
print("SAW ----------- apply_filter DEBUG -------------")
38-
print(f"filename: {filename} ")
39-
print(f"message_id: {message_id} ")
40-
print(f"supplier: {supplier} ")
41-
print(f"vaccine_type: {vaccine_type} ")
35+
logger.info("Received batch file event for filename: %s with message id: %s", filename, message_id)
4236

43-
print("apply_filter...checking for duplicate file...")
44-
if self._is_duplicate_file(filename):
45-
print("apply_filter...duplicate file found")
46-
# Mark as processed and return without error so next event will be picked up from queue
47-
logger.error("A duplicate file has already been processed. Filename: %s", filename)
48-
self._batch_audit_repository.update_status(
49-
message_id,
50-
f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.DUPLICATE}"
51-
)
52-
self._batch_file_repo.upload_failure_ack(batch_file_created_event)
53-
self._batch_file_repo.move_source_file_to_archive(filename)
54-
return
37+
if self._is_duplicate_file(filename):
38+
# Mark as processed and return without error so next event will be picked up from queue
39+
logger.error("A duplicate file has already been processed. Filename: %s", filename)
40+
self._batch_audit_repository.update_status(
41+
message_id,
42+
f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.DUPLICATE}"
43+
)
44+
self._batch_file_repo.upload_failure_ack(batch_file_created_event)
45+
self._batch_file_repo.move_source_file_to_archive(filename)
46+
return
5547

56-
print("apply_filter...check for event already processing for supplier and vacc type...")
57-
if self._batch_audit_repository.is_event_processing_or_failed_for_supplier_and_vacc_type(
58-
supplier,
59-
vaccine_type
60-
):
61-
print("apply_filter...event already processing for supplier and vacc type found")
62-
# Raise error so event is returned to queue and retried again later
63-
logger.info("Batch event already processing for supplier and vacc type. Filename: %s", filename)
64-
raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
65-
f"{supplier} and vacc type: {vaccine_type}")
48+
if self._batch_audit_repository.is_event_processing_or_failed_for_supplier_and_vacc_type(
49+
supplier,
50+
vaccine_type
51+
):
52+
# Raise error so event is returned to queue and retried again later
53+
logger.info("Batch event already processing for supplier and vacc type. Filename: %s", filename)
54+
raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
55+
f"{supplier} and vacc type: {vaccine_type}")
6656

67-
print("apply_filter...forwarding file for processing...")
68-
self._queue_client.send_message(
69-
QueueUrl=QUEUE_URL,
70-
MessageBody=json.dumps(batch_file_created_event),
71-
MessageGroupId=f"{supplier}_{vaccine_type}"
72-
)
73-
print("apply_filter...updating status to processing...")
74-
self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)
57+
self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)
58+
self._queue_client.send_message(
59+
QueueUrl=QUEUE_URL,
60+
MessageBody=json.dumps(batch_file_created_event),
61+
MessageGroupId=f"{supplier}_{vaccine_type}"
62+
)
7563

76-
print("apply_filter...sending log to firehose...")
77-
successful_log_message = f"File forwarded for processing by ECS. Filename: {filename}"
78-
logger.info(successful_log_message)
79-
send_log_to_firehose({**batch_file_created_event, "message": successful_log_message})
80-
print("apply_filter...done")
81-
except Exception as ex:
82-
logger.error("Error in batch processor filter service: %s", str(ex))
83-
raise ex
64+
successful_log_message = f"File forwarded for processing by ECS. Filename: {filename}"
65+
logger.info(successful_log_message)
66+
send_log_to_firehose({**batch_file_created_event, "message": successful_log_message})

e2e_batch/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
-include .env
22

3-
run-immunization-batch:
4-
ENVIRONMENT=$(environment) poetry run python -m unittest -v -c
3+
test:
4+
ENVIRONMENT=$(ENVIRONMENT) poetry run python -m unittest -v -c

e2e_batch/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# End-to-End Batch Test Suite (test_e2e_batch.py)
2+
3+
This test suite provides automated end-to-end (E2E) testing for the Immunisation FHIR API batch processing pipeline. It verifies that batch file submissions are correctly processed, acknowledged, and validated across the system.
4+
5+
## Overview
6+
- Framework: Python unittest
7+
- Purpose: Simulate real-world batch file submissions, poll for acknowledgements, and validate processing results.
8+
- Test Scenarios: Defined in the scenarios module and enabled in setUp().
9+
- Key Features:
10+
- - Uploads test batch files to S3.
11+
- - Waits for and validates ACK (acknowledgement) files.
12+
- - Cleans up SQS queues and test artifacts after each run.
13+
14+
## Test Flow
15+
1. Setup (setUp)
16+
- Loads and enables a set of test scenarios.
17+
- Prepares test data for batch submission.
18+
2. Test Execution (test_batch_submission)
19+
- Uploads ALL enabled test files to S3.
20+
- Polls for ALL ACK responses and forwarded files.
21+
- Validates the content and structure of ACK files.
22+
3. Teardown (tearDown)
23+
- Cleans up SQS queues and any generated test files.
24+
25+
## Key Functions
26+
- send_files(tests): Uploads enabled test files to the S3 input bucket.
27+
- poll_for_responses(tests, max_timeout): Polls for ACKs and processed files, with a timeout.
28+
- validate_responses(tests): Validates the content of ACK files and checks for expected outcomes.
29+
30+
## How to Run
31+
1. Ensure all dependencies and environment variables are set (see project root README).
32+
2. Run tests from vscode debugger or from makefile using
33+
```
34+
make test
35+
```

e2e_batch/clients.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
"""
44

55
import logging
6-
from constants import (environment, REGION)
6+
from constants import (
7+
environment, REGION,
8+
batch_fifo_queue_name, ack_metadata_queue_name, audit_table_name
9+
)
710
from boto3 import client as boto3_client, resource as boto3_resource
811

912

@@ -16,10 +19,9 @@
1619
sqs_client = boto3_client('sqs', region_name=REGION)
1720
events_table_name = f"imms-{environment}-imms-events"
1821
events_table = dynamodb.Table(events_table_name)
19-
audit_table_name = f"immunisation-batch-{environment}-audit-table"
2022
audit_table = dynamodb.Table(audit_table_name)
21-
batch_fifo_queue_name = f"imms-{environment}-batch-file-created-queue.fifo"
2223
batch_fifo_queue_url = sqs_client.get_queue_url(QueueName=batch_fifo_queue_name)['QueueUrl']
24+
ack_metadata_queue_url = sqs_client.get_queue_url(QueueName=ack_metadata_queue_name)['QueueUrl']
2325
# Logger
2426
logging.basicConfig(level="INFO")
2527
logger = logging.getLogger()

e2e_batch/constants.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
POST_VALIDATION_ERROR = "Validation errors: contained[?(@.resourceType=='Patient')].name[0].given is a mandatory field"
1313
DUPLICATE = "The provided identifier:"
1414
ACK_PREFIX = "ack/"
15+
TEMP_ACK_PREFIX = "TempAck/"
1516
HEADER_RESPONSE_CODE_COLUMN = "HEADER_RESPONSE_CODE"
1617
FILE_NAME_VAL_ERROR = "Infrastructure Level Response Value - Processing Error"
1718
CONFIG_BUCKET = "imms-internal-dev-supplier-config"
1819
PERMISSIONS_CONFIG_FILE_KEY = "permissions_config.json"
1920
RAVS_URI = "https://www.ravs.england.nhs.uk/"
21+
batch_fifo_queue_name = f"imms-{environment}-batch-file-created-queue.fifo"
22+
ack_metadata_queue_name = f"imms-{environment}-ack-metadata-queue.fifo"
23+
audit_table_name = f"immunisation-batch-{environment}-audit-table"
2024

2125

2226
class EventName():
@@ -40,6 +44,38 @@ class ActionFlag():
4044
NONE = "NONE"
4145

4246

47+
class InfResult:
48+
SUCCESS = "Success"
49+
PARTIAL_SUCCESS = "Partial Success"
50+
FATAL_ERROR = "Fatal Error"
51+
52+
53+
class BusRowResult():
54+
SUCCESS = "OK"
55+
FATAL_ERROR = "Fatal Error"
56+
IMMS_NOT_FOUND = "Immunization resource does not exist"
57+
NONE = "NONE"
58+
59+
60+
class OperationOutcome:
61+
IMMS_NOT_FOUND = "Immunization resource does not exist"
62+
TEST = "TEST"
63+
64+
65+
class OpMsgs:
66+
VALIDATION_ERROR = "Validation errors"
67+
MISSING_MANDATORY_FIELD = "is a mandatory field"
68+
DOSE_QUANTITY_NOT_NUMBER = "doseQuantity.value must be a number"
69+
IMM_NOT_EXIST = "Immunization resource does not exist"
70+
IDENTIFIER_PROVIDED = "The provided identifier:"
71+
INVALID_DATE_FORMAT = "is not in the correct format"
72+
73+
74+
class DestinationType:
75+
INF = ACK_PREFIX
76+
BUS = FORWARDEDFILE_PREFIX
77+
78+
4379
class ActionSequence():
4480
def __init__(self, desc: str, actions: list[ActionFlag], outcome: ActionFlag = None):
4581
self.actions = actions
@@ -62,7 +98,7 @@ class TestSet():
6298
UPDATE_FAIL = ActionSequence("Update without Create. Fail", [ActionFlag.UPDATE], outcome=ActionFlag.NONE)
6399

64100

65-
def create_row(unique_id, dose_amount, action_flag: ActionFlag, header, inject_char=None):
101+
def create_row(unique_id, dose_amount, action_flag: str, header, inject_char=None):
66102
"""Helper function to create a single row with the specified UNIQUE_ID and ACTION_FLAG."""
67103

68104
name = "James" if not inject_char else b'Jam\xe9s'

0 commit comments

Comments
 (0)