Commit 14da9a4

[PRMT-71] - use stream

1 parent 0581475 commit 14da9a4

4 files changed: +296, -110 lines

lambdas/enums/lambda_error.py (4 additions, 0 deletions)

@@ -374,6 +374,10 @@ def create_error_body(self, params: Optional[dict] = None) -> str:
         "err_code": "GMZ_5002",
         "message": "Failed to generate document manifest",
     }
+    ZipCreationError = {
+        "err_code": "GMZ_5003",
+        "message": "Failed to generate zip",
+    }
     """
     Errors for Update Upload State lambda
     """

lambdas/services/base/s3_service.py (14 additions, 0 deletions)

@@ -160,3 +160,17 @@ def list_all_objects(self, bucket_name: str) -> list[dict]:
     def get_file_size(self, s3_bucket_name: str, object_key: str) -> int:
         response = self.client.head_object(Bucket=s3_bucket_name, Key=object_key)
         return response.get("ContentLength", 0)
+
+    def get_object_stream(self, bucket: str, key: str):
+        response = self.client.get_object(Bucket=bucket, Key=key)
+        return response["Body"]
+
+    def upload_file_obj(self, file_obj, s3_bucket_name: str, file_key: str):
+        try:
+            self.client.upload_fileobj(file_obj, s3_bucket_name, file_key)
+            logger.info(f"Uploaded file object to s3://{s3_bucket_name}/{file_key}")
+        except ClientError as e:
+            logger.error(
+                f"Failed to upload file object to s3://{s3_bucket_name}/{file_key} - {e}"
+            )
+            raise
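For context, the two new helpers wrap standard boto3 calls: get_object returns a response whose "Body" is a StreamingBody that can be read incrementally, and upload_fileobj streams any file-like object (including an in-memory buffer) to S3. A minimal, self-contained sketch of the same pattern against boto3 directly; the bucket and key names below are placeholders, not values from this repo:

import boto3

s3 = boto3.client("s3")

# Placeholder names for illustration only.
SOURCE_BUCKET = "example-source-bucket"
SOURCE_KEY = "example/document.pdf"
DEST_BUCKET = "example-output-bucket"
DEST_KEY = "copies/document.pdf"

# Equivalent of S3Service.get_object_stream: "Body" is a botocore
# StreamingBody that can be read in chunks instead of loaded whole.
body = s3.get_object(Bucket=SOURCE_BUCKET, Key=SOURCE_KEY)["Body"]

# Equivalent of S3Service.upload_file_obj: upload_fileobj accepts any
# object with a read() method, so the stream can be passed straight
# through without touching local disk.
s3.upload_fileobj(body, DEST_BUCKET, DEST_KEY)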
Lines changed: 115 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import io
12
import os
23
import shutil
3-
import tempfile
44
import zipfile
55

66
from botocore.exceptions import ClientError
@@ -20,44 +20,94 @@ class DocumentManifestZipService:
2020
def __init__(self, zip_trace: DocumentManifestZipTrace):
2121
self.s3_service = S3Service()
2222
self.dynamo_service = DynamoDBService()
23-
self.temp_output_dir = tempfile.mkdtemp()
24-
self.temp_downloads_dir = tempfile.mkdtemp()
23+
# self.temp_output_dir = tempfile.mkdtemp()
24+
# self.temp_downloads_dir = tempfile.mkdtemp()
2525
self.zip_trace_object = zip_trace
2626
self.zip_output_bucket = os.environ["ZIPPED_STORE_BUCKET_NAME"]
2727
self.zip_trace_table = os.environ["ZIPPED_STORE_DYNAMODB_NAME"]
2828
self.zip_file_name = f"patient-record-{zip_trace.job_id}.zip"
29-
self.zip_file_path = os.path.join(self.temp_output_dir, self.zip_file_name)
29+
# self.zip_file_path = os.path.join(self.temp_output_dir, self.zip_file_name)
3030

3131
def handle_zip_request(self):
3232
self.update_processing_status()
33-
self.download_documents_to_be_zipped()
34-
self.zip_files()
35-
self.upload_zip_file()
36-
self.cleanup_temp_files()
33+
# self.download_documents_to_be_zipped()
34+
# Stream and zip the documents in memory
35+
zip_buffer = self.stream_zip_documents()
36+
# Upload the zip file to S3
37+
self.upload_zip_file(zip_buffer)
38+
39+
# self.zip_files()
40+
# self.upload_zip_file()
41+
# self.cleanup_temp_files()
3742
self.update_dynamo_with_fields({"job_status", "zip_file_location"})
3843

39-
def download_documents_to_be_zipped(self):
40-
logger.info("Downloading documents to be zipped")
41-
documents = self.zip_trace_object.files_to_download
42-
for document_location, document_name in documents.items():
43-
self.download_file_from_s3(document_name, document_location)
44+
def stream_zip_documents(self) -> io.BytesIO:
45+
logger.info("Streaming and zipping documents in-memory")
46+
documents = self.zip_trace_object.files_to_download # Dict[str, str]
47+
48+
zip_buffer = io.BytesIO()
4449

45-
def download_file_from_s3(self, document_name: str, document_location: str):
46-
download_path = os.path.join(self.temp_downloads_dir, document_name)
47-
file_bucket, file_key = self.get_file_bucket_and_key(document_location)
4850
try:
49-
self.s3_service.download_file(file_bucket, file_key, download_path)
50-
except ClientError as e:
51+
with zipfile.ZipFile(
52+
zip_buffer, "w", compression=zipfile.ZIP_DEFLATED
53+
) as zipf:
54+
for document_location, document_name in documents.items():
55+
file_bucket, file_key = self.get_file_bucket_and_key(
56+
document_location
57+
)
58+
try:
59+
# Stream file from S3
60+
s3_object_stream = self.s3_service.get_object_stream(
61+
file_bucket, file_key
62+
)
63+
64+
with s3_object_stream as stream:
65+
with zipf.open(document_name, mode="w") as zip_member:
66+
shutil.copyfileobj(stream, zip_member, length=64 * 1024)
67+
68+
except ClientError as e:
69+
self.update_failed_status()
70+
msg = f"Failed to fetch S3 object {file_bucket}/{file_key}: {e}"
71+
logger.error(
72+
f"{LambdaError.ZipServiceClientError.to_str()} {msg}"
73+
)
74+
raise GenerateManifestZipException(
75+
status_code=500, error=LambdaError.ZipServiceClientError
76+
)
77+
78+
zip_buffer.seek(0)
79+
return zip_buffer
80+
81+
except Exception as e:
5182
self.update_failed_status()
52-
msg = f"{file_key} may reference missing file in s3 bucket: {file_bucket}"
53-
logger.error(
54-
f"{LambdaError.ZipServiceClientError.to_str()} {msg + str(e)}",
55-
{"Result": "Failed to create document manifest"},
56-
)
83+
if isinstance(e, GenerateManifestZipException):
84+
raise
85+
logger.error(f"ZIP creation failed: {e}", {"Result": "Failure"})
5786
raise GenerateManifestZipException(
58-
status_code=500, error=LambdaError.ZipServiceClientError
87+
status_code=500, error=LambdaError.ZipCreationError
5988
)
6089

90+
# def download_documents_to_be_zipped(self):
91+
# logger.info("Downloading documents to be zipped")
92+
# documents = self.zip_trace_object.files_to_download
93+
# for document_location, document_name in documents.items():
94+
# self.download_file_from_s3(document_name, document_location)
95+
#
96+
# def download_file_from_s3(self, document_name: str, document_location: str):
97+
# download_path = os.path.join(self.temp_downloads_dir, document_name)
98+
# file_bucket, file_key = self.get_file_bucket_and_key(document_location)
99+
# try:
100+
# self.s3_service.download_file(file_bucket, file_key, download_path)
101+
# except ClientError as e:
102+
# self.update_failed_status()
103+
# msg = f"{file_key} may reference missing file in s3 bucket: {file_bucket}"
104+
# logger.error(
105+
# f"{LambdaError.ZipServiceClientError.to_str()} {msg + str(e)}",
106+
# {"Result": "Failed to create document manifest"},
107+
# )
108+
# raise GenerateManifestZipException(
109+
# status_code=500, error=LambdaError.ZipServiceClientError
110+
61111
def get_file_bucket_and_key(self, file_location: str):
62112
try:
63113
file_bucket, file_key = file_location.replace("s3://", "").split("/", 1)
@@ -68,18 +118,19 @@ def get_file_bucket_and_key(self, file_location: str):
68118
"Failed to parse bucket from file location string"
69119
)
70120

71-
def upload_zip_file(self):
72-
logger.info("Uploading zip file to s3")
121+
def upload_zip_file(self, zip_buffer: io.BytesIO):
122+
zip_file_key = f"{self.zip_file_name}"
73123
self.zip_trace_object.zip_file_location = (
74-
f"s3://{self.zip_output_bucket}/{self.zip_file_name}"
124+
f"s3://{self.zip_output_bucket}/{zip_file_key}"
75125
)
76-
77126
try:
78-
self.s3_service.upload_file(
79-
file_name=self.zip_file_path,
80-
s3_bucket_name=self.zip_output_bucket,
81-
file_key=f"{self.zip_file_name}",
127+
self.s3_service.upload_file_obj(
128+
zip_buffer, self.zip_output_bucket, zip_file_key
82129
)
130+
logger.info(
131+
f"Successfully uploaded ZIP file to S3: s3://{self.zip_output_bucket}/{zip_file_key}"
132+
)
133+
83134
except ClientError as e:
84135
self.update_failed_status()
85136
logger.error(e, {"Result": "Failed to create document manifest"})
@@ -89,14 +140,35 @@ def upload_zip_file(self):
89140

90141
self.zip_trace_object.job_status = TraceStatus.COMPLETED
91142

92-
def zip_files(self):
93-
logger.info("Creating zip from files")
94-
with zipfile.ZipFile(self.zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
95-
for root, _, files in os.walk(self.temp_downloads_dir):
96-
for file in files:
97-
file_path = os.path.join(root, file)
98-
arc_name = os.path.relpath(file_path, self.temp_downloads_dir)
99-
zipf.write(file_path, arc_name)
143+
# def upload_zip_file(self):
144+
# logger.info("Uploading zip file to s3")
145+
# self.zip_trace_object.zip_file_location = (
146+
# f"s3://{self.zip_output_bucket}/{self.zip_file_name}"
147+
# )
148+
#
149+
# try:
150+
# self.s3_service.upload_file(
151+
# file_name=self.zip_file_path,
152+
# s3_bucket_name=self.zip_output_bucket,
153+
# file_key=f"{self.zip_file_name}",
154+
# )
155+
# except ClientError as e:
156+
# self.update_failed_status()
157+
# logger.error(e, {"Result": "Failed to create document manifest"})
158+
# raise GenerateManifestZipException(
159+
# status_code=500, error=LambdaError.ZipServiceClientError
160+
# )
161+
#
162+
# self.zip_trace_object.job_status = TraceStatus.COMPLETED
163+
164+
# def zip_files(self):
165+
# logger.info("Creating zip from files")
166+
# with zipfile.ZipFile(self.zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
167+
# for root, _, files in os.walk(self.temp_downloads_dir):
168+
# for file in files:
169+
# file_path = os.path.join(root, file)
170+
# arc_name = os.path.relpath(file_path, self.temp_downloads_dir)
171+
# zipf.write(file_path, arc_name)
100172

101173
def update_dynamo_with_fields(self, fields: set):
102174
logger.info("Writing zip trace to db")
@@ -116,6 +188,6 @@ def update_failed_status(self):
116188
self.zip_trace_object.job_status = TraceStatus.FAILED
117189
self.update_dynamo_with_fields({"job_status"})
118190

119-
def cleanup_temp_files(self):
120-
shutil.rmtree(self.temp_downloads_dir)
121-
shutil.rmtree(self.temp_output_dir)
191+
# def cleanup_temp_files(self):
192+
# shutil.rmtree(self.temp_downloads_dir)
193+
# shutil.rmtree(self.temp_output_dir)
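Taken together, the change replaces the download-to-/tmp, zip-on-disk, upload flow with an in-memory pipeline: each S3 object is streamed into a writable ZipFile member backed by a BytesIO buffer, and the finished buffer is handed to upload_fileobj. A condensed, self-contained sketch of that pipeline using boto3 directly; the bucket names, keys, and manifest dict below are placeholders, not values from this service:

import io
import shutil
import zipfile

import boto3

s3 = boto3.client("s3")

# Placeholder manifest: S3 location -> file name inside the ZIP.
files_to_download = {
    "s3://example-doc-bucket/records/report.pdf": "report.pdf",
}
output_bucket = "example-zip-output-bucket"
zip_key = "patient-record-example-job.zip"

zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
    for location, name in files_to_download.items():
        bucket, key = location.replace("s3://", "").split("/", 1)
        body = s3.get_object(Bucket=bucket, Key=key)["Body"]
        # ZipFile.open(name, "w") returns a writable member stream
        # (Python 3.6+), so the S3 body is copied across in 64 KiB
        # chunks without ever landing on local disk.
        with zipf.open(name, mode="w") as member:
            shutil.copyfileobj(body, member, length=64 * 1024)
        body.close()

zip_buffer.seek(0)  # rewind before handing the buffer to upload_fileobj
s3.upload_fileobj(zip_buffer, output_bucket, zip_key)

The trade-off of this approach is that the whole archive now lives in Lambda memory rather than in /tmp, so the function's memory allocation has to cover the largest expected manifest.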
