Skip to content

Commit b38a2a8

Browse files
committed
VED-470: Stream content line by line so we can handle arbitrary file sizes.
1 parent da76129 commit b38a2a8

File tree

3 files changed

+202
-25
lines changed

3 files changed

+202
-25
lines changed

mesh_processor/poetry.lock

Lines changed: 117 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mesh_processor/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ boto3 = "~1.38.42"
1212
mypy-boto3-dynamodb = "^1.38.4"
1313
moto = {extras = ["s3"], version = "^5.1.8"}
1414
coverage = "^7.9.1"
15+
smart-open = {extras = ["s3"], version = "^7.3.0.post1"}
1516

1617
[build-system]
1718
requires = ["poetry-core"]

mesh_processor/src/converter.py

Lines changed: 84 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import logging
2-
3-
import boto3
42
import os
53

4+
import boto3
5+
from smart_open import open
66

77
DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")
88

@@ -31,38 +31,98 @@ def parse_header_value(header_value: str):
3131
return main_value, parsed_params
3232

3333

34+
def read_until_part_start(input_file, boundary):
    """Consume lines from *input_file* until the first encapsulation boundary.

    Skips any multipart preamble. On return the file position is just past
    the ``--boundary`` line, i.e. at the start of the first part's headers.

    Args:
        input_file: binary file-like object opened for reading.
        boundary: the multipart boundary as bytes, without the leading "--".

    Raises:
        ValueError: if EOF is reached before a boundary line is seen.
    """
    while line := input_file.readline():
        if line == b"--" + boundary + b"\r\n":
            return
    else:
        # while-else: the loop ran out of input (readline returned b"")
        # without ever matching the boundary line.
        raise ValueError("Unexpected EOF")
40+
41+
42+
def read_headers_bytes(input_file):
    """Read raw header lines up to (but not including) the blank CRLF line.

    On return the file position is just past the blank line separating the
    part headers from the part body.

    Args:
        input_file: binary file-like object positioned at the first header.

    Returns:
        The concatenated header lines as bytes, each still CRLF-terminated.

    Raises:
        ValueError: if EOF is reached before the blank separator line.
    """
    # Accumulate in a list and join once: repeated bytes += is quadratic.
    header_lines = []
    while line := input_file.readline():
        if line == b"\r\n":
            return b"".join(header_lines)
        header_lines.append(line)
    else:
        # while-else: readline returned b"" (EOF) before the blank line.
        raise ValueError("Unexpected EOF")
50+
51+
52+
def read_part_headers(input_file):
    """Read the current part's header block and return it parsed.

    Decodes the raw header bytes as UTF-8 and delegates parsing to
    ``parse_headers``.
    """
    raw = read_headers_bytes(input_file)
    return parse_headers(raw.decode("utf-8"))
56+
57+
58+
def stream_part_body(input_file, boundary, output_file):
    """Copy the current part's body to *output_file*, stopping at the boundary.

    Writes run one line behind the read position so that the line directly
    before the boundary can have its trailing CRLF removed — that CRLF
    belongs to the encapsulation boundary, not to the part body.

    Args:
        input_file: binary file-like object positioned at the part body.
        boundary: the multipart boundary as bytes, without the leading "--".
        output_file: binary file-like object the body is written to.

    Raises:
        ValueError: if EOF is reached before a boundary line terminates
            the part.
    """
    next_part_delimiter = b"--" + boundary + b"\r\n"
    close_delimiter = b"--" + boundary + b"--\r\n"

    previous_line = None
    found_part_end = False
    while not found_part_end:
        line = input_file.readline()
        # readline() signals EOF with an empty bytes object, never None;
        # the previous `is None` check could never fire, so a truncated
        # part spun this loop forever.
        if not line:
            raise ValueError("Unexpected EOF")

        if line == next_part_delimiter:
            logger.warning("Found additional part which will not be processed")
            found_part_end = True
        if line == close_delimiter:
            found_part_end = True

        if previous_line is not None:
            if found_part_end:
                # The final \r\n is part of the encapsulation boundary, so
                # should not be included. removesuffix strips exactly one
                # CRLF; rstrip(b'\r\n') would also eat genuine trailing
                # \r / \n bytes belonging to the body.
                output_file.write(previous_line.removesuffix(b"\r\n"))
            else:
                output_file.write(previous_line)

        previous_line = line
79+
80+
81+
def transfer_multipart_content(bucket_name, file_key, boundary, filename):
    """Stream the first part of a multipart S3 object to the destination bucket.

    Opens ``s3://bucket_name/file_key`` for line-by-line reading via
    smart_open (so arbitrarily large payloads never sit fully in memory),
    skips to the first encapsulation boundary, refines the destination key
    from the part's Content-Disposition filename when present, then streams
    the part body to the destination object.

    Args:
        bucket_name: source S3 bucket name.
        file_key: source S3 object key.
        boundary: multipart boundary as bytes, without the leading "--".
        filename: fallback destination key, overridden by the part's
            Content-Disposition filename when one is supplied.
    """
    with open(
        f"s3://{bucket_name}/{file_key}",
        "rb",
        transport_params={"client": s3_client}
    ) as input_file:
        read_until_part_start(input_file, boundary)

        headers = read_part_headers(input_file)
        content_disposition = headers.get("Content-Disposition")
        if content_disposition:
            _, content_disposition_params = parse_header_value(content_disposition)
            filename = content_disposition_params.get("filename") or filename

        # NOTE(review): the destination key was a fixed literal placeholder,
        # leaving the computed `filename` unused and clobbering one object on
        # every upload — presumed to be an extraction artifact for {filename};
        # confirm against the original commit.
        with open(
            f"s3://{DESTINATION_BUCKET_NAME}/{filename}",
            "wb",
            transport_params={"client": s3_client}
        ) as output_file:
            stream_part_body(input_file, boundary, output_file)
101+
102+
34103
def process_record(record):
    """Handle one S3 event record: copy or de-multipart the object.

    Inspects the object's Content-Type via a HEAD request; multipart
    payloads have their first part streamed to the destination bucket,
    anything else is copied server-side under the resolved filename.
    """
    bucket_name = record["s3"]["bucket"]["name"]
    file_key = record["s3"]["object"]["key"]
    logger.info(f"Processing {file_key}")

    # HEAD is enough here: only metadata and content type are needed up front.
    response = s3_client.head_object(Bucket=bucket_name, Key=file_key)
    media_type, content_type_params = parse_header_value(response['ContentType'])
    filename = response["Metadata"].get("mex-filename") or file_key

    if not media_type.startswith("multipart/"):
        # Plain object: a server-side copy avoids pulling the bytes through us.
        s3_client.copy_object(
            Bucket=DESTINATION_BUCKET_NAME,
            CopySource={"Bucket": bucket_name, "Key": file_key},
            Key=filename
        )
    else:
        # Multipart: parse the filename from the part headers and stream
        # the first part's content across.
        logger.info("Found multipart content")
        boundary = content_type_params["boundary"].encode("utf-8")
        transfer_multipart_content(bucket_name, file_key, boundary, filename)

    logger.info(f"Transfer complete for {file_key}")
66126

67127

68128
def lambda_handler(event, _):

0 commit comments

Comments
 (0)