Skip to content

Commit 2bec2b3

Browse files
[PRMT-70] Improve efficiency of LG retrieval (#644)
* [PRMT-70] - added some streaming logic * [PRMT-70] - updated tests * [PRMT-70] - reduced delay * [PRMT-70] - download_lloyd_george_files now downloads to memory * [PRMT-70] - updated code to not copy into buffer if there is only 1 file * [PRMT-70] - removed commented code * [PRMT-70] - commented test to try in env * [PRMT-70] - added logging * [PRMT-70] - removed optimization * [PRMT-70] - removed commented test * [PRMT-70] - removed logic for single file * [PRMT-70] - fixed tests * [PRMT-70] - merged with main * [PRMT-70] - addressed comments * [PRMT-70] - fixed typo * [PRMT-70] - addressed comments and fixed tests * [PRMT-70] - merged main * [PRMT-70] - merged main and renamed variable * [PRMT-70] - fixed typo * [PRMT-70] - fixed typo * removed duplicated * [PRMT-70]- added performance changes * PRMT-70 found out pikepdf is faster * [PRMT-70]- removed commented code * [PRMT-70]- added unit tests * [PRMT-70]- fixed comments * [PRMT-70]- fixed comments * [PRMT-70]- fixed test * [PRMT-70]- fixed test
1 parent b49f3fb commit 2bec2b3

12 files changed

+730
-709
lines changed

app/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ services:
1111
ports:
1212
- "${HOST_PORT}:${CONTAINER_PORT}"
1313
env_file:
14-
- .env
14+
- .env

app/src/helpers/requests/getLloydGeorgeRecord.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { JOB_STATUS } from '../../types/generic/downloadManifestJobStatus';
66
import { isRunningInCypress } from '../utils/isLocal';
77
import { StitchRecordError } from '../../types/generic/errors';
88

9-
export const DELAY_BETWEEN_POLLING_IN_SECONDS = isRunningInCypress() ? 0 : 10;
9+
export const DELAY_BETWEEN_POLLING_IN_SECONDS = isRunningInCypress() ? 0 : 3;
1010

1111
type Args = {
1212
nhsNumber: string;
@@ -29,10 +29,11 @@ const UnexpectedResponseMessage =
2929
async function getLloydGeorgeRecord(args: Args): Promise<LloydGeorgeStitchResult> {
3030
const postResponse = await requestStitchJob(args);
3131
let pendingCount = 0;
32-
while (pendingCount < 3) {
33-
if (postResponse !== JOB_STATUS.COMPLETED) {
32+
while (pendingCount < 10) {
33+
if (postResponse !== JOB_STATUS.COMPLETED || pendingCount > 0) {
3434
await waitForSeconds(DELAY_BETWEEN_POLLING_IN_SECONDS);
3535
}
36+
3637
const pollingResponse = await pollForPresignedUrl(args);
3738

3839
switch (pollingResponse?.jobStatus) {

lambdas/requirements/layers/requirements_core_lambda_layer.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ responses==0.23.1
1717
six==1.16.0
1818
types-PyYAML==6.0.12.11
1919
regex==2023.12.25
20+
pikepdf==8.4.0

lambdas/services/base/s3_service.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import io
12
from datetime import datetime, timedelta, timezone
23
from io import BytesIO
34
from typing import Any, Mapping
@@ -79,14 +80,6 @@ def create_download_presigned_url(self, s3_bucket_name: str, file_key: str):
7980
def download_file(self, s3_bucket_name: str, file_key: str, download_path: str):
    """Download the S3 object at `file_key` to the local `download_path`.

    Thin pass-through to the boto3 client; returns whatever the client returns.
    """
    return self.client.download_file(s3_bucket_name, file_key, download_path)
8182

82-
def get_binary_file(self, s3_bucket_name: str, file_key: str):
83-
response = self.client.get_object(
84-
Bucket=s3_bucket_name,
85-
Key=file_key,
86-
)
87-
file = response["Body"].read()
88-
return file
89-
9083
def upload_file(self, file_name: str, s3_bucket_name: str, file_key: str):
    """Upload the local file `file_name` to S3 as `file_key` in the given bucket.

    Thin pass-through to the boto3 client; returns whatever the client returns.
    """
    return self.client.upload_file(file_name, s3_bucket_name, file_key)
9285

@@ -172,21 +165,40 @@ def get_file_size(self, s3_bucket_name: str, object_key: str) -> int:
172165
response = self.client.head_object(Bucket=s3_bucket_name, Key=object_key)
173166
return response.get("ContentLength", 0)
174167

175-
def save_or_create_file(self, source_bucket: str, file_key: str, body: bytes):
176-
return self.client.put_object(
177-
Bucket=source_bucket, Key=file_key, Body=BytesIO(body)
178-
)
179-
180168
def get_object_stream(self, bucket: str, key: str):
    """Return the streaming `Body` of an S3 object, or None if absent.

    The caller owns the returned stream and is responsible for reading
    and closing it.
    """
    return self.client.get_object(Bucket=bucket, Key=key).get("Body")
183171

184-
def upload_file_obj(self, file_obj, s3_bucket_name: str, file_key: str):
172+
def stream_s3_object_to_memory(self, bucket: str, key: str) -> BytesIO:
    """Download an S3 object into an in-memory buffer.

    The body is read in 64 KiB chunks rather than one monolithic read,
    and the buffer is rewound to the start before being returned so the
    caller can consume it immediately.
    """
    body = self.client.get_object(Bucket=bucket, Key=key)["Body"]
    buffer = BytesIO()
    while True:
        chunk = body.read(64 * 1024)
        if not chunk:
            break
        buffer.write(chunk)
    buffer.seek(0)
    return buffer
179+
180+
def upload_file_obj(
    self,
    file_obj: io.BytesIO,
    s3_bucket_name: str,
    file_key: str,
    extra_args: "Mapping[str, Any] | None" = None,
):
    """Upload an in-memory file object to S3.

    Args:
        file_obj: Binary stream positioned at the data to upload.
        s3_bucket_name: Destination bucket name.
        file_key: Destination object key.
        extra_args: Optional S3 ``ExtraArgs`` (e.g. Tagging, ContentType,
            ContentDisposition); defaults to no extra arguments.

    Raises:
        ClientError: Propagated when the underlying S3 upload fails.
    """
    try:
        self.client.upload_fileobj(
            Fileobj=file_obj,
            Bucket=s3_bucket_name,
            Key=file_key,
            ExtraArgs=extra_args or {},
        )
        logger.info(f"Uploaded file object to s3://{s3_bucket_name}/{file_key}")
    except ClientError as e:
        logger.error(
            f"Failed to upload file object to s3://{s3_bucket_name}/{file_key} - {e}"
        )
        # Bare raise preserves the original traceback; `raise e` would
        # append an extra frame pointing at this handler.
        raise
200+
201+
def save_or_create_file(self, source_bucket: str, file_key: str, body: bytes):
    """Write `body` to S3, creating or overwriting the object at `file_key`."""
    payload = BytesIO(body)
    return self.client.put_object(Bucket=source_bucket, Key=file_key, Body=payload)

lambdas/services/get_fhir_document_reference_service.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,11 @@ def create_document_reference_fhir_response(
104104
)
105105
if file_size < FileSize.MAX_FILE_SIZE:
106106
logger.info("File size is smaller than 8MB. Returning binary file.")
107-
binary_file = self.s3_service.get_binary_file(
108-
s3_bucket_name=bucket_name,
109-
file_key=file_location,
107+
s3_stream = self.s3_service.get_object_stream(
108+
bucket=bucket_name,
109+
key=file_location,
110110
)
111+
binary_file = s3_stream.read()
111112
base64_encoded_file = base64.b64encode(binary_file)
112113
document_details.data = base64_encoded_file
113114

lambdas/services/lloyd_george_generate_stitch_service.py

Lines changed: 96 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
11
import os
2-
import shutil
3-
import tempfile
42
import uuid
3+
from concurrent.futures import ThreadPoolExecutor
4+
from io import BytesIO
55
from urllib import parse
66

77
from botocore.exceptions import ClientError
88
from enums.lambda_error import LambdaError
99
from enums.trace_status import TraceStatus
1010
from models.document_reference import DocumentReference
1111
from models.stitch_trace import StitchTrace
12+
from pikepdf import Pdf
1213
from pypdf.errors import PyPdfError
1314
from services.base.s3_service import S3Service
1415
from services.document_service import DocumentService
15-
from services.pdf_stitch_service import stitch_pdf
1616
from utils.audit_logging_setup import LoggingService
1717
from utils.exceptions import NoAvailableDocument
1818
from utils.filename_utils import extract_page_number
1919
from utils.lambda_exceptions import LGStitchServiceException
2020
from utils.lloyd_george_validator import check_for_number_of_files_match_expected
21-
from utils.utilities import create_reference_id, get_file_key_from_s3_url
21+
from utils.utilities import get_file_key_from_s3_url
2222

2323
logger = LoggingService(__name__)
2424

@@ -33,70 +33,107 @@ def __init__(self, stitch_trace: StitchTrace):
3333

3434
self.s3_service = S3Service()
3535
self.document_service = DocumentService()
36-
self.temp_folder = tempfile.mkdtemp()
3736
self.stitch_trace_object = stitch_trace
3837
self.stitch_trace_table = os.environ.get("STITCH_METADATA_DYNAMODB_NAME")
3938
self.stitch_file_name = f"patient-record-{str(uuid.uuid4())}"
40-
self.stitch_file_path = os.path.join(self.temp_folder, self.stitch_file_name)
39+
self.combined_file_folder = "combined_files"
4140

4241
def handle_stitch_request(self):
    """Run the full stitch job: build the stitched record, then mark the
    stitch trace as complete."""
    self.stitch_lloyd_george_record()
    self.update_stitch_job_complete()
4544

4645
def stitch_lloyd_george_record(self):
    """Fetch the patient's Lloyd George documents and produce a single viewable record.

    Single-file records reuse the existing S3 object in place (no copy is
    made); multi-file records are stitched into one PDF in memory and
    uploaded under the combined-files prefix. The stitch trace object is
    updated with the total size and final file location along the way.

    Raises:
        LGStitchServiceException: 404 when the patient has no documents,
            500 when retrieval, stitching, or upload fails.
    """
    # Stitching now uses pikepdf, whose errors do NOT derive from pypdf's
    # PyPdfError — they must be caught explicitly or they escape the
    # 500-mapping handler below.
    from pikepdf import PdfError as PikePdfError

    try:
        documents_for_stitching = self.get_lloyd_george_record_for_patient()
        if not documents_for_stitching:
            raise LGStitchServiceException(404, LambdaError.StitchNotFound)

        if len(documents_for_stitching) == 1:
            # Single file: serve the existing object directly instead of
            # re-uploading a stitched copy.
            document_to_stitch = documents_for_stitching[0]
            file_s3_key = get_file_key_from_s3_url(document_to_stitch.file_location)

            self.prepare_documents_for_stitching(documents_for_stitching)
            self.stitch_trace_object.total_file_size_in_bytes = (
                self.get_total_file_size_in_bytes(document=document_to_stitch)
            )
            self.stitch_trace_object.stitched_file_location = file_s3_key

        else:
            filename_for_stitched_file = f"{self.stitch_file_name}.pdf"
            destination_key = (
                f"{self.combined_file_folder}/{filename_for_stitched_file}"
            )
            ordered_documents = self.prepare_documents_for_stitching(
                documents_for_stitching
            )
            stitched_lg_stream = self.stream_and_stitch_documents(ordered_documents)
            self.stitch_trace_object.total_file_size_in_bytes = (
                stitched_lg_stream.getbuffer().nbytes
            )

            self.upload_stitched_lg_record(
                stitched_lg_stream=stitched_lg_stream,
                filename_on_bucket=destination_key,
            )

            self.stitch_trace_object.stitched_file_location = destination_key

        logger.audit_splunk_info(
            "User has viewed Lloyd George records",
            {"Result": "Successful viewing LG"},
        )

    except (
        ClientError,
        PyPdfError,
        PikePdfError,
        FileNotFoundError,
        NoAvailableDocument,
    ) as e:
        logger.error(
            f"{LambdaError.StitchClient.to_str()}: {str(e)}",
            {"Result": "Lloyd George stitching failed"},
        )
        raise LGStitchServiceException(500, LambdaError.StitchClient)
94+
def fetch_pdf(self, doc: DocumentReference) -> Pdf:
    """Download one document from the Lloyd George bucket and open it as a Pdf.

    The object is streamed into memory first so pikepdf never touches disk.
    """
    s3_key = get_file_key_from_s3_url(doc.file_location)
    in_memory_file = self.s3_service.stream_s3_object_to_memory(
        bucket=self.lloyd_george_bucket_name,
        key=s3_key,
    )
    # Defensive rewind — the streaming helper already seeks to 0.
    in_memory_file.seek(0)
    return Pdf.open(in_memory_file)
102+
103+
def stream_and_stitch_documents(
    self, documents: list[DocumentReference]
) -> BytesIO:
    """Concurrently download each document and merge them into one PDF.

    Downloads fan out across a small thread pool (the work is S3
    I/O-bound); pages are appended in the original `documents` order
    because futures are consumed in submission order, regardless of
    which download finishes first.

    Returns:
        An in-memory PDF stream rewound to the start.
    """
    from contextlib import ExitStack

    output_pdf = Pdf.new()
    output_stream = BytesIO()

    # Keep every source Pdf open until the merged document is saved:
    # when pages are copied between Pdfs, the underlying stream data is
    # pulled from the source at save time, so closing a source before
    # save risks reading from a closed Pdf.
    with ExitStack() as open_sources:
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(self.fetch_pdf, doc) for doc in documents]
            for future in futures:
                source_pdf = open_sources.enter_context(future.result())
                output_pdf.pages.extend(source_pdf.pages)

        output_pdf.save(output_stream)
        output_pdf.close()

    output_stream.seek(0)
    return output_stream
121+
122+
def prepare_documents_for_stitching(
    self, documents: list[DocumentReference]
) -> list[DocumentReference]:
    """Mark the trace as processing and return the documents in stitch order.

    A single document needs no sorting; multiple documents are ordered by
    filename. Records the file count and the most recent created date on
    the stitch trace object as a side effect.
    """
    self.update_trace_status(TraceStatus.PROCESSING)

    sorted_docs = (
        documents
        if len(documents) == 1
        else self.sort_documents_by_filenames(documents)
    )

    self.stitch_trace_object.number_of_files = len(sorted_docs)
    self.stitch_trace_object.file_last_updated = self.get_most_recent_created_date(
        sorted_docs
    )
    return sorted_docs
100137

101138
@staticmethod
102139
def sort_documents_by_filenames(
@@ -111,53 +148,39 @@ def sort_documents_by_filenames(
111148
)
112149
raise LGStitchServiceException(500, LambdaError.StitchValidation)
113150

114-
def download_lloyd_george_files(
115-
self,
116-
ordered_lg_records: list[DocumentReference],
117-
) -> list[str]:
118-
all_lg_parts = []
119-
120-
for lg_part in ordered_lg_records:
121-
file_location_on_s3 = lg_part.file_location
122-
s3_file_path = get_file_key_from_s3_url(file_location_on_s3)
123-
local_file_name = os.path.join(self.temp_folder, create_reference_id())
124-
self.s3_service.download_file(
125-
self.lloyd_george_bucket_name, s3_file_path, local_file_name
126-
)
127-
all_lg_parts.append(local_file_name)
128-
129-
return all_lg_parts
130-
131151
def upload_stitched_lg_record(
    self, stitched_lg_stream: BytesIO, filename_on_bucket: str
):
    """Upload the stitched record to the Lloyd George bucket.

    Tags the object so the bucket's lifecycle policy can expire it, and
    sets content headers so browsers render the PDF inline.

    Raises:
        LGStitchServiceException: 500 when the upload input is invalid.
    """
    try:
        upload_extra_args = {
            "Tagging": parse.urlencode({self.lifecycle_policy_tag: "true"}),
            "ContentDisposition": "inline",
            "ContentType": "application/pdf",
        }
        self.s3_service.upload_file_obj(
            file_obj=stitched_lg_stream,
            s3_bucket_name=self.lloyd_george_bucket_name,
            file_key=filename_on_bucket,
            extra_args=upload_extra_args,
        )
        logger.info(
            f"Uploaded stitched file to {self.lloyd_george_bucket_name} with key {filename_on_bucket}"
        )
    except ValueError as e:
        logger.error(
            f"{LambdaError.StitchCloudFront.to_str()}: {str(e)}",
            {"Result": "Failed to format CloudFront URL."},
        )
        raise LGStitchServiceException(500, LambdaError.StitchCloudFront)
153175

154176
@staticmethod
155177
def get_most_recent_created_date(documents: list[DocumentReference]) -> str:
156178
return max(doc.created for doc in documents)
157179

158-
@staticmethod
159-
def get_total_file_size_in_bytes(filepaths: list[str]) -> int:
160-
return sum(os.path.getsize(filepath) for filepath in filepaths)
180+
def get_total_file_size_in_bytes(self, document: DocumentReference) -> int:
    """Return the size in bytes of the document's backing object in S3."""
    return self.s3_service.get_file_size(
        document.s3_bucket_name, document.s3_file_key
    )
161184

162185
def update_stitch_job_complete(self):
163186
logger.info("Writing stitch trace to db")

0 commit comments

Comments
 (0)