
Commit ca1f209

Setup batch filter lambda

1 parent faf50d3

File tree: 12 files changed, +1097 -14 lines


ack_backend/src/constants.py

Lines changed: 0 additions & 3 deletions
@@ -3,9 +3,6 @@
 import os
 
 AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
-FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")
-AUDIT_TABLE_FILENAME_GSI = "filename_index"
-AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
 
 
 def lambda_handler_context():
     """Get the SOURCE_BUCKET_NAME environment from environment variables."""

batch_processor_filter/poetry.lock

Lines changed: 667 additions & 1 deletion
Some generated files are not rendered by default.

batch_processor_filter/pyproject.toml

Lines changed: 5 additions & 2 deletions
@@ -7,8 +7,11 @@ readme = "README.md"
 packages = [{include = "src"}]
 
 [tool.poetry.dependencies]
-python = "~3.11"
-coverage = "^7.9.1"
+python = "~3.11"
+coverage = "^7.9.1"
+aws-lambda-typing = "~2.20.0"
+boto3 = "~1.38.42"
+moto = "^4"
 
 [build-system]
 requires = ["poetry-core ~= 1.5.0"]
batch_processor_filter/src/batch_audit_repository.py

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+import boto3
+from boto3.dynamodb.conditions import Key
+
+from constants import AUDIT_TABLE_NAME, REGION_NAME, AUDIT_TABLE_FILENAME_GSI, AuditTableKeys, FileStatus, \
+    AUDIT_TABLE_QUEUE_NAME_GSI
+
+
+class BatchAuditRepository:
+    """Batch audit repository class."""
+    _DUPLICATE_CHECK_FILE_STATUS_CONDITION = (
+        Key(AuditTableKeys.STATUS).eq(FileStatus.PROCESSED)
+        | Key(AuditTableKeys.STATUS).eq(FileStatus.PREPROCESSED)
+        | Key(AuditTableKeys.STATUS).eq(FileStatus.PROCESSING)
+    )
+
+    def __init__(self):
+        self._batch_audit_table = boto3.resource("dynamodb", region_name=REGION_NAME).Table(AUDIT_TABLE_NAME)
+
+    def is_duplicate_file(self, file_key: str) -> bool:
+        matching_files = self._batch_audit_table.query(
+            IndexName=AUDIT_TABLE_FILENAME_GSI,
+            KeyConditionExpression=Key(AuditTableKeys.FILENAME).eq(file_key),
+            FilterExpression=self._DUPLICATE_CHECK_FILE_STATUS_CONDITION
+        ).get("Items", [])
+
+        return len(matching_files) > 0
+
+    def is_event_processing_for_supplier_and_vacc_type(self, supplier: str, vacc_type: str) -> bool:
+        queue_name = f"{supplier}_{vacc_type}"
+
+        files_in_processing = self._batch_audit_table.query(
+            IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
+            KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name) & Key(AuditTableKeys.STATUS)
+            .eq(FileStatus.PROCESSING)
+        ).get("Items", [])
+
+        return len(files_in_processing) > 0
+
+    def update_status(self, message_id: str, updated_status: str) -> None:
+        self._batch_audit_table.update_item(
+            Key={AuditTableKeys.MESSAGE_ID: message_id},
+            UpdateExpression="SET #status = :status",
+            ExpressionAttributeNames={"#status": "status"},
+            ExpressionAttributeValues={":status": updated_status},
+            ConditionExpression="attribute_exists(message_id)"
+        )
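One way to exercise the new repository locally is a moto-backed test (moto ^4 is added in pyproject.toml above). This is a minimal sketch, not part of this commit: the table name and GSI name are hypothetical stand-ins for the env vars the module reads, with the GSI name borrowed from the value previously hard-coded in ack_backend/src/constants.py.

    import os

    # Hypothetical env values for the sketch; constants.py reads these at import time.
    os.environ["AUDIT_TABLE_NAME"] = "audit-table"
    os.environ["FILE_NAME_GSI"] = "filename_index"

    import boto3
    from moto import mock_dynamodb


    @mock_dynamodb
    def test_is_duplicate_file_matches_a_processed_entry():
        # Build a mocked audit table with the filename GSI the repository queries
        dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
        dynamodb.create_table(
            TableName="audit-table",
            KeySchema=[{"AttributeName": "message_id", "KeyType": "HASH"}],
            AttributeDefinitions=[
                {"AttributeName": "message_id", "AttributeType": "S"},
                {"AttributeName": "filename", "AttributeType": "S"},
            ],
            GlobalSecondaryIndexes=[{
                "IndexName": "filename_index",
                "KeySchema": [{"AttributeName": "filename", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            }],
            BillingMode="PAY_PER_REQUEST",
        )
        dynamodb.Table("audit-table").put_item(
            Item={"message_id": "id-1", "filename": "flu.csv", "status": "Processed"}
        )

        # Import inside the mock so the repository binds to the mocked table
        from batch_audit_repository import BatchAuditRepository

        assert BatchAuditRepository().is_duplicate_file("flu.csv") is True
        assert BatchAuditRepository().is_duplicate_file("other.csv") is False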
batch_processor_filter/src/batch_file_created_event.py

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+from typing import TypedDict
+
+
+class BatchFileCreatedEvent(TypedDict):
+    message_id: str
+    vaccine_type: str
+    supplier: str
+    filename: str
+    permission: list[str]
+    created_at_formatted_string: str
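For reference, a dict conforming to this shape looks as follows (all values illustrative, not taken from this commit). Note the singular permission key carrying a list.

    from batch_file_created_event import BatchFileCreatedEvent

    # Illustrative values only; the real event is produced upstream.
    event: BatchFileCreatedEvent = {
        "message_id": "123e4567-e89b-12d3-a456-426614174000",
        "vaccine_type": "FLU",
        "supplier": "EMIS",
        "filename": "flu.csv",
        "permission": ["FLU_FULL"],
        "created_at_formatted_string": "20240708T12100100",
    }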
batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+"""Batch processor filter service module"""
+import boto3
+import json
+import logging
+
+from batch_audit_repository import BatchAuditRepository
+from batch_file_created_event import BatchFileCreatedEvent
+from constants import REGION_NAME, FileStatus, QUEUE_URL
+from exceptions import EventAlreadyProcessingForSupplierAndVaccTypeError
+from send_log_to_firehose import send_log_to_firehose
+
+logging.basicConfig(level="INFO")
+logger = logging.getLogger()
+logger.setLevel("INFO")
+
+
+class BatchProcessorFilterService:
+    """Batch processor filter service class. Provides the business logic for the Lambda function"""
+    def __init__(self, audit_repo: BatchAuditRepository = BatchAuditRepository()):
+        self._batch_audit_repository = audit_repo
+        self._queue_client = boto3.client('sqs', region_name=REGION_NAME)
+
+    def _is_duplicate_file(self, file_key: str) -> bool:
+        """Checks if a file with the same name has already been processed or marked for processing"""
+        return self._batch_audit_repository.is_duplicate_file(file_key)
+
+    def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
+        filename = batch_file_created_event["filename"]
+        message_id = batch_file_created_event["message_id"]
+        supplier = batch_file_created_event["supplier"]
+        vaccine_type = batch_file_created_event["vaccine_type"]
+
+        if self._is_duplicate_file(filename):
+            # Mark as processed and return without error so next event will be picked up from queue
+            logger.info("A duplicate file has already been processed. Filename: %s", filename)
+            self._batch_audit_repository.update_status(message_id, FileStatus.DUPLICATE)
+            return
+
+        if self._batch_audit_repository.is_event_processing_for_supplier_and_vacc_type(supplier, vaccine_type):
+            # Raise error so event is returned to queue and retried again later
+            logger.info("Batch event already being processed for supplier and vacc type. Filename: %s", filename)
+            raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
+                                                                    f"{supplier} and vacc type: {vaccine_type}")
+
+        self._queue_client.send_message(
+            QueueUrl=QUEUE_URL,
+            MessageBody=json.dumps(batch_file_created_event),
+            MessageGroupId=f"{supplier}_{vaccine_type}"
+        )
+        self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)
+
+        successful_log_message = "File forwarded for processing by ECS"
+        logger.info(successful_log_message)
+        send_log_to_firehose({**batch_file_created_event, "message": successful_log_message})
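The audit_repo constructor parameter provides a seam for unit testing the filter logic without DynamoDB. A minimal sketch, assuming the src modules are importable: since the default BatchAuditRepository() is evaluated once at import time (Python default arguments are evaluated at definition time), an env stub is set before importing.

    import os

    os.environ.setdefault("AUDIT_TABLE_NAME", "audit-table")  # stub so import-time wiring succeeds

    from unittest.mock import MagicMock

    from batch_processor_filter_service import BatchProcessorFilterService
    from constants import FileStatus


    def test_duplicate_file_is_marked_and_not_forwarded():
        fake_repo = MagicMock()
        fake_repo.is_duplicate_file.return_value = True

        service = BatchProcessorFilterService(audit_repo=fake_repo)
        service._queue_client = MagicMock()  # avoid a real SQS call in the sketch

        service.apply_filter({
            "message_id": "id-1",
            "vaccine_type": "FLU",
            "supplier": "EMIS",
            "filename": "dup.csv",
            "permission": [],
            "created_at_formatted_string": "",
        })

        # Duplicate branch: status updated, nothing forwarded to the queue
        fake_repo.update_status.assert_called_once_with("id-1", FileStatus.DUPLICATE)
        service._queue_client.send_message.assert_not_called()

Evaluating the default argument at import time also means every invocation shares one repository instance, which fits Lambda's warm-container reuse of AWS clients.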
batch_processor_filter/src/constants.py

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+import os
+from enum import StrEnum
+
+REGION_NAME = "eu-west-2"
+AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
+AUDIT_TABLE_FILENAME_GSI = os.getenv("FILE_NAME_GSI")
+AUDIT_TABLE_QUEUE_NAME_GSI = os.getenv("QUEUE_NAME_GSI")
+QUEUE_URL = os.getenv("QUEUE_URL")
+SPLUNK_FIREHOSE_STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME")
+
+
+class FileStatus(StrEnum):
+    """File status constants"""
+
+    QUEUED = "Queued"
+    PROCESSING = "Processing"
+    PREPROCESSED = "Preprocessed"
+    PROCESSED = "Processed"
+    DUPLICATE = "Not processed - duplicate"
+
+
+class AuditTableKeys(StrEnum):
+    """Audit table keys"""
+
+    FILENAME = "filename"
+    MESSAGE_ID = "message_id"
+    QUEUE_NAME = "queue_name"
+    STATUS = "status"
+    TIMESTAMP = "timestamp"
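Because FileStatus and AuditTableKeys subclass StrEnum (new in Python 3.11, matching the ~3.11 pin above), their members are real strings, which is what lets them be passed straight into the boto3 key conditions and update expressions in batch_audit_repository.py. A quick illustration:

    from enum import StrEnum


    class FileStatus(StrEnum):
        PROCESSED = "Processed"


    # StrEnum members compare equal to, and format as, their underlying values,
    # so boto3 serialises them like ordinary strings.
    assert FileStatus.PROCESSED == "Processed"
    assert f"{FileStatus.PROCESSED}" == "Processed"
    assert isinstance(FileStatus.PROCESSED, str)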
batch_processor_filter/src/exceptions.py

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+"""Exceptions for the batch processor filter Lambda function"""
+
+
+class InvalidBatchSizeError(Exception):
+    """Raised when the SQS event batch size is anything other than 1"""
+    pass
+
+
+class EventAlreadyProcessingForSupplierAndVaccTypeError(Exception):
+    """Raised when there is already a batch processing event in flight for the same supplier and vaccine type"""
+    pass
Lines changed: 23 additions & 4 deletions
@@ -1,4 +1,23 @@
-def lambda_handler(event, _):
-    print("hello world")
-    print(event)
-    return True
+import json
+from aws_lambda_typing import events, context
+
+from batch_file_created_event import BatchFileCreatedEvent
+from batch_processor_filter_service import BatchProcessorFilterService
+from exceptions import InvalidBatchSizeError
+
+
+service = BatchProcessorFilterService()
+
+
+def lambda_handler(event: events.SQSEvent, _: context):
+    event_records = event.get("Records", [])
+
+    # Terraform is configured so this Lambda will get a batch size of 1. We are using SQS FIFO with the message group
+    # id set to {supplier}_{vaccine_type} so we will only want batches of 1 at a time.
+    # Lambda will scale out to handle multiple message groups in parallel:
+    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/fifo-queue-lambda-behavior.html
+    if len(event_records) != 1:
+        raise InvalidBatchSizeError(f"Received {len(event_records)} records, expected 1")
+
+    batch_file_created_event: BatchFileCreatedEvent = json.loads(event_records[0].get("body"))
+    service.apply_filter(batch_file_created_event)
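For reference, the SQS event shape the handler consumes looks like the sketch below (illustrative values, trimmed to the fields the handler actually reads). Any Records length other than 1 raises InvalidBatchSizeError, which fails the batch so the message returns to the FIFO queue for a retry.

    import json

    # Illustrative SQS-triggered Lambda event, not taken from this commit.
    sample_event = {
        "Records": [
            {
                "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
                "body": json.dumps({
                    "message_id": "id-1",
                    "vaccine_type": "FLU",
                    "supplier": "EMIS",
                    "filename": "flu.csv",
                    "permission": ["FLU_FULL"],
                    "created_at_formatted_string": "20240708T12100100",
                }),
            }
        ]
    }

    # lambda_handler(sample_event, None) would parse the body and hand it to
    # BatchProcessorFilterService.apply_filter; any other Records length raises
    # InvalidBatchSizeError so the batch is retried.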
batch_processor_filter/src/send_log_to_firehose.py

Lines changed: 15 additions & 0 deletions

@@ -0,0 +1,15 @@
+import json
+import boto3
+
+from constants import SPLUNK_FIREHOSE_STREAM_NAME, REGION_NAME
+
+firehose_client = boto3.client("firehose", region_name=REGION_NAME)
+
+
+# Not keen on including blocking calls in function code for log data
+# Consider simply logging and setting up CW subscription filters to forward to Firehose
+# https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html
+def send_log_to_firehose(log_data: dict) -> None:
+    """Sends the log_message to Firehose"""
+    record = {"Data": json.dumps({"event": log_data}).encode("utf-8")}
+    firehose_client.put_record(DeliveryStreamName=SPLUNK_FIREHOSE_STREAM_NAME, Record=record)
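The record that lands in the delivery stream wraps the log data under an "event" key, presumably the envelope the Splunk ingestion expects. A quick illustration of the payload produced (values illustrative):

    import json

    # What put_record sends for a given log_data dict.
    log_data = {"filename": "flu.csv", "message": "File forwarded for processing by ECS"}
    record = {"Data": json.dumps({"event": log_data}).encode("utf-8")}

    assert record["Data"] == (
        b'{"event": {"filename": "flu.csv", "message": "File forwarded for processing by ECS"}}'
    )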
