Skip to content

Commit 9b780b1

Browse files
committed
Merge branch 'master' into VED-80-id-sync-sqs-infra
2 parents bc306e6 + 9d001c9 commit 9b780b1

File tree

4 files changed

+58
-7
lines changed

4 files changed

+58
-7
lines changed

mesh_processor/src/converter.py

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -98,11 +98,19 @@ def transfer_multipart_content(bucket_name: str, file_key: str, boundary: bytes,
9898
if content_disposition:
9999
_, content_disposition_params = parse_header_value(content_disposition)
100100
filename = content_disposition_params.get("filename") or filename
101+
content_type = headers.get("Content-Type") or "application/octet-stream"
101102

102103
with open(
103104
f"s3://{DESTINATION_BUCKET_NAME}/streaming/{filename}",
104105
"wb",
105-
transport_params={"client": s3_client}
106+
transport_params={
107+
"client": s3_client,
108+
"client_kwargs": {
109+
"S3.Client.create_multipart_upload": {
110+
"ContentType": content_type
111+
}
112+
}
113+
}
106114
) as output_file:
107115
stream_part_body(input_file, boundary, output_file)
108116

@@ -133,6 +141,10 @@ def process_record(record: dict) -> None:
133141

134142
logger.info(f"Transfer complete for {file_key}")
135143

144+
move_file(bucket_name, file_key, bucket_name, f"archive/{file_key}")
145+
146+
logger.info(f"Archived {file_key}")
147+
136148

137149
def lambda_handler(event: dict, _context: dict) -> dict:
138150
success = True

mesh_processor/tests/test_converter.py

Lines changed: 40 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -48,11 +48,18 @@ def test_non_multipart_content_type(self):
4848
result = invoke_lambda("test-csv-file.csv")
4949
self.assertEqual(result["statusCode"], 200)
5050

51-
response = s3.get_object(Bucket="destination-bucket", Key="overridden-filename.csv")
52-
body = response["Body"].read().decode("utf-8")
51+
get_target_response = s3.get_object(Bucket="destination-bucket", Key="overridden-filename.csv")
52+
body = get_target_response["Body"].read().decode("utf-8")
5353
assert body == "some CSV content"
5454

55-
def test_non_multipart_content_type_no_mesh_metadata(self):
55+
with self.assertRaises(ClientError) as e:
56+
s3.head_object(Bucket="source-bucket", Key="test-csv-file.csv")
57+
self.assertEqual(e.exception.response["Error"]["Code"], "404")
58+
59+
head_archive_response = s3.head_object(Bucket="source-bucket", Key="archive/test-csv-file.csv")
60+
assert head_archive_response["ResponseMetadata"]["HTTPStatusCode"] == 200
61+
62+
def test_non_multipart_content_type_without_mesh_metadata(self):
5663
s3 = boto3.client("s3", region_name="eu-west-2")
5764
s3.put_object(
5865
Bucket="source-bucket",
@@ -142,8 +149,10 @@ def test_multipart_content_type(self):
142149
response = s3.get_object(Bucket="destination-bucket", Key="test-csv-file.csv")
143150
body = response["Body"].read().decode("utf-8")
144151
assert body == "some CSV content"
152+
content_type = response["ContentType"]
153+
assert content_type == "text/csv"
145154

146-
def test_multipart_content_type_without_filename_from_headers(self):
155+
def test_multipart_content_type_without_filename_in_headers(self):
147156
cases = [
148157
(
149158
"no filename in header",
@@ -187,6 +196,33 @@ def test_multipart_content_type_without_filename_from_headers(self):
187196
body = response["Body"].read().decode("utf-8")
188197
assert body == "some CSV content"
189198

199+
def test_multipart_content_type_without_content_type_in_headers(self):
200+
body = "\r\n".join([
201+
"",
202+
"--12345678",
203+
'Content-Disposition: form-data; name="File"; filename="test-csv-file.csv"',
204+
"",
205+
"some CSV content",
206+
"--12345678--",
207+
""
208+
])
209+
s3 = boto3.client("s3", region_name="eu-west-2")
210+
s3.put_object(
211+
Bucket="source-bucket",
212+
Key="test-dat-file.dat",
213+
Body=body.encode("utf-8"),
214+
ContentType="multipart/form-data; boundary=12345678",
215+
)
216+
217+
result = invoke_lambda("test-dat-file.dat")
218+
self.assertEqual(result["statusCode"], 200)
219+
220+
response = s3.get_object(Bucket="destination-bucket", Key="test-csv-file.csv")
221+
body = response["Body"].read().decode("utf-8")
222+
assert body == "some CSV content"
223+
content_type = response["ContentType"]
224+
assert content_type == "application/octet-stream"
225+
190226
def test_multipart_content_type_with_unix_line_endings(self):
191227
body = "\r\n".join([
192228
"",

recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ def get_csv_content_dict_reader(file_key: str) -> DictReader:
1919
"""Returns the requested file contents from the source bucket in the form of a DictReader"""
2020
response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
2121
binary_io = response["Body"]
22-
text_io = TextIOWrapper(binary_io, encoding="utf-8")
22+
text_io = TextIOWrapper(binary_io, encoding="utf-8", newline="")
2323
return DictReader(text_io, delimiter="|")
2424

2525

terraform/mesh_processor.tf

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -137,7 +137,8 @@ resource "aws_iam_policy" "mesh_processor_lambda_exec_policy" {
137137
"s3:GetObject",
138138
"s3:ListBucket",
139139
"s3:PutObject",
140-
"s3:CopyObject"
140+
"s3:CopyObject",
141+
"s3:DeleteObject"
141142
]
142143
Resource = [
143144
aws_s3_bucket.batch_data_source_bucket.arn,
@@ -213,6 +214,7 @@ resource "aws_lambda_function" "mesh_file_converter_lambda" {
213214
image_uri = module.mesh_processor_docker_image[0].image_uri
214215
architectures = ["x86_64"]
215216
timeout = 900
217+
memory_size = 1024
216218

217219
environment {
218220
variables = {
@@ -240,6 +242,7 @@ resource "aws_s3_bucket_notification" "mesh_datasources_lambda_notification" {
240242
lambda_function {
241243
lambda_function_arn = aws_lambda_function.mesh_file_converter_lambda[0].arn
242244
events = ["s3:ObjectCreated:*"]
245+
filter_prefix = "inbound/"
243246
}
244247
}
245248

0 commit comments

Comments
 (0)