Skip to content

Commit 4bbd68c

Browse files
committed
Merge branch 'master' into VED-721-refactor-prelim
2 parents 3893008 + 27555af commit 4bbd68c

File tree

90 files changed

+1717
-569
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+1717
-569
lines changed

ack_backend/poetry.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ack_backend/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ packages = [
1010

1111
[tool.poetry.dependencies]
1212
python = "~3.11"
13 - boto3 = "~1.40.22"
13 + boto3 = "~1.40.28"
1414
mypy-boto3-dynamodb = "^1.40.20"
1515
freezegun = "^1.5.2"
1616
moto = "^4"

backend/poetry.lock

Lines changed: 20 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ packages = [{include = "src"}]
99
[tool.poetry.dependencies]
1010
python = "~3.11"
1111
"fhir.resources" = "~7.0.2"
12 - boto3 = "~1.40.22"
13 - boto3-stubs-lite = {extras = ["dynamodb"], version = "~1.40.22"}
12 + boto3 = "~1.40.28"
13 + boto3-stubs-lite = {extras = ["dynamodb"], version = "~1.40.28"}
1414
aws-lambda-typing = "~2.20.0"
1515
redis = "^4.6.0"
16 - moto = "^5.1.11"
16 + moto = "^5.1.12"
1717
requests = "~2.32.5"
1818
responses = "~0.25.7"
1919
pydantic = "~1.10.13"

batch_processor_filter/poetry.lock

Lines changed: 15 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

batch_processor_filter/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ packages = [{include = "src"}]
1010
python = "~3.11"
1111
coverage = "^7.10.6"
1212
aws-lambda-typing = "~2.20.0"
13 - boto3 = "~1.40.22"
14 - moto = "^5.1.11"
13 + boto3 = "~1.40.28"
14 + moto = "^5.1.12"
1515

1616
[build-system]
1717
requires = ["poetry-core ~= 1.5.0"]

batch_processor_filter/src/batch_audit_repository.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class BatchAuditRepository:
1212
| Key(AuditTableKeys.STATUS).eq(FileStatus.PREPROCESSED)
1313
| Key(AuditTableKeys.STATUS).eq(FileStatus.PROCESSING)
1414
)
15 +     _PROCESSING_AND_FAILED_STATUSES = {FileStatus.PROCESSING, FileStatus.FAILED}
1516

1617
def __init__(self):
1718
self._batch_audit_table = boto3.resource("dynamodb", region_name=REGION_NAME).Table(AUDIT_TABLE_NAME)
@@ -25,16 +26,20 @@ def is_duplicate_file(self, file_key: str) -> bool:
2526

2627
return len(matching_files) > 0
2728

28 -     def is_event_processing_for_supplier_and_vacc_type(self, supplier: str, vacc_type: str) -> bool:
29 +     def is_event_processing_or_failed_for_supplier_and_vacc_type(self, supplier: str, vacc_type: str) -> bool:
2930
queue_name = f"{supplier}_{vacc_type}"
3031

31 -         files_in_processing = self._batch_audit_table.query(
32 -             IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
33 -             KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name) & Key(AuditTableKeys.STATUS)
34 -             .eq(FileStatus.PROCESSING)
35 -         ).get("Items", [])
32 +         for status in self._PROCESSING_AND_FAILED_STATUSES:
33 +             files_in_queue = self._batch_audit_table.query(
34 +                 IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
35 +                 KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name) & Key(AuditTableKeys.STATUS)
36 +                 .eq(status)
37 +             ).get("Items", [])
38 +
39 +             if len(files_in_queue) > 0:
40 +                 return True
36 41
37 -         return len(files_in_processing) > 0
42 +         return False
3843

3944
def update_status(self, message_id: str, updated_status: str) -> None:
4045
self._batch_audit_table.update_item(

batch_processor_filter/src/batch_processor_filter_service.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
"""Batch processor filter service module"""
22
import boto3
33
import json
4 - import logging
54

65
from batch_audit_repository import BatchAuditRepository
76
from batch_file_created_event import BatchFileCreatedEvent
87
from batch_file_repository import BatchFileRepository
 9 - from constants import REGION_NAME, FileStatus, QUEUE_URL
 8 + from constants import REGION_NAME, FileStatus, QUEUE_URL, FileNotProcessedReason
10 9 from exceptions import EventAlreadyProcessingForSupplierAndVaccTypeError
10 + from logger import logger
1111
from send_log_to_firehose import send_log_to_firehose
1212

13 - logging.basicConfig(level="INFO")
14 - logger = logging.getLogger()
15 - logger.setLevel("INFO")
16 -

1814
class BatchProcessorFilterService:
1915
"""Batch processor filter service class. Provides the business logic for the Lambda function"""
@@ -38,15 +34,21 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
3834

3935
if self._is_duplicate_file(filename):
4036
# Mark as processed and return without error so next event will be picked up from queue
41 -             logger.info("A duplicate file has already been processed. Filename: %s", filename)
42 -             self._batch_audit_repository.update_status(message_id, FileStatus.DUPLICATE)
37 +             logger.error("A duplicate file has already been processed. Filename: %s", filename)
38 +             self._batch_audit_repository.update_status(
39 +                 message_id,
40 +                 f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.DUPLICATE}"
41 +             )
4342
self._batch_file_repo.upload_failure_ack(batch_file_created_event)
4443
self._batch_file_repo.move_source_file_to_archive(filename)
4544
return
4645

47 -         if self._batch_audit_repository.is_event_processing_for_supplier_and_vacc_type(supplier, vaccine_type):
46 +         if self._batch_audit_repository.is_event_processing_or_failed_for_supplier_and_vacc_type(
47 +             supplier,
48 +             vaccine_type
49 +         ):
4850
# Raise error so event is returned to queue and retried again later
49 -             logger.info("Batch event already being processed for supplier and vacc type. Filename: %s", filename)
51 +             logger.info("Batch event already processing for supplier and vacc type. Filename: %s", filename)
5052
raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
5153
f"{supplier} and vacc type: {vaccine_type}")
5254

0 commit comments

Comments (0)