
Commit e00f0b9

[PRMP-585] Create ReviewProcessor lambda logic (#846)

Authored by lillie-dae
Co-authored-by: steph-torres-nhs <[email protected]>
Co-authored-by: NogaNHS <[email protected]>

1 parent adc7b8b, commit e00f0b9
16 files changed: +1023 -26 lines

.github/workflows/base-lambdas-reusable-deploy-all.yml

Lines changed: 15 additions & 1 deletion

```diff
@@ -696,7 +696,7 @@ jobs:
       lambda_layer_names: "core_lambda_layer"
     secrets:
       AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
-
+
   deploy_get_document_reference_by_id_lambda:
     name: Deploy get_document_reference_lambda
     uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
@@ -710,3 +710,17 @@ jobs:
       lambda_layer_names: "core_lambda_layer"
     secrets:
       AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
+
+  deploy_review_processor_lambda:
+    name: Deploy Review Processor Lambda
+    uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
+    with:
+      environment: ${{ inputs.environment }}
+      python_version: ${{ inputs.python_version }}
+      build_branch: ${{ inputs.build_branch }}
+      sandbox: ${{ inputs.sandbox }}
+      lambda_handler_name: document_review_processor_handler
+      lambda_aws_name: DocumentReviewProcessor
+      lambda_layer_names: "core_lambda_layer"
+    secrets:
+      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
```
lambdas/handlers/document_review_processor_handler.py (new file)

Lines changed: 59 additions & 0 deletions

```python
from models.sqs.review_message_body import ReviewMessageBody
from pydantic import ValidationError
from services.document_review_processor_service import ReviewProcessorService
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.request_context import request_context

logger = LoggingService(__name__)


@set_request_context_for_logging
@ensure_environment_variables(
    names=[
        "DOCUMENT_REVIEW_DYNAMODB_NAME",
        "STAGING_STORE_BUCKET_NAME",
        "PENDING_REVIEW_BUCKET_NAME",
    ]
)
@override_error_check
def lambda_handler(event, context):
    """
    This handler consumes SQS messages from the document review queue, creates DynamoDB
    records in the DocumentReview table, and moves files from the staging bucket
    to the pending review bucket.

    Args:
        event: Lambda event containing SQS Event
        context: Lambda context

    Returns:
        None
    """
    logger.info("Starting review processor Lambda")

    sqs_messages = event.get("Records", [])
    review_service = ReviewProcessorService()

    for sqs_message in sqs_messages:
        try:
            message = ReviewMessageBody.model_validate_json(sqs_message["body"])

            review_service.process_review_message(message)

        except ValidationError as error:
            logger.error("Malformed review message")
            logger.error(error)
            raise error

        except Exception as error:
            logger.error(
                f"Failed to process review message: {str(error)}",
                {"Result": "Review processing failed"},
            )
            raise error

        request_context.patient_nhs_no = ""
        logger.info("Continuing to next message.")
```

lambdas/models/document_review.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -44,7 +44,7 @@ class DocumentUploadReviewReference(BaseModel):
     ttl: int | None = Field(
         alias=str(DocumentReferenceMetadataFields.TTL.value), default=None
     )
-    document_reference_id: str = Field(default=None)
+    document_reference_id: str | None = Field(default=None)
     document_snomed_code_type: str = Field(default=SnomedCodes.LLOYD_GEORGE.value.code)

     def model_dump_camel_case(self, *args, **kwargs):
```
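This one-character type fix matters under Pydantic v2: a field annotated plain `str` rejects an explicit `None` even when its default is `None`, because defaults are not validated unless `validate_default=True`. A minimal standalone sketch (throwaway models, not the real class):

```python
from pydantic import BaseModel, Field, ValidationError


class Before(BaseModel):
    document_reference_id: str = Field(default=None)  # default None slips through unvalidated


class After(BaseModel):
    document_reference_id: str | None = Field(default=None)


try:
    Before(document_reference_id=None)  # explicit None fails against plain `str`
except ValidationError as e:
    print(e.errors()[0]["type"])  # string_type

print(After(document_reference_id=None))  # accepted: document_reference_id=None
```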
lambdas/models/sqs/review_message_body.py (new file)

Lines changed: 21 additions & 0 deletions

```python
from pydantic import BaseModel, Field


class ReviewMessageFile(BaseModel):
    """Model for individual file in SQS message body from the document review queue."""

    file_name: str
    file_path: str = Field(description="Location in the staging bucket")
    """Location in the staging bucket"""


class ReviewMessageBody(BaseModel):
    """Model for SQS message body from the document review queue."""

    upload_id: str
    files: list[ReviewMessageFile]
    nhs_number: str
    failure_reason: str
    upload_date: str
    uploader_ods: str
    current_gp: str
```

lambdas/requirements/layers/requirements_core_lambda_layer.txt

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,7 +1,7 @@
 PyJWT==2.8.0
 PyYAML==6.0.1
-boto3==1.34.128
-botocore==1.34.128
+boto3==1.40.71
+botocore==1.40.71
 charset-normalizer==3.2.0
 cryptography==44.0.1
 idna==3.7
```

lambdas/services/base/dynamo_service.py

Lines changed: 18 additions & 2 deletions

```diff
@@ -164,11 +164,27 @@ def query_table(

         return items

-    def create_item(self, table_name, item):
+    def create_item(self, table_name, item, key_name: str | None = None):
+        """
+        Put an item into the specified DynamoDB table with a condition on the existence of the key.
+        Args:
+            table_name: Name of the DynamoDB table
+            item: The item to be inserted (as a dictionary)
+            key_name: The name of the key field to check existence for a conditional put
+        Returns:
+            Response from the DynamoDB put_item operation
+        Raises:
+            ClientError: For AWS service errors (DynamoDB)
+        """
         try:
             table = self.get_table(table_name)
             logger.info(f"Writing item to table: {table_name}")
-            table.put_item(Item=item)
+            if key_name:
+                return table.put_item(
+                    Item=item, ConditionExpression=f"attribute_not_exists({key_name})"
+                )
+            else:
+                return table.put_item(Item=item)
         except ClientError as e:
             logger.error(
                 str(e), {"Result": f"Unable to write item to table: {table_name}"}
```

lambdas/services/base/s3_service.py

Lines changed: 16 additions & 3 deletions

```diff
@@ -116,19 +116,32 @@ def copy_across_bucket(
         source_file_key: str,
         dest_bucket: str,
         dest_file_key: str,
+        if_none_match: str | None = None,
     ):
+        if if_none_match is not None:
+            return self.client.copy_object(
+                Bucket=dest_bucket,
+                Key=dest_file_key,
+                CopySource={"Bucket": source_bucket, "Key": source_file_key},
+                IfNoneMatch=if_none_match,
+                StorageClass="INTELLIGENT_TIERING",
+            )
         return self.client.copy_object(
             Bucket=dest_bucket,
             Key=dest_file_key,
             CopySource={"Bucket": source_bucket, "Key": source_file_key},
             StorageClass="INTELLIGENT_TIERING",
         )

-    def delete_object(self, s3_bucket_name: str, file_key: str, version_id: str | None = None):
+    def delete_object(
+        self, s3_bucket_name: str, file_key: str, version_id: str | None = None
+    ):
         if version_id is None:
             return self.client.delete_object(Bucket=s3_bucket_name, Key=file_key)
-
-        return self.client.delete_object(Bucket=s3_bucket_name, Key=file_key, VersionId=version_id)
+
+        return self.client.delete_object(
+            Bucket=s3_bucket_name, Key=file_key, VersionId=version_id
+        )

     def create_object_tag(
         self, s3_bucket_name: str, file_key: str, tag_key: str, tag_value: str
```
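A sketch of the conditional copy in isolation, with made-up bucket and key names. Passing `if_none_match="*"` asks S3 to perform the copy only when nothing already exists at the destination key; a losing writer gets a `PreconditionFailed` error code, which is exactly what `ReviewProcessorService` checks for:

```python
from botocore.exceptions import ClientError

from services.base.s3_service import S3Service

s3_service = S3Service()

try:
    # Bucket and key names are illustrative only.
    s3_service.copy_across_bucket(
        source_bucket="example-staging-bucket",
        source_file_key="staging/upload-123/1of1_record.pdf",
        dest_bucket="example-review-bucket",
        dest_file_key="9000000009/review-123/1of1_record.pdf",
        if_none_match="*",
    )
except ClientError as e:
    if e.response["Error"]["Code"] == "PreconditionFailed":
        print("destination object already exists; copy skipped")
    else:
        raise
```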
lambdas/services/document_review_processor_service.py (new file)

Lines changed: 150 additions & 0 deletions

```python
import os
from datetime import datetime, timezone

from botocore.exceptions import ClientError
from enums.document_review_status import DocumentReviewStatus
from models.document_reference import DocumentReferenceMetadataFields
from models.document_review import (
    DocumentReviewFileDetails,
    DocumentUploadReviewReference,
)
from models.sqs.review_message_body import ReviewMessageBody
from services.base.dynamo_service import DynamoDBService
from services.base.s3_service import S3Service
from utils.audit_logging_setup import LoggingService
from utils.request_context import request_context

logger = LoggingService(__name__)


class ReviewProcessorService:
    """
    Service for processing single SQS messages from the document review queue.
    """

    def __init__(self):
        """Initialize the review processor service with required AWS services."""
        self.dynamo_service = DynamoDBService()
        self.s3_service = S3Service()

        self.review_table_name = os.environ["DOCUMENT_REVIEW_DYNAMODB_NAME"]
        self.staging_bucket_name = os.environ["STAGING_STORE_BUCKET_NAME"]
        self.review_bucket_name = os.environ["PENDING_REVIEW_BUCKET_NAME"]

    def process_review_message(self, review_message: ReviewMessageBody) -> None:
        """
        Process a single SQS message from the review queue.

        Args:
            review_message: Review queue message containing file and failure information

        Raises:
            InvalidMessageException: If message format is invalid or required fields missing
            S3FileNotFoundException: If file doesn't exist in staging bucket
            ClientError: For AWS service errors (DynamoDB, S3)
        """

        logger.info("Processing review queue message")

        request_context.patient_nhs_no = review_message.nhs_number

        review_id = review_message.upload_id
        review_files = self._move_files_to_review_bucket(review_message, review_id)
        document_upload_review = self._build_review_record(
            review_message, review_id, review_files
        )
        try:
            self.dynamo_service.create_item(
                table_name=self.review_table_name,
                item=document_upload_review.model_dump(
                    by_alias=True, exclude_none=True
                ),
                key_name=DocumentReferenceMetadataFields.ID.value,
            )

            logger.info(f"Created review record {document_upload_review.id}")
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                logger.info("Entry already exists on Document Review table")
            else:
                raise e

        self._delete_files_from_staging(review_message)

    def _build_review_record(
        self,
        message_data: ReviewMessageBody,
        review_id: str,
        review_files: list[DocumentReviewFileDetails],
    ) -> DocumentUploadReviewReference:
        return DocumentUploadReviewReference(
            id=review_id,
            nhs_number=message_data.nhs_number,
            review_status=DocumentReviewStatus.PENDING_REVIEW,
            review_reason=message_data.failure_reason,
            author=message_data.uploader_ods,
            custodian=message_data.current_gp,
            files=review_files,
            upload_date=int(datetime.now(tz=timezone.utc).timestamp()),
        )

    def _move_files_to_review_bucket(
        self, message_data: ReviewMessageBody, review_record_id: str
    ) -> list[DocumentReviewFileDetails]:
        """
        Move files from the staging bucket to the review bucket.

        Args:
            message_data: Review queue message data
            review_record_id: ID of the review record being created

        Returns:
            List of DocumentReviewFileDetails with new file locations in review bucket
        """
        new_file_keys: list[DocumentReviewFileDetails] = []

        for file in message_data.files:
            new_file_key = (
                f"{message_data.nhs_number}/{review_record_id}/{file.file_name}"
            )

            logger.info(
                f"Copying file from ({file.file_path}) in staging to review bucket: {new_file_key}"
            )
            try:
                self.s3_service.copy_across_bucket(
                    source_bucket=self.staging_bucket_name,
                    source_file_key=file.file_path,
                    dest_bucket=self.review_bucket_name,
                    dest_file_key=new_file_key,
                    if_none_match="*",
                )
                logger.info("File successfully copied to review bucket")
                logger.info(f"Successfully moved file to: {new_file_key}")

            except ClientError as e:
                if e.response["Error"]["Code"] == "PreconditionFailed":
                    logger.info("File already exists in the Review Bucket")
                else:
                    raise e

            new_file_keys.append(
                DocumentReviewFileDetails(
                    file_name=file.file_name,
                    file_location=new_file_key,
                )
            )

        return new_file_keys

    def _delete_files_from_staging(self, message_data: ReviewMessageBody) -> None:
        for file in message_data.files:
            try:
                logger.info(f"Deleting file from staging bucket: {file.file_path}")
                self.s3_service.delete_object(
                    s3_bucket_name=self.staging_bucket_name, file_key=file.file_path
                )
            except Exception as e:
                logger.error(f"Error deleting files from staging: {str(e)}")
                # Continue processing as files
```
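Taken together, the conditional S3 copy (`PreconditionFailed` treated as "already copied") and the conditional DynamoDB put (`ConditionalCheckFailedException` treated as "already recorded") should make redelivery of the same SQS message safe to reprocess. A sketch of driving the service directly; it assumes real AWS resources and credentials exist, and every name and value here is illustrative:

```python
import os

# Hypothetical environment: in AWS these come from the Lambda configuration.
os.environ["DOCUMENT_REVIEW_DYNAMODB_NAME"] = "example_document_review"
os.environ["STAGING_STORE_BUCKET_NAME"] = "example-staging-bucket"
os.environ["PENDING_REVIEW_BUCKET_NAME"] = "example-review-bucket"

from models.sqs.review_message_body import ReviewMessageBody, ReviewMessageFile
from services.document_review_processor_service import ReviewProcessorService

message = ReviewMessageBody(
    upload_id="review-123",
    files=[
        ReviewMessageFile(
            file_name="1of1_record.pdf",
            file_path="staging/upload-123/1of1_record.pdf",
        )
    ],
    nhs_number="9000000009",
    failure_reason="virus scan flagged for review",
    upload_date="2024-01-01",
    uploader_ods="A12345",
    current_gp="B67890",
)

service = ReviewProcessorService()

# Processing the same message twice should be a no-op on the second pass:
# the copy and the put both report "already exists" instead of overwriting.
service.process_review_message(message)
```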

lambdas/services/document_upload_review_service.py

Lines changed: 3 additions & 1 deletion

```diff
@@ -17,11 +17,13 @@

 class DocumentUploadReviewService(DocumentService):
     """Service for handling DocumentUploadReviewReference operations."""
+
     DEFAULT_QUERY_LIMIT = 50
+
     def __init__(self):
         super().__init__()
         self._table_name = os.environ.get("DOCUMENT_REVIEW_DYNAMODB_NAME")
-        self._s3_bucket = os.environ.get("DOCUMENT_REVIEW_S3_BUCKET_NAME")
+        self._s3_bucket = os.environ.get("PENDING_REVIEW_BUCKET_NAME")

     @property
     def table_name(self) -> str:
```

lambdas/tests/unit/conftest.py

Lines changed: 3 additions & 4 deletions

```diff
@@ -138,10 +138,9 @@
 MOCK_ALERTING_SLACK_CHANNEL_ID = "slack_channel_id"
 MOCK_DOCUMENT_REVIEW_TABLE = "test_document_review"
 MOCK_DOCUMENT_REVIEW_BUCKET = "test_document_review_bucket"
-MOCK_EDGE_TABLE = "test_edge_reference_table"
-
 MOCK_EDGE_REFERENCE_TABLE = "test_edge_reference_table"

+
 @pytest.fixture
 def set_env(monkeypatch):
     monkeypatch.setenv("AWS_DEFAULT_REGION", REGION_NAME)
@@ -230,12 +229,12 @@ def set_env(monkeypatch):
     monkeypatch.setenv("SLACK_CHANNEL_ID", MOCK_ALERTING_SLACK_CHANNEL_ID)
     monkeypatch.setenv("ITOC_TESTING_ODS_CODES", MOCK_ITOC_ODS_CODES)
     monkeypatch.setenv("DOCUMENT_REVIEW_DYNAMODB_NAME", MOCK_DOCUMENT_REVIEW_TABLE)
-    monkeypatch.setenv("DOCUMENT_REVIEW_S3_BUCKET_NAME", MOCK_DOCUMENT_REVIEW_BUCKET)
-    monkeypatch.setenv("EDGE_REFERENCE_TABLE", MOCK_EDGE_TABLE)
+    monkeypatch.setenv("PENDING_REVIEW_BUCKET_NAME", MOCK_DOCUMENT_REVIEW_BUCKET)
     monkeypatch.setenv("STAGING_STORE_BUCKET_NAME", MOCK_STAGING_STORE_BUCKET)
     monkeypatch.setenv("METADATA_SQS_QUEUE_URL", MOCK_LG_METADATA_SQS_QUEUE)
     monkeypatch.setenv("EDGE_REFERENCE_TABLE", MOCK_EDGE_REFERENCE_TABLE)

+
 EXPECTED_PARSED_PATIENT_BASE_CASE = PatientDetails(
     givenName=["Jane"],
     familyName="Smith",
```
