Skip to content

Commit d22b0b9

Browse files
[PRMT-71] Improve efficiency of LG download (#637)
* [PRMT-71] - use stream
* [PRMT-71] - removed commented code
* [PRMT-71] - reduced delay
* [PRMT-71] - fixed some comments
* [PRMT-71] - fixed tests
* [PRMT-71] - updated code to reflect comments
* [PRMT-153] - removed commented code
* [PRMT-71] - addressed comments
* [PRMT-71] - forced table column word to stay together
* [PRMT-71] - removed first variable
* [PRMT-71] - added service tests
* [PRMT-71] - removed comments
1 parent 4b2065f commit d22b0b9

File tree

8 files changed

+211
-139
lines changed

8 files changed

+211
-139
lines changed

app/src/helpers/requests/getPresignedUrlForZip.test.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ describe('getPresignedUrlForZip', () => {
8282
);
8383
});
8484

85-
it('wait for 10 secs before the 1st polling', async () => {
85+
it('does not wait before the 1st polling', async () => {
8686
mockedAxios.post.mockResolvedValueOnce(mockJobIdResponse);
8787
mockedAxios.get.mockResolvedValueOnce(mockCompletedResponse);
8888

@@ -92,11 +92,10 @@ describe('getPresignedUrlForZip', () => {
9292
baseUrl,
9393
});
9494

95-
expect(mockWaitForSeconds).toHaveBeenCalledTimes(1);
96-
expect(mockWaitForSeconds).toHaveBeenCalledWith(0);
95+
expect(mockWaitForSeconds).toHaveBeenCalledTimes(0);
9796
});
9897

99-
it('wait for 10 secs between every polling', async () => {
98+
it('wait between every polling', async () => {
10099
mockedAxios.post.mockResolvedValueOnce(mockJobIdResponse);
101100
mockedAxios.get
102101
.mockResolvedValueOnce(mockPendingResponse)
@@ -110,13 +109,20 @@ describe('getPresignedUrlForZip', () => {
110109
baseUrl,
111110
});
112111

113-
expect(mockWaitForSeconds).toHaveBeenCalledTimes(4);
112+
expect(mockWaitForSeconds).toHaveBeenCalledTimes(3);
114113
expect(mockWaitForSeconds).toHaveBeenCalledWith(0);
115114
});
116115

117-
it('throw an error if got pending status for 3 times', async () => {
116+
it('throw an error if got pending status for 10 times', async () => {
118117
mockedAxios.post.mockResolvedValueOnce(mockJobIdResponse);
119118
mockedAxios.get
119+
.mockResolvedValueOnce(mockPendingResponse)
120+
.mockResolvedValueOnce(mockPendingResponse)
121+
.mockResolvedValueOnce(mockPendingResponse)
122+
.mockResolvedValueOnce(mockPendingResponse)
123+
.mockResolvedValueOnce(mockPendingResponse)
124+
.mockResolvedValueOnce(mockPendingResponse)
125+
.mockResolvedValueOnce(mockPendingResponse)
120126
.mockResolvedValueOnce(mockPendingResponse)
121127
.mockResolvedValueOnce(mockPendingResponse)
122128
.mockResolvedValueOnce(mockPendingResponse);

app/src/helpers/requests/getPresignedUrlForZip.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import waitForSeconds from '../utils/waitForSeconds';
77
import { DownloadManifestError } from '../../types/generic/errors';
88
import { isRunningInCypress } from '../utils/isLocal';
99

10-
export const DELAY_BETWEEN_POLLING_IN_SECONDS = isRunningInCypress() ? 0 : 10;
10+
export const DELAY_BETWEEN_POLLING_IN_SECONDS = isRunningInCypress() ? 0 : 3;
1111

1212
type Args = {
1313
nhsNumber: string;
@@ -33,9 +33,10 @@ const getPresignedUrlForZip = async (args: Args) => {
3333

3434
const jobId = await requestJobId(args);
3535
let pendingCount = 0;
36-
37-
while (pendingCount < 3) {
38-
await waitForSeconds(DELAY_BETWEEN_POLLING_IN_SECONDS);
36+
while (pendingCount < 10) {
37+
if (pendingCount > 0) {
38+
await waitForSeconds(DELAY_BETWEEN_POLLING_IN_SECONDS);
39+
}
3940
const pollingResponse = await pollForPresignedUrl({
4041
baseUrl,
4142
baseHeaders,

app/src/styles/App.scss

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,11 @@ $hunit: '%';
666666
font-size: 1.2rem !important;
667667
}
668668
}
669+
@media (min-width: 380px) {
670+
#available-files-table-title .table-column-header {
671+
word-break: keep-all;
672+
}
673+
}
669674

670675
//Feedback
671676
#feedback-form {

lambdas/services/base/s3_service.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,17 @@ def save_or_create_file(self, source_bucket: str, file_key: str, body: bytes):
176176
return self.client.put_object(
177177
Bucket=source_bucket, Key=file_key, Body=BytesIO(body)
178178
)
179+
180+
def get_object_stream(self, bucket: str, key: str):
181+
response = self.client.get_object(Bucket=bucket, Key=key)
182+
return response.get("Body")
183+
184+
def upload_file_obj(self, file_obj, s3_bucket_name: str, file_key: str):
185+
try:
186+
self.client.upload_fileobj(file_obj, s3_bucket_name, file_key)
187+
logger.info(f"Uploaded file object to s3://{s3_bucket_name}/{file_key}")
188+
except ClientError as e:
189+
logger.error(
190+
f"Failed to upload file object to s3://{s3_bucket_name}/{file_key} - {e}"
191+
)
192+
raise e
Lines changed: 42 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import os
22
import shutil
3-
import tempfile
43
import zipfile
4+
from contextlib import closing
5+
from io import BytesIO
56

67
from botocore.exceptions import ClientError
78
from enums.lambda_error import LambdaError
@@ -20,84 +21,77 @@ class DocumentManifestZipService:
2021
def __init__(self, zip_trace: DocumentManifestZipTrace):
2122
self.s3_service = S3Service()
2223
self.dynamo_service = DynamoDBService()
23-
self.temp_output_dir = tempfile.mkdtemp()
24-
self.temp_downloads_dir = tempfile.mkdtemp()
2524
self.zip_trace_object = zip_trace
2625
self.zip_output_bucket = os.environ["ZIPPED_STORE_BUCKET_NAME"]
2726
self.zip_trace_table = os.environ["ZIPPED_STORE_DYNAMODB_NAME"]
2827
self.zip_file_name = f"patient-record-{zip_trace.job_id}.zip"
29-
self.zip_file_path = os.path.join(self.temp_output_dir, self.zip_file_name)
3028

3129
def handle_zip_request(self):
32-
self.update_processing_status()
33-
self.download_documents_to_be_zipped()
34-
self.zip_files()
35-
self.upload_zip_file()
36-
self.cleanup_temp_files()
30+
self.update_status(TraceStatus.PROCESSING)
31+
zip_buffer = self.stream_zip_documents()
32+
self.upload_zip_file(zip_buffer)
3733
self.update_dynamo_with_fields({"job_status", "zip_file_location"})
3834

39-
def download_documents_to_be_zipped(self):
40-
logger.info("Downloading documents to be zipped")
35+
def stream_zip_documents(self) -> BytesIO:
36+
logger.info("Streaming and zipping documents in-memory")
4137
documents = self.zip_trace_object.files_to_download
42-
for document_location, document_name in documents.items():
43-
self.download_file_from_s3(document_name, document_location)
4438

45-
def download_file_from_s3(self, document_name: str, document_location: str):
46-
download_path = os.path.join(self.temp_downloads_dir, document_name)
47-
file_bucket, file_key = self.get_file_bucket_and_key(document_location)
48-
try:
49-
self.s3_service.download_file(file_bucket, file_key, download_path)
50-
except ClientError as e:
51-
self.update_failed_status()
52-
msg = f"{file_key} may reference missing file in s3 bucket: {file_bucket}"
53-
logger.error(
54-
f"{LambdaError.ZipServiceClientError.to_str()} {msg + str(e)}",
55-
{"Result": "Failed to create document manifest"},
56-
)
57-
raise GenerateManifestZipException(
58-
status_code=500, error=LambdaError.ZipServiceClientError
59-
)
39+
zip_buffer = BytesIO()
40+
41+
with zipfile.ZipFile(zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
42+
for document_location, document_name in documents.items():
43+
file_bucket, file_key = self.get_file_bucket_and_key(document_location)
44+
try:
45+
with closing(
46+
self.s3_service.get_object_stream(file_bucket, file_key)
47+
) as s3_object_stream:
48+
with zipf.open(document_name, mode="w") as zip_member:
49+
shutil.copyfileobj(
50+
s3_object_stream, zip_member, length=64 * 1024
51+
)
52+
except ClientError as e:
53+
self.update_status(TraceStatus.FAILED)
54+
msg = f"Failed to fetch S3 object {file_bucket}/{file_key}: {e}"
55+
logger.error(f"{LambdaError.ZipServiceClientError.to_str()} {msg}")
56+
raise GenerateManifestZipException(
57+
status_code=500, error=LambdaError.ZipServiceClientError
58+
)
59+
zip_buffer.seek(0)
60+
return zip_buffer
6061

6162
def get_file_bucket_and_key(self, file_location: str):
6263
try:
6364
file_bucket, file_key = file_location.replace("s3://", "").split("/", 1)
6465
return file_bucket, file_key
6566
except ValueError:
66-
self.update_failed_status()
67+
self.update_status(TraceStatus.FAILED)
6768
raise InvalidDocumentReferenceException(
6869
"Failed to parse bucket from file location string"
6970
)
7071

71-
def upload_zip_file(self):
72+
def upload_zip_file(self, zip_buffer: BytesIO):
7273
logger.info("Uploading zip file to s3")
74+
zip_file_key = f"{self.zip_file_name}"
7375
self.zip_trace_object.zip_file_location = (
74-
f"s3://{self.zip_output_bucket}/{self.zip_file_name}"
76+
f"s3://{self.zip_output_bucket}/{zip_file_key}"
7577
)
76-
7778
try:
78-
self.s3_service.upload_file(
79-
file_name=self.zip_file_path,
80-
s3_bucket_name=self.zip_output_bucket,
81-
file_key=f"{self.zip_file_name}",
79+
self.s3_service.upload_file_obj(
80+
zip_buffer, self.zip_output_bucket, zip_file_key
8281
)
82+
logger.info(
83+
f"Successfully uploaded ZIP file to S3: s3://{self.zip_output_bucket}/{zip_file_key}"
84+
)
85+
8386
except ClientError as e:
84-
self.update_failed_status()
87+
self.update_status(TraceStatus.FAILED)
8588
logger.error(e, {"Result": "Failed to create document manifest"})
8689
raise GenerateManifestZipException(
8790
status_code=500, error=LambdaError.ZipServiceClientError
8891
)
8992

9093
self.zip_trace_object.job_status = TraceStatus.COMPLETED
9194

92-
def zip_files(self):
93-
logger.info("Creating zip from files")
94-
with zipfile.ZipFile(self.zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
95-
for root, _, files in os.walk(self.temp_downloads_dir):
96-
for file in files:
97-
file_path = os.path.join(root, file)
98-
arc_name = os.path.relpath(file_path, self.temp_downloads_dir)
99-
zipf.write(file_path, arc_name)
100-
10195
def update_dynamo_with_fields(self, fields: set):
10296
logger.info("Writing zip trace to db")
10397
self.dynamo_service.update_item(
@@ -108,14 +102,6 @@ def update_dynamo_with_fields(self, fields: set):
108102
),
109103
)
110104

111-
def update_processing_status(self):
112-
self.zip_trace_object.job_status = TraceStatus.PROCESSING
105+
def update_status(self, job_status: TraceStatus):
106+
self.zip_trace_object.job_status = job_status
113107
self.update_dynamo_with_fields({"job_status"})
114-
115-
def update_failed_status(self):
116-
self.zip_trace_object.job_status = TraceStatus.FAILED
117-
self.update_dynamo_with_fields({"job_status"})
118-
119-
def cleanup_temp_files(self):
120-
shutil.rmtree(self.temp_downloads_dir)
121-
shutil.rmtree(self.temp_output_dir)

lambdas/services/pds_api_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def pds_request(self, nhs_number: str, retry_on_expired: bool):
4747
pds_response = self.session.get(
4848
url=url_endpoint, headers=authorization_header
4949
)
50-
logger.info("PDS Call Completed",{ "Event": "NDR-TR1" })
50+
logger.info("PDS Call Completed", {"Event": "NDR-TR1"})
5151

5252
if pds_response.status_code == 401 and retry_on_expired:
5353
return self.pds_request(nhs_number, retry_on_expired=False)

lambdas/tests/unit/services/base/test_s3_service.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ def test_save_or_create_file(mock_service, mock_client):
410410
assert kwargs["Bucket"] == MOCK_BUCKET
411411
assert kwargs["Key"] == TEST_FILE_NAME
412412
assert kwargs["Body"].read() == body
413-
413+
414414

415415
def test_returns_binary_file_content_when_file_exists(
416416
mock_service, mock_client, mocker
@@ -431,3 +431,44 @@ def test_raises_exception_when_file_does_not_exist(mock_service, mock_client):
431431
with pytest.raises(ClientError):
432432
mock_service.get_binary_file("test-bucket", "nonexistent-key")
433433

434+
435+
def test_get_object_stream_returns_body_stream(mock_service, mock_client, mocker):
436+
mock_stream = mocker.Mock(name="MockS3BodyStream")
437+
mock_client.get_object.return_value = {"Body": mock_stream}
438+
439+
result = mock_service.get_object_stream(bucket=MOCK_BUCKET, key=TEST_FILE_KEY)
440+
441+
assert result == mock_stream
442+
mock_client.get_object.assert_called_once_with(
443+
Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY
444+
)
445+
446+
447+
def test_upload_file_obj_success(mock_service, mock_client, mocker):
448+
mock_file_obj = mocker.Mock(name="MockFileObj")
449+
450+
mock_service.upload_file_obj(mock_file_obj, MOCK_BUCKET, TEST_FILE_KEY)
451+
452+
mock_client.upload_fileobj.assert_called_once_with(
453+
mock_file_obj, MOCK_BUCKET, TEST_FILE_KEY
454+
)
455+
456+
457+
def test_upload_file_obj_raises_client_error(mock_service, mock_client, mocker):
458+
mock_file_obj = mocker.Mock(name="MockFileObj")
459+
mock_client.upload_fileobj.side_effect = MOCK_CLIENT_ERROR
460+
461+
with pytest.raises(ClientError) as except_info:
462+
mock_service.upload_file_obj(mock_file_obj, MOCK_BUCKET, TEST_FILE_KEY)
463+
464+
mock_client.upload_fileobj.assert_called_once_with(
465+
mock_file_obj, MOCK_BUCKET, TEST_FILE_KEY
466+
)
467+
assert (
468+
except_info.value.response["Error"]["Code"]
469+
== MOCK_CLIENT_ERROR.response["Error"]["Code"]
470+
)
471+
assert (
472+
except_info.value.response["Error"]["Message"]
473+
== MOCK_CLIENT_ERROR.response["Error"]["Message"]
474+
)

0 commit comments

Comments
 (0)