
Commit da76129

VED-470: Handle receiving multipart content from MESH.
1 parent 890ead7 commit da76129

File tree

4 files changed: +254 -82 lines changed


mesh_processor/poetry.lock

Lines changed: 22 additions & 5 deletions
Some generated files are not rendered by default.

mesh_processor/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ packages = [{include = "src"}]
 python = "~3.10"
 boto3 = "~1.38.42"
 mypy-boto3-dynamodb = "^1.38.4"
-moto = "~5.1.6"
+moto = {extras = ["s3"], version = "^5.1.8"}
 coverage = "^7.9.1"
 
 [build-system]
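
The notable change here is that moto is now installed with the "s3" extra, which pulls in the optional dependencies moto needs to emulate S3 locally in tests. A minimal sketch of how that is typically used with moto 5's mock_aws decorator; the bucket name, region, and object below are assumptions for illustration and are not taken from this commit:

# Minimal sketch, assuming moto 5.x with the "s3" extra.
# Bucket name, region, and object contents are invented for illustration.
import boto3
from moto import mock_aws


@mock_aws
def test_round_trips_an_object_through_fake_s3():
    s3 = boto3.client("s3", region_name="eu-west-2")
    s3.create_bucket(
        Bucket="source-bucket",
        CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
    )

    # Writes and reads go to moto's in-memory S3, not to AWS.
    s3.put_object(Bucket="source-bucket", Key="example.dat", Body=b"HELLO")
    body = s3.get_object(Bucket="source-bucket", Key="example.dat")["Body"].read()

    assert body == b"HELLO"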

mesh_processor/src/converter.py

Lines changed: 70 additions & 30 deletions
@@ -1,44 +1,84 @@
+import logging
+
 import boto3
 import os
 
 
-def lambda_handler(event, context):
-    s3 = boto3.client('s3')
+DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")
 
-    # Destination bucket name
-    destination_bucket = os.getenv("Destination_BUCKET_NAME")
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger()
 
-    for record in event["Records"]:
-        bucket_name = record["s3"]["bucket"]["name"]
-        file_key = record["s3"]["object"]["key"]
-        copy_source = {
-            'Bucket': record["s3"]["bucket"]["name"],
-            'Key': record["s3"]["object"]["key"]
-        }
+s3_client = boto3.client('s3')
 
-        # Read the .dat file from S3
-        dat_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
 
-        # Update the filename from Metadata
-        file_name = ensure_dat_extension(dat_obj['Metadata'].get('mex-filename', None))
+def parse_headers(headers_str: str):
+    headers = dict(
+        header_str.split(":", 1)
+        for header_str in headers_str.split("\r\n")
+        if ":" in header_str
+    )
+    return {k.strip(): v.strip() for k, v in headers.items()}
 
-        s3.copy_object(CopySource=copy_source, Bucket=destination_bucket, Key=file_name)
 
-    return {
-        'statusCode': 200,
-        'body': 'Files converted and uploaded successfully!'
-    }
+def parse_header_value(header_value: str):
+    main_value, *params = header_value.split(";")
+    parsed_params = dict(
+        param.strip().split("=", 1)
+        for param in params
+    )
+    parsed_params = {k: v.strip('"') for k, v in parsed_params.items()}
+    return main_value, parsed_params
+
+
+def process_record(record):
+    bucket_name = record["s3"]["bucket"]["name"]
+    file_key = record["s3"]["object"]["key"]
+    logger.info(f"Processing {file_key}")
+
+    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
+    filename = response["Metadata"].get("mex-filename") or file_key
+    # TODO - this will read everything into memory - look at streaming instead
+    content = response["Body"].read().decode("utf-8")
 
+    content_type = response['ContentType']
+    media_type, content_type_params = parse_header_value(content_type)
 
-def ensure_dat_extension(file_name):
-    if '.' in file_name:
-        # Split the filename and extension
-        base_name, extension = file_name.rsplit('.', 1)
+    # Handle multipart content by parsing the filename and content from the first part
+    if media_type.startswith("multipart/"):
+        logger.info("Found multipart content")
+        boundary = content_type_params["boundary"]
+        parts = [
+            part.lstrip(f"--{boundary}")
+            for part in content.split(f"\r\n--{boundary}")
+            if part.strip() != "" and part.strip() != "--"
+        ]
+        if len(parts) > 1:
+            logger.warning(f"Got {len(parts)} parts, but will only process the first")
 
-        # Check if the extension is not 'dat'
-        if extension != 'dat':
-            file_name = f"{base_name}.dat"
-    else:
-        file_name += '.dat'
+        headers_str, content = parts[0].split("\r\n\r\n", 1)
+        headers = parse_headers(headers_str)
+        content_disposition = headers["Content-Disposition"]
+        _, content_disposition_params = parse_header_value(content_disposition)
+        filename = content_disposition_params.get("filename") or filename
 
-    return file_name
+    s3_client.put_object(Bucket=DESTINATION_BUCKET_NAME, Key=filename, Body=content.encode("utf-8"))
+
+
+def lambda_handler(event, _):
+    success = True
+
+    for record in event["Records"]:
+        try:
+            process_record(record)
+        except Exception:
+            logger.exception("Failed to process record")
+            success = False
+
+    return {
+        'statusCode': 200,
+        'body': 'Files converted and uploaded successfully!'
+    } if success else {
+        'statusCode': 500,
+        'body': 'Errors occurred during processing'
+    }
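
The new process_record path inspects the object's Content-Type and, when it is multipart, takes the filename and body from the first part's headers. Below is a standalone sketch (not part of the commit) of how that boundary splitting and header parsing behave on a hypothetical MESH-style payload; the boundary, filename, and body are invented, and parse_header_value is copied from the diff so the snippet runs on its own:

# Standalone sketch: the boundary, filename, and body are invented examples.

def parse_header_value(header_value: str):
    # Copied from the converter.py diff above: split "value; k=v; ..." into the
    # main value and a dict of parameters, stripping surrounding quotes.
    main_value, *params = header_value.split(";")
    parsed_params = dict(param.strip().split("=", 1) for param in params)
    return main_value, {k: v.strip('"') for k, v in parsed_params.items()}


boundary = "example-boundary"
content = (
    f"--{boundary}\r\n"
    'Content-Disposition: form-data; name="file"; filename="report.dat"\r\n'
    "Content-Type: text/plain\r\n"
    "\r\n"
    "HELLO,WORLD\r\n"
    f"--{boundary}--\r\n"
)

# Split on the boundary exactly as process_record does, keeping only non-empty
# parts, discarding the trailing "--" terminator, and stripping the leading
# "--boundary" characters from the first part.
parts = [
    part.lstrip(f"--{boundary}")
    for part in content.split(f"\r\n--{boundary}")
    if part.strip() not in ("", "--")
]

# The first part is "headers, blank line, body".
headers_str, body = parts[0].split("\r\n\r\n", 1)
headers = {
    k.strip(): v.strip()
    for k, v in (line.split(":", 1) for line in headers_str.split("\r\n") if ":" in line)
}

_, disposition_params = parse_header_value(headers["Content-Disposition"])
print(disposition_params["filename"])  # report.dat
print(body)                            # HELLO,WORLD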

0 commit comments