Skip to content

Commit e293e5b

Browse files
[PRMP-1512] - Add message to sqs-stitching queue during bulk upload (#554)
1 parent 021d1ff commit e293e5b

File tree

7 files changed

+68
-75
lines changed

7 files changed

+68
-75
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from enums.snomed_codes import SnomedCode, SnomedCodes
2+
from pydantic import BaseModel
3+
4+
5+
class PdfStitchingSqsMessage(BaseModel):
6+
nhs_number: str
7+
snomed_code_doc_type: SnomedCode = SnomedCodes.LLOYD_GEORGE.value

lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import uuid
33

4-
from models.nrl_sqs_message import NrlSqsMessage
4+
from models.pdf_stitching_sqs_message import PdfStitchingSqsMessage
55
from models.staging_metadata import StagingMetadata
66
from services.base.sqs_service import SQSService
77
from utils.audit_logging_setup import LoggingService
@@ -42,11 +42,10 @@ def put_sqs_message_back_to_queue(self, sqs_message: dict):
4242
group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}",
4343
)
4444

45-
def send_message_to_nrl_fifo(
46-
self, queue_url: str, message: NrlSqsMessage, group_id: str
45+
def send_message_to_pdf_stitching_queue(
46+
self, queue_url: str, message: PdfStitchingSqsMessage
4747
):
48-
self.sqs_repository.send_message_fifo(
48+
self.sqs_repository.send_message_standard(
4949
queue_url=queue_url,
5050
message_body=message.model_dump_json(),
51-
group_id=group_id,
5251
)

lambdas/services/bulk_upload_service.py

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44

55
import pydantic
66
from botocore.exceptions import ClientError
7-
from enums.nrl_sqs_upload import NrlActionTypes
87
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus
98
from enums.snomed_codes import SnomedCodes
109
from enums.upload_status import UploadStatus
1110
from enums.virus_scan_result import VirusScanResult
12-
from models.fhir.R4.nrl_fhir_document_reference import Attachment
1311
from models.nhs_document_reference import NHSDocumentReference
14-
from models.nrl_sqs_message import NrlSqsMessage
12+
from models.pdf_stitching_sqs_message import PdfStitchingSqsMessage
1513
from models.staging_metadata import MetadataFile, StagingMetadata
1614
from repositories.bulk_upload.bulk_upload_dynamo_repository import (
1715
BulkUploadDynamoRepository,
@@ -57,7 +55,7 @@ def __init__(self, strict_mode):
5755
self.pdf_content_type = "application/pdf"
5856
self.unhandled_messages = []
5957
self.file_path_cache = {}
60-
self.nrl_queue_url = os.environ["NRL_SQS_URL"]
58+
self.pdf_stitching_queue_url = os.environ["PDF_STITCHING_SQS_URL"]
6159

6260
def process_message_queue(self, records: list):
6361
for index, message in enumerate(records, start=1):
@@ -251,9 +249,7 @@ def handle_sqs_message(self, message: dict):
251249
)
252250

253251
try:
254-
last_document_processed = self.create_lg_records_and_copy_files(
255-
staging_metadata, patient_ods_code
256-
)
252+
self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code)
257253
logger.info(
258254
f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}",
259255
{"Result": "Successful upload"},
@@ -290,28 +286,18 @@ def handle_sqs_message(self, message: dict):
290286
accepted_reason,
291287
patient_ods_code,
292288
)
293-
if len(file_names) == 1:
294-
document_api_endpoint = (
295-
os.environ.get("APIM_API_URL", "")
296-
+ "/DocumentReference/"
297-
+ SnomedCodes.LLOYD_GEORGE.value.code
298-
+ "~"
299-
+ last_document_processed.id
300-
)
301-
doc_details = Attachment(
302-
url=document_api_endpoint,
303-
content_type="application/pdf",
304-
)
305-
nrl_sqs_message = NrlSqsMessage(
306-
nhs_number=staging_metadata.nhs_number,
307-
action=NrlActionTypes.CREATE,
308-
attachment=doc_details,
309-
)
310-
self.sqs_repository.send_message_to_nrl_fifo(
311-
queue_url=self.nrl_queue_url,
312-
message=nrl_sqs_message,
313-
group_id=f"nrl_sqs_{uuid.uuid4()}",
314-
)
289+
290+
pdf_stitching_sqs_message = PdfStitchingSqsMessage(
291+
nhs_number=staging_metadata.nhs_number,
292+
snomed_code_doc_type=SnomedCodes.LLOYD_GEORGE.value,
293+
)
294+
self.sqs_repository.send_message_to_pdf_stitching_queue(
295+
queue_url=self.pdf_stitching_queue_url,
296+
message=pdf_stitching_sqs_message,
297+
)
298+
logger.info(
299+
f"Message sent to stitching queue for patient {staging_metadata.nhs_number}"
300+
)
315301

