Merged
2 changes: 1 addition & 1 deletion .github/workflows/quality-checks.yml
@@ -175,7 +175,7 @@ jobs:
working-directory: lambdas/ack_backend
id: acklambda
env:
PYTHONPATH: ${{ env.LAMBDA_PATH }}/ack_backend/src:${{ github.workspace }}/ack_backend/tests
PYTHONPATH: ${{ env.LAMBDA_PATH }}/ack_backend/src:tests:${{ env.SHARED_PATH }}/src
continue-on-error: true
run: |
poetry install
49 changes: 25 additions & 24 deletions lambdas/ack_backend/poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion lambdas/ack_backend/pyproject.toml
@@ -14,7 +14,7 @@ python = "~3.11"
boto3 = "~1.40.45"
mypy-boto3-dynamodb = "^1.40.44"
freezegun = "^1.5.2"
moto = "^4"
moto = "^5.1.14"
coverage = "^7.10.7"


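Worth flagging alongside the version bump: moto 5 replaces the per-service decorators from moto 4 (mock_dynamodb, mock_s3, and so on) with a single mock_aws entry point, so tests written against moto 4 usually need a small import change. A minimal sketch, using an invented table name and region:

```python
import boto3
from moto import mock_aws  # moto >= 5; supersedes the per-service mock_dynamodb / mock_s3 decorators


@mock_aws
def test_record_count_round_trip():
    # All names and values here are illustrative, not taken from the real stack.
    client = boto3.client("dynamodb", region_name="eu-west-2")
    client.create_table(
        TableName="audit-table",
        KeySchema=[{"AttributeName": "message_id", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "message_id", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    client.put_item(
        TableName="audit-table",
        Item={"message_id": {"S": "f1d2c3"}, "record_count": {"N": "100"}},
    )
    item = client.get_item(TableName="audit-table", Key={"message_id": {"S": "f1d2c3"}})
    assert item["Item"]["record_count"]["N"] == "100"
```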
30 changes: 16 additions & 14 deletions lambdas/ack_backend/src/ack_processor.py
@@ -2,12 +2,13 @@

import json
from logging_decorators import ack_lambda_handler_logging_decorator
from update_ack_file import update_ack_file
from update_ack_file import update_ack_file, complete_batch_file_process
from utils_for_ack_lambda import is_ack_processing_complete
from convert_message_to_ack_row import convert_message_to_ack_row


@ack_lambda_handler_logging_decorator
def lambda_handler(event, context):
def lambda_handler(event, _):
"""
Ack lambda handler.
For each record: each message in the array of messages is converted to an ack row,
@@ -22,6 +22,7 @@ def lambda_handler(event, context):
message_id = None

ack_data_rows = []
total_ack_rows_processed = 0

for i, record in enumerate(event["Records"]):

@@ -31,10 +33,8 @@
raise ValueError("Could not load incoming message body") from body_json_error

if i == 0:
# IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
# for all messages in the event. The use of FIFO SQS queues ensures that this is the case, provided that
# there is only one file processing at a time for each supplier queue (combination of supplier and vaccine
# type).
# The SQS FIFO MessageGroupId that this lambda consumes from is based on the source filename + created-at
# datetime. Therefore, we can safely retrieve the file metadata from the first record in the list.
file_key = incoming_message_body[0].get("file_key")
message_id = (incoming_message_body[0].get("row_id", "")).split("^")[0]
vaccine_type = incoming_message_body[0].get("vaccine_type")
@@ -44,14 +44,16 @@
for message in incoming_message_body:
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))

update_ack_file(
file_key,
message_id,
supplier,
vaccine_type,
created_at_formatted_string,
ack_data_rows,
)
update_ack_file(file_key, created_at_formatted_string, ack_data_rows)

# Get the row count of the final processed record
# Format of the row id is {batch_message_id}^{row_number}
total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])

if is_ack_processing_complete(message_id, total_ack_rows_processed):
complete_batch_file_process(
message_id, supplier, vaccine_type, created_at_formatted_string, file_key, total_ack_rows_processed
)

return {
"statusCode": 200,
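A rough illustration of the row_id handling above, with invented message bodies: the batch message id sits before the caret and the row number after it, so the handler can read its running total straight from the last message in the batch.

```python
# Illustrative only; ids and file names are made up.
incoming_message_body = [
    {"row_id": "f1d2c3^4", "file_key": "example_source_file.csv"},
    {"row_id": "f1d2c3^5", "file_key": "example_source_file.csv"},
]

# Same parsing as the handler: row_id is "{batch_message_id}^{row_number}"
message_id = incoming_message_body[0].get("row_id", "").split("^")[0]
total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])

print(message_id)                # f1d2c3
print(total_ack_rows_processed)  # 5
```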
15 changes: 15 additions & 0 deletions lambdas/ack_backend/src/audit_table.py
@@ -1,5 +1,6 @@
"""Add the filename to the audit table and check for duplicates."""

from typing import Optional
from common.clients import dynamodb_client, logger
from common.models.errors import UnhandledAuditTableError
from constants import AUDIT_TABLE_NAME, FileStatus, AuditTableKeys
@@ -28,3 +29,17 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> No
except Exception as error: # pylint: disable = broad-exception-caught
logger.error(error)
raise UnhandledAuditTableError(error) from error


def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
"""Retrieves full audit entry by unique event message ID"""
audit_record = dynamodb_client.get_item(
TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}}
)

record_count = audit_record.get("Item", {}).get(AuditTableKeys.RECORD_COUNT, {}).get("N")

if not record_count:
return None

return int(record_count)
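For context, a sketch of the get_item response shape this helper reads from the low-level DynamoDB client; the key names mirror AuditTableKeys, while the values are invented for illustration.

```python
# Hypothetical response; "record_count" is absent until preprocessing has finished.
audit_record = {
    "Item": {
        "message_id": {"S": "f1d2c3"},
        "filename": {"S": "example_source_file.csv"},
        "status": {"S": "Processing"},
        "record_count": {"N": "100"},
    }
}

record_count = audit_record.get("Item", {}).get("record_count", {}).get("N")
print(int(record_count) if record_count else None)  # 100
```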
6 changes: 6 additions & 0 deletions lambdas/ack_backend/src/constants.py
@@ -4,6 +4,11 @@

AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")

COMPLETED_ACK_DIR = "forwardedFile"
TEMP_ACK_DIR = "TempAck"
BATCH_FILE_PROCESSING_DIR = "processing"
BATCH_FILE_ARCHIVE_DIR = "archive"


def get_source_bucket_name() -> str:
"""Get the SOURCE_BUCKET_NAME environment from environment variables."""
@@ -30,6 +35,7 @@ class AuditTableKeys:
FILENAME = "filename"
MESSAGE_ID = "message_id"
QUEUE_NAME = "queue_name"
RECORD_COUNT = "record_count"
STATUS = "status"
TIMESTAMP = "timestamp"

2 changes: 1 addition & 1 deletion lambdas/ack_backend/src/logging_decorators.py
@@ -68,7 +68,7 @@ def wrapper(message, created_at_formatted_string):
return wrapper


def upload_ack_file_logging_decorator(func):
def complete_batch_file_process_logging_decorator(func):
"""This decorator logs when record processing is complete."""

@wraps(func)
114 changes: 50 additions & 64 deletions lambdas/ack_backend/src/update_ack_file.py
@@ -2,12 +2,18 @@

from botocore.exceptions import ClientError
from io import StringIO, BytesIO
from typing import Optional
from audit_table import change_audit_table_status_to_processed
from common.clients import get_s3_client, logger
from constants import ACK_HEADERS, get_source_bucket_name, get_ack_bucket_name
from logging_decorators import upload_ack_file_logging_decorator
from utils_for_ack_lambda import get_row_count
from constants import (
ACK_HEADERS,
get_source_bucket_name,
get_ack_bucket_name,
COMPLETED_ACK_DIR,
TEMP_ACK_DIR,
BATCH_FILE_PROCESSING_DIR,
BATCH_FILE_ARCHIVE_DIR,
)
from logging_decorators import complete_batch_file_process_logging_decorator


def create_ack_data(
@@ -45,6 +51,35 @@ def create_ack_data(
}


@complete_batch_file_process_logging_decorator
def complete_batch_file_process(
message_id: str,
supplier: str,
vaccine_type: str,
created_at_formatted_string: str,
file_key: str,
total_ack_rows_processed: int,
) -> dict:
"""Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
the audit table status"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"

move_file(get_ack_bucket_name(), f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
move_file(
get_source_bucket_name(), f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}"
)

change_audit_table_status_to_processed(file_key, message_id)

return {
"message_id": message_id,
"file_key": file_key,
"supplier": supplier,
"vaccine_type": vaccine_type,
"row_count": total_ack_rows_processed,
}


def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
"""Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
try:
@@ -65,76 +100,27 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
return accumulated_csv_content


@upload_ack_file_logging_decorator
def upload_ack_file(
temp_ack_file_key: str,
message_id: str,
supplier: str,
vaccine_type: str,
accumulated_csv_content: StringIO,
ack_data_rows: list,
archive_ack_file_key: str,
file_key: str,
) -> Optional[dict]:
"""Adds the data row to the uploaded ack file"""
def update_ack_file(
file_key: str,
created_at_formatted_string: str,
ack_data_rows: list,
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
archive_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)

for row in ack_data_rows:
data_row_str = [str(item) for item in row.values()]
cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
accumulated_csv_content.write(cleaned_row + "\n")
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))

csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
ack_bucket_name = get_ack_bucket_name()
source_bucket_name = get_source_bucket_name()

get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)

row_count_source = get_row_count(source_bucket_name, f"processing/{file_key}")
row_count_destination = get_row_count(ack_bucket_name, temp_ack_file_key)
# TODO: Should we check for > and if so what handling is required
if row_count_destination == row_count_source:
move_file(ack_bucket_name, temp_ack_file_key, archive_ack_file_key)
move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")

# Update the audit table
change_audit_table_status_to_processed(file_key, message_id)

# Ingestion of this file is complete
result = {
"message_id": message_id,
"file_key": file_key,
"supplier": supplier,
"vaccine_type": vaccine_type,
"row_count": row_count_source - 1,
}
else:
result = None
logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
return result


def update_ack_file(
file_key: str,
message_id: str,
supplier: str,
vaccine_type: str,
created_at_formatted_string: str,
ack_data_rows: list,
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
temp_ack_file_key = f"TempAck/{ack_filename}"
archive_ack_file_key = f"forwardedFile/{ack_filename}"
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
upload_ack_file(
temp_ack_file_key,
message_id,
supplier,
vaccine_type,
accumulated_csv_content,
ack_data_rows,
archive_ack_file_key,
file_key,
)


def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
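To make the S3 key handling above concrete, this is the naming the new code produces, using an invented source file name and timestamp; the directory prefixes come from the constants added in constants.py.

```python
# Illustrative values only.
file_key = "example_source_file.csv"
created_at_formatted_string = "20240708T12000000"

ack_filename = file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.csv")
temp_ack_file_key = f"TempAck/{ack_filename}"           # written to on every SQS batch
archive_ack_file_key = f"forwardedFile/{ack_filename}"  # final location once the file completes

print(temp_ack_file_key)     # TempAck/example_source_file_BusAck_20240708T12000000.csv
print(archive_ack_file_key)  # forwardedFile/example_source_file_BusAck_20240708T12000000.csv
```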
25 changes: 17 additions & 8 deletions lambdas/ack_backend/src/utils_for_ack_lambda.py
@@ -1,12 +1,21 @@
"""Utils for ack lambda"""

from common.clients import get_s3_client
from audit_table import get_record_count_by_message_id

_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP: dict[str, int] = {}

def get_row_count(bucket_name: str, file_key: str) -> int:
"""
Looks in the given bucket and returns the count of the number of lines in the given file.
NOTE: Blank lines are not included in the count.
"""
response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
return sum(1 for line in response["Body"].iter_lines() if line.strip())

def is_ack_processing_complete(batch_event_message_id: str, processed_ack_count: int) -> bool:
"""Checks if we have received all the acknowledgement rows for the original source file. Also caches the value of
the source file record count to reduce traffic to DynamoDB"""
if batch_event_message_id in _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP:
return _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] == processed_ack_count

record_count = get_record_count_by_message_id(batch_event_message_id)

if not record_count:
# Record count is not set on the audit item until all rows have been preprocessed and sent to Kinesis
return False

_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] = record_count
return record_count == processed_ack_count
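A minimal usage sketch of the completion check, assuming the audit table reports 3 rows for the batch. Because the map lives at module level, a warm Lambda container reuses the cached count and skips the DynamoDB lookup on later batches for the same message id.

```python
# Assumes the module above is importable as utils_for_ack_lambda; values are invented.
from utils_for_ack_lambda import _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP, is_ack_processing_complete

_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP["f1d2c3"] = 3  # pretend the source file had 3 rows

print(is_ack_processing_complete("f1d2c3", 2))  # False: only 2 of 3 rows acknowledged so far
print(is_ack_processing_complete("f1d2c3", 3))  # True: every source row has an ack row
```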