
Commit 508f8fc

VED-470: Handle receiving multipart content from MESH. (#657)
* VED-470: Handle receiving multipart content from MESH.
* VED-470: Stream content line by line so we can handle arbitrary file sizes.
* VED-470: Tidy up. Add test. Add type hints.
* VED-470: Resolve Sonar warnings. Remove redundant multipart handling in record processor. Add missing TF changes.
* VED-470: Stream records in record processor instead of loading everything into memory.
* VED-470: Handle some extra cases. Fix EOF check. Upgrade to Python 3.11.
* VED-470: Fix lint errors.
* VED-470: Consolidate tests.
1 parent d969d05 commit 508f8fc
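
In outline, the change makes the MESH processor Lambda inspect the S3 object's Content-Type and, when it is multipart, pull the boundary parameter out of the header before streaming the first part to the destination bucket. A minimal sketch of that header parsing using the parse_header_value helper added in converter.py below; the media type and boundary value here are invented examples, not real MESH values:

# Illustrative values only - the media type and boundary are invented for this example.
content_type = 'multipart/mixed; boundary="7MA4YWxkTrZu0gW"'

media_type, params = parse_header_value(content_type)  # helper added in mesh_processor/src/converter.py
assert media_type == "multipart/mixed"
assert params["boundary"] == "7MA4YWxkTrZu0gW"

# The boundary is then encoded to bytes and used to locate the first part's body.
boundary = params["boundary"].encode("utf-8")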

19 files changed: +578 -274 lines

.github/workflows/sonarcloud.yml

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ jobs:
         id: meshprocessor
         continue-on-error: true
         run: |
-          poetry env use 3.10
+          poetry env use 3.11
           poetry install
           poetry run coverage run -m unittest discover || echo "mesh_processor tests failed" >> ../failed_tests.txt
           poetry run coverage xml -o ../mesh_processor-coverage.xml

mesh_processor/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM public.ecr.aws/lambda/python:3.10 AS base
+FROM public.ecr.aws/lambda/python:3.11 AS base
 
 # Create a non-root user with a specific UID and GID
 RUN mkdir -p /home/appuser && \

mesh_processor/poetry.lock

Lines changed: 139 additions & 6 deletions
Some generated files are not rendered by default.

mesh_processor/pyproject.toml

Lines changed: 3 additions & 2 deletions
@@ -7,11 +7,12 @@ readme = "README.md"
 packages = [{include = "src"}]
 
 [tool.poetry.dependencies]
-python = "~3.10"
+python = "~3.11"
 boto3 = "~1.38.42"
 mypy-boto3-dynamodb = "^1.38.4"
-moto = "~5.1.6"
+moto = {extras = ["s3"], version = "^5.1.8"}
 coverage = "^7.9.1"
+smart-open = {extras = ["s3"], version = "^7.3.0.post1"}
 
 [build-system]
 requires = ["poetry-core"]
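
The new smart-open dependency is what enables the "stream content line by line" part of this change: the Lambda iterates the S3 object without ever loading the whole MESH message into memory. A rough sketch of the pattern, with a placeholder bucket, key, and handler that are not names from this repo:

import boto3
from smart_open import open

s3_client = boto3.client("s3")

# smart_open fetches the object in chunks under the hood; only one line is held at a time.
with open("s3://example-bucket/incoming/message.dat", "rb",
          transport_params={"client": s3_client}) as s3_file:
    for line in s3_file:
        handle_line(line)  # hypothetical per-line handler, for illustration only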

mesh_processor/src/converter.py

Lines changed: 139 additions & 30 deletions
@@ -1,44 +1,153 @@
-import boto3
+import logging
 import os
+from typing import BinaryIO
 
+import boto3
+from smart_open import open
 
-def lambda_handler(event, context):
-    s3 = boto3.client('s3')
+DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")
 
-    # Destination bucket name
-    destination_bucket = os.getenv("Destination_BUCKET_NAME")
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger()
 
-    for record in event["Records"]:
-        bucket_name = record["s3"]["bucket"]["name"]
-        file_key = record["s3"]["object"]["key"]
-        copy_source = {
-            'Bucket': record["s3"]["bucket"]["name"],
-            'Key': record["s3"]["object"]["key"]
-        }
+s3_client = boto3.client('s3')
 
-        # Read the .dat file from S3
-        dat_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
 
-        # Update the filename from Metadata
-        file_name = ensure_dat_extension(dat_obj['Metadata'].get('mex-filename', None))
+def parse_headers(headers_str: str) -> dict[str, str]:
+    headers = dict(
+        header_str.split(":", 1)
+        for header_str in headers_str.split("\r\n")
+        if ":" in header_str
+    )
+    return {k.strip(): v.strip() for k, v in headers.items()}
 
-        s3.copy_object(CopySource=copy_source, Bucket=destination_bucket, Key=file_name)
 
-    return {
-        'statusCode': 200,
-        'body': 'Files converted and uploaded successfully!'
-    }
+def parse_header_value(header_value: str) -> tuple[str, dict[str, str]]:
+    main_value, *params = header_value.split(";")
+    parsed_params = dict(
+        param.strip().split("=", 1)
+        for param in params
+    )
+    parsed_params = {k: v.strip('"') for k, v in parsed_params.items()}
+    return main_value, parsed_params
+
+
+def read_until_part_start(input_file: BinaryIO, boundary: bytes) -> None:
+    while line := input_file.readline():
+        if line == b"--" + boundary + b"\r\n":
+            return
+    raise ValueError("Unexpected EOF")
+
+
+def read_headers_bytes(input_file: BinaryIO) -> bytes:
+    headers_bytes = b''
+    while line := input_file.readline():
+        if line == b"\r\n":
+            return headers_bytes
+        headers_bytes += line
+    raise ValueError("Unexpected EOF")
+
+
+def read_part_headers(input_file: BinaryIO) -> dict[str, str]:
+    headers_bytes = read_headers_bytes(input_file)
+    headers_str = headers_bytes.decode("utf-8")
+    return parse_headers(headers_str)
+
+
+def stream_part_body(input_file: BinaryIO, boundary: bytes, output_file: BinaryIO) -> None:
+    previous_line = None
+    found_part_end = False
+    while line := input_file.readline():
+        if line == b"--" + boundary + b"\r\n":
+            logger.warning("Found additional part which will not be processed")
+            found_part_end = True
+        if line.startswith(b"--" + boundary + b"--"):
+            found_part_end = True
+
+        if previous_line is not None:
+            if found_part_end:
+                # The final \r\n is part of the encapsulation boundary, so should not be included
+                output_file.write(previous_line.rstrip(b'\r\n'))
+                return
+            else:
+                output_file.write(previous_line)
+
+        previous_line = line
+    raise ValueError("Unexpected EOF")
 
 
-def ensure_dat_extension(file_name):
-    if '.' in file_name:
-        # Split the filename and extension
-        base_name, extension = file_name.rsplit('.', 1)
+def move_file(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:
+    s3_client.copy_object(
+        CopySource={"Bucket": source_bucket, "Key": source_key},
+        Bucket=destination_bucket,
+        Key=destination_key
+    )
+    s3_client.delete_object(Bucket=source_bucket, Key=source_key)
 
-        # Check if the extension is not 'dat'
-        if extension != 'dat':
-            file_name = f"{base_name}.dat"
+
+def transfer_multipart_content(bucket_name: str, file_key: str, boundary: bytes, filename: str) -> None:
+    with open(
+        f"s3://{bucket_name}/{file_key}",
+        "rb",
+        transport_params={"client": s3_client}
+    ) as input_file:
+        read_until_part_start(input_file, boundary)
+
+        headers = read_part_headers(input_file)
+        content_disposition = headers.get("Content-Disposition")
+        if content_disposition:
+            _, content_disposition_params = parse_header_value(content_disposition)
+            filename = content_disposition_params.get("filename") or filename
+
+        with open(
+            f"s3://{DESTINATION_BUCKET_NAME}/streaming/{filename}",
+            "wb",
+            transport_params={"client": s3_client}
+        ) as output_file:
+            stream_part_body(input_file, boundary, output_file)
+
+    move_file(DESTINATION_BUCKET_NAME, f"streaming/{filename}", DESTINATION_BUCKET_NAME, filename)
+
+
+def process_record(record: dict) -> None:
+    bucket_name = record["s3"]["bucket"]["name"]
+    file_key = record["s3"]["object"]["key"]
+    logger.info(f"Processing {file_key}")
+
+    response = s3_client.head_object(Bucket=bucket_name, Key=file_key)
+    content_type = response['ContentType']
+    media_type, content_type_params = parse_header_value(content_type)
+    filename = response["Metadata"].get("mex-filename") or file_key
+
+    # Handle multipart content by parsing the filename from headers and streaming the content from the first part
+    if media_type.startswith("multipart/"):
+        logger.info("Found multipart content")
+        boundary = content_type_params["boundary"].encode("utf-8")
+        transfer_multipart_content(bucket_name, file_key, boundary, filename)
     else:
-        file_name += '.dat'
+        s3_client.copy_object(
+            Bucket=DESTINATION_BUCKET_NAME,
+            CopySource={"Bucket": bucket_name, "Key": file_key},
+            Key=filename
+        )
+
+    logger.info(f"Transfer complete for {file_key}")
 
-    return file_name
+
+def lambda_handler(event: dict, _context: dict) -> dict:
+    success = True
+
+    for record in event["Records"]:
+        try:
+            process_record(record)
+        except Exception:
+            logger.exception("Failed to process record")
+            success = False
+
+    return {
+        'statusCode': 200,
+        'body': 'Files converted and uploaded successfully!'
+    } if success else {
+        'statusCode': 500,
+        'body': 'Errors occurred during processing'
+    }
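
To see how the new streaming helpers above fit together, here is an illustrative, self-contained run against an in-memory multipart body; io.BytesIO stands in for the smart_open S3 streams, and the boundary, headers, and body content are invented for the example rather than taken from MESH:

import io

# Invented example payload - real MESH boundaries and filenames will differ.
boundary = b"example-boundary"
payload = (
    b"--example-boundary\r\n"
    b'Content-Disposition: attachment; filename="report.dat"\r\n'
    b"Content-Type: application/octet-stream\r\n"
    b"\r\n"
    b"first line of content\r\n"
    b"second line of content\r\n"
    b"--example-boundary--\r\n"
)

input_file = io.BytesIO(payload)
output_file = io.BytesIO()

read_until_part_start(input_file, boundary)          # consume everything up to the opening boundary
headers = read_part_headers(input_file)              # read part headers up to the blank line
_, params = parse_header_value(headers["Content-Disposition"])
stream_part_body(input_file, boundary, output_file)  # copy the body, stopping at the closing boundary

assert params["filename"] == "report.dat"
assert output_file.getvalue() == b"first line of content\r\nsecond line of content"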

0 commit comments