316302
def resolve_source_file_path(self, staging_metadata: StagingMetadata):
317303
sample_file_path = staging_metadata.files[0].file_path
@@ -357,7 +343,6 @@ def create_lg_records_and_copy_files(
357343
self, staging_metadata: StagingMetadata, current_gp_ods: str
358344
):
359345
nhs_number = staging_metadata.nhs_number
360-
document_reference = None
361346
for file_metadata in staging_metadata.files:
362347
document_reference = self.convert_to_document_reference(
363348
file_metadata, nhs_number, current_gp_ods
@@ -371,8 +356,6 @@ def create_lg_records_and_copy_files(
371356
)
372357
document_reference.set_uploaded_to_true()
373358
self.dynamo_repository.create_record_in_lg_dynamo_table(document_reference)
374-
# returning last document ref until stitching as default is implemented
375-
return document_reference
376359

377360
def rollback_transaction(self):
378361
try:

lambdas/tests/unit/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
MOCK_ZIP_OUTPUT_BUCKET_ENV_NAME = "ZIPPED_STORE_BUCKET_NAME"
3030
MOCK_ZIP_TRACE_TABLE_ENV_NAME = "ZIPPED_STORE_DYNAMODB_NAME"
3131
MOCK_METADATA_NRL_SQS_URL_ENV_NAME = "NRL_SQS_URL"
32+
MOCK_PDF_STITCHING_SQS_URL_ENV_NAME = "PDF_STITCHING_SQS_URL"
3233

3334
MOCK_LG_STAGING_STORE_BUCKET_ENV_NAME = "STAGING_STORE_BUCKET_NAME"
3435
MOCK_LG_METADATA_SQS_QUEUE_ENV_NAME = "METADATA_SQS_QUEUE_URL"
@@ -113,6 +114,10 @@
113114
NRL_SQS_URL = "https://sqs.us-east-1.amazonaws.com/177715257436/MyQueue"
114115
APIM_API_URL = "https://apim.api.service.uk"
115116

117+
PDF_STITCHING_SQS_URL = (
118+
"https://sqs.us-east-1.amazonaws.com/977715257439/MyPdfStitchingQueue"
119+
)
120+
116121

117122
@pytest.fixture
118123
def set_env(monkeypatch):
@@ -131,6 +136,7 @@ def set_env(monkeypatch):
131136
monkeypatch.setenv(MOCK_LG_INVALID_SQS_QUEUE_ENV_NAME, MOCK_LG_INVALID_SQS_QUEUE)
132137
monkeypatch.setenv(MOCK_AUTH_STATE_TABLE_NAME_ENV_NAME, AUTH_STATE_TABLE_NAME)
133138
monkeypatch.setenv(MOCK_METADATA_NRL_SQS_URL_ENV_NAME, NRL_SQS_URL)
139+
monkeypatch.setenv(MOCK_PDF_STITCHING_SQS_URL_ENV_NAME, PDF_STITCHING_SQS_URL)
134140
monkeypatch.setenv(MOCK_AUTH_SESSION_TABLE_NAME_ENV_NAME, AUTH_SESSION_TABLE_NAME)
135141
monkeypatch.setenv(MOCK_OIDC_CALLBACK_URL_ENV_NAME, OIDC_CALLBACK_URL)
136142
monkeypatch.setenv(MOCK_OIDC_CLIENT_ID_ENV_NAME, OIDC_CLIENT_ID)

lambdas/tests/unit/helpers/data/bulk_upload/test_data.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import os
22

3+
from enums.snomed_codes import SnomedCodes
34
from enums.virus_scan_result import VirusScanResult
45
from freezegun import freeze_time
56
from models.nhs_document_reference import NHSDocumentReference
67
from models.nrl_sqs_message import NrlSqsMessage
8+
from models.pdf_stitching_sqs_message import PdfStitchingSqsMessage
79
from models.staging_metadata import MetadataFile, StagingMetadata
810
from tests.unit.conftest import MOCK_LG_BUCKET, TEST_CURRENT_GP_ODS, TEST_UUID
911

@@ -155,6 +157,17 @@ def build_test_nrl_sqs_fifo_message(nhs_number: str, action: str) -> NrlSqsMessa
155157
return nrl_sqs_message
156158

157159

160+
def build_test_pdf_stitching_sqs_message(
161+
nhs_number: str, snomed_code_doc_type
162+
) -> PdfStitchingSqsMessage:
163+
message_body = {
164+
"nhs_number": nhs_number,
165+
"snomed_code_doc_type": snomed_code_doc_type,
166+
}
167+
pdf_stitching_sqs_message = PdfStitchingSqsMessage(**message_body)
168+
return pdf_stitching_sqs_message
169+
170+
158171
@freeze_time("2024-01-01 12:00:00")
159172
def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"):
160173
doc_ref = NHSDocumentReference(
@@ -181,6 +194,10 @@ def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"
181194
TEST_NRL_SQS_MESSAGE = build_test_nrl_sqs_fifo_message(
182195
TEST_NHS_NUMBER_FOR_BULK_UPLOAD, NrlActionTypes.CREATE
183196
)
197+
TEST_SNOMED_CODE_FOR_PDF_STITCHING = SnomedCodes.LLOYD_GEORGE.value
198+
TEST_PDF_STITCHING_SQS_MESSAGE = build_test_pdf_stitching_sqs_message(
199+
TEST_NHS_NUMBER_FOR_BULK_UPLOAD, TEST_SNOMED_CODE_FOR_PDF_STITCHING
200+
)
184201
TEST_STAGING_METADATA_WITH_INVALID_FILENAME = build_test_staging_metadata(
185202
[*make_valid_lg_file_names(2), "invalid_file_name.txt"]
186203
)

lambdas/tests/unit/repositories/bulk_upload/test_bulk_upload_sqs_repository.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22

33
import pytest
44
from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository
5-
from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE, NRL_SQS_URL
5+
from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE, PDF_STITCHING_SQS_URL
66
from tests.unit.helpers.data.bulk_upload.test_data import (
7-
TEST_GROUP_ID,
87
TEST_NHS_NUMBER_FOR_BULK_UPLOAD,
9-
TEST_NRL_SQS_MESSAGE,
8+
TEST_PDF_STITCHING_SQS_MESSAGE,
109
TEST_SQS_MESSAGE,
1110
TEST_STAGING_METADATA,
1211
)
12+
from utils.audit_logging_setup import LoggingService
13+
14+
logger = LoggingService(__name__)
1315

1416

1517
@pytest.fixture
@@ -47,15 +49,13 @@ def test_put_sqs_message_back_to_queue(set_env, repo_under_test, mock_uuid):
4749
)
4850

4951

50-
def test_send_message_to_nrl_sqs_fifo(set_env, repo_under_test):
51-
repo_under_test.send_message_to_nrl_fifo(
52-
NRL_SQS_URL,
53-
TEST_NRL_SQS_MESSAGE,
54-
TEST_GROUP_ID,
52+
def test_send_message_to_pdf_stitching_queue(set_env, repo_under_test):
53+
repo_under_test.send_message_to_pdf_stitching_queue(
54+
PDF_STITCHING_SQS_URL,
55+
TEST_PDF_STITCHING_SQS_MESSAGE,
5556
)
56-
message_body = TEST_NRL_SQS_MESSAGE
57-
repo_under_test.sqs_repository.send_message_fifo.assert_called_with(
58-
queue_url=NRL_SQS_URL,
57+
message_body = TEST_PDF_STITCHING_SQS_MESSAGE
58+
repo_under_test.sqs_repository.send_message_standard.assert_called_with(
59+
queue_url=PDF_STITCHING_SQS_URL,
5960
message_body=message_body.model_dump_json(),
60-
group_id="123",
6161
)

lambdas/tests/unit/services/test_bulk_upload_service.py

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,17 @@
33

44
import pytest
55
from botocore.exceptions import ClientError
6-
from enums.nrl_sqs_upload import NrlActionTypes
76
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus
8-
from enums.snomed_codes import SnomedCodes
97
from enums.upload_status import UploadStatus
108
from enums.virus_scan_result import SCAN_RESULT_TAG_KEY, VirusScanResult
119
from freezegun import freeze_time
12-
from models.fhir.R4.nrl_fhir_document_reference import Attachment
13-
from models.nrl_sqs_message import NrlSqsMessage
1410
from models.pds_models import Patient
1511
from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository
1612
from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository
1713
from services.bulk_upload_service import BulkUploadService
1814
from tests.unit.conftest import (
19-
APIM_API_URL,
2015
MOCK_LG_BUCKET,
2116
MOCK_STAGING_STORE_BUCKET,
22-
NRL_SQS_URL,
2317
TEST_CURRENT_GP_ODS,
2418
)
2519
from tests.unit.helpers.data.bulk_upload.test_data import (
@@ -236,7 +230,7 @@ def test_handle_sqs_message_happy_path(
236230
mock_pds_validation_strict.assert_called()
237231
mock_report_upload_complete.assert_called()
238232
mock_remove_ingested_file_from_source_bucket.assert_called()
239-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
233+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_called()
240234

241235

242236
def test_handle_sqs_message_happy_path_single_file(
@@ -250,14 +244,6 @@ def test_handle_sqs_message_happy_path_single_file(
250244
mock_ods_validation,
251245
):
252246
TEST_STAGING_METADATA.retries = 0
253-
mock_nrl_attachment = Attachment(
254-
url=f"{APIM_API_URL}/DocumentReference/{SnomedCodes.LLOYD_GEORGE.value.code}~{TEST_DOCUMENT_REFERENCE.id}",
255-
)
256-
mock_nrl_message = NrlSqsMessage(
257-
nhs_number=TEST_STAGING_METADATA.nhs_number,
258-
action=NrlActionTypes.CREATE,
259-
attachment=mock_nrl_attachment,
260-
)
261247
mock_create_lg_records_and_copy_files = mocker.patch.object(
262248
BulkUploadService, "create_lg_records_and_copy_files"
263249
)
@@ -277,11 +263,7 @@ def test_handle_sqs_message_happy_path_single_file(
277263
)
278264
mock_report_upload_complete.assert_called()
279265
mock_remove_ingested_file_from_source_bucket.assert_called()
280-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_called_with(
281-
queue_url=NRL_SQS_URL,
282-
message=mock_nrl_message,
283-
group_id=f"nrl_sqs_{mock_uuid}",
284-
)
266+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_called()
285267

286268

287269
def set_up_mocks_for_non_ascii_files(
@@ -394,7 +376,7 @@ def test_handle_sqs_message_calls_report_upload_failure_when_patient_record_alre
394376
mock_report_upload_failure.assert_called_with(
395377
TEST_STAGING_METADATA, UploadStatus.FAILED, str(mocked_error), ""
396378
)
397-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
379+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called()
398380

399381

400382
def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invalid(
@@ -431,7 +413,7 @@ def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invali
431413
str(mocked_error),
432414
"",
433415
)
434-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
416+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called()
435417

436418

437419
def test_handle_sqs_message_report_failure_when_document_is_infected(
@@ -469,7 +451,7 @@ def test_handle_sqs_message_report_failure_when_document_is_infected(
469451
)
470452
mock_create_lg_records_and_copy_files.assert_not_called()
471453
mock_remove_ingested_file_from_source_bucket.assert_not_called()
472-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
454+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called()
473455

474456

475457
def test_handle_sqs_message_report_failure_when_document_not_exist(
@@ -499,7 +481,7 @@ def test_handle_sqs_message_report_failure_when_document_not_exist(
499481
"One or more of the files is not accessible from staging bucket",
500482
"Y12345",
501483
)
502-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
484+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called()
503485

504486

505487
def test_handle_sqs_message_calls_report_upload_successful_when_patient_is_formally_deceased(
@@ -685,7 +667,7 @@ def test_handle_sqs_message_put_staging_metadata_back_to_queue_when_virus_scan_r
685667
mock_report_upload_failure.assert_not_called()
686668
mock_create_lg_records_and_copy_files.assert_not_called()
687669
mock_remove_ingested_file_from_source_bucket.assert_not_called()
688-
repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
670+
repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called()
689671

690672

691673
def test_handle_sqs_message_rollback_transaction_when_validation_pass_but_file_transfer_failed_halfway(
@@ -1039,7 +1021,6 @@ def test_handle_sqs_message_lenient_mode_happy_path(
10391021
mock_pds_validation_strict.assert_not_called()
10401022
mock_report_upload_complete.assert_called()
10411023
mock_remove_ingested_file_from_source_bucket.assert_called()
1042-
service.sqs_repository.send_message_to_nrl_fifo.assert_not_called()
10431024

10441025

10451026
def test_concatenate_acceptance_reason(repo_under_test):

0 commit comments

Comments
 (0)