Skip to content

Commit 8b2afe0

Browse files
committed
refactor extended attribute journeys
1 parent ac926db commit 8b2afe0

File tree

5 files changed

+88
-45
lines changed

5 files changed

+88
-45
lines changed

lambdas/filenameprocessor/src/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212

1313
SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
14+
DPS_DESTINATION_BUCKET_NAME = os.getenv("DPS_DESTINATION_BUCKET_NAME")
1415
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
1516
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS")
1617
VALID_VERSIONS = ["V5"]

lambdas/filenameprocessor/src/file_name_processor.py

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010
from uuid import uuid4
1111

1212
from audit_table import upsert_audit_table
13-
from common.aws_s3_utils import move_file
13+
from common.aws_s3_utils import move_file, move_file_outside_bucket
1414
from common.clients import STREAM_NAME, get_s3_client, logger
1515
from common.log_decorator import logging_decorator
1616
from common.models.errors import UnhandledAuditTableError
1717
from constants import (
18+
DPS_DESTINATION_BUCKET_NAME,
1819
ERROR_TYPE_TO_STATUS_CODE_MAP,
1920
EXTENDED_ATTRIBUTES_PREFIXES,
2021
SOURCE_BUCKET_NAME,
@@ -79,41 +80,49 @@ def handle_record(record) -> dict:
7980
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)
8081

8182
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
82-
validate_extended_attributes_file_key(file_key)
83-
move_file(bucket_name, file_key, f"archive/{file_key}")
83+
queue_name = validate_extended_attributes_file_key(file_key)
84+
move_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
85+
86+
upsert_audit_table(
87+
message_id,
88+
file_key,
89+
created_at_formatted_string,
90+
expiry_timestamp,
91+
queue_name,
92+
FileStatus.PROCESSING,
93+
)
8494
else:
8595
vaccine_type, supplier = validate_batch_file_key(file_key)
96+
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
97+
queue_name = f"{supplier}_{vaccine_type}"
98+
upsert_audit_table(
99+
message_id,
100+
file_key,
101+
created_at_formatted_string,
102+
expiry_timestamp,
103+
queue_name,
104+
FileStatus.QUEUED,
105+
)
106+
make_and_send_sqs_message(
107+
file_key,
108+
message_id,
109+
permissions,
110+
vaccine_type,
111+
supplier,
112+
created_at_formatted_string,
113+
)
86114

87-
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
88-
queue_name = f"{supplier}_{vaccine_type}"
89-
upsert_audit_table(
90-
message_id,
91-
file_key,
92-
created_at_formatted_string,
93-
expiry_timestamp,
94-
queue_name,
95-
FileStatus.QUEUED,
96-
)
97-
make_and_send_sqs_message(
98-
file_key,
99-
message_id,
100-
permissions,
101-
vaccine_type,
102-
supplier,
103-
created_at_formatted_string,
104-
)
105-
106-
logger.info("Lambda invocation successful for file '%s'", file_key)
115+
logger.info("Lambda invocation successful for file '%s'", file_key)
107116

108-
# Return details for logs
109-
return {
110-
"statusCode": 200,
111-
"message": "Successfully sent to SQS for further processing",
112-
"file_key": file_key,
113-
"message_id": message_id,
114-
"vaccine_type": vaccine_type,
115-
"supplier": supplier,
116-
}
117+
# Return details for logs
118+
return {
119+
"statusCode": 200,
120+
"message": "Successfully sent to SQS for further processing",
121+
"file_key": file_key,
122+
"message_id": message_id,
123+
"vaccine_type": vaccine_type,
124+
"supplier": supplier,
125+
}
117126

118127
except ( # pylint: disable=broad-exception-caught
119128
VaccineTypePermissionsError,

lambdas/filenameprocessor/src/file_validation.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,18 @@ def is_valid_datetime(timestamp: str) -> bool:
3737
return True
3838

3939

40-
def validate_extended_attributes_file_key(file_key: str) -> tuple[str, str]:
40+
def validate_extended_attributes_file_key(file_key: str) -> str:
    """
    Checks that the extended attributes file key has the expected underscore-separated format and a usable
    organization code, raises an InvalidFileKeyError otherwise.
    Returns the queue name used for the audit table entry, built as "<organization code>_COVID", where the
    organization code is the sixth underscore-separated part of the upper-cased file name.
    """
    if not match(r"^[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*", file_key):
        raise InvalidFileKeyError("Initial file validation failed: invalid extended attributes file key format")

    file_key_parts_without_extension, _ = split_file_key(file_key)
    organization_code = file_key_parts_without_extension[5]

    # The format pattern accepts empty parts, so an empty code would otherwise yield the queue name "_COVID"
    if not organization_code:
        raise InvalidFileKeyError("Initial file validation failed: invalid extended attributes file key format")

    return f"{organization_code}_COVID"
51+
4452

4553
def validate_batch_file_key(file_key: str) -> tuple[str, str]:
4654
"""
@@ -51,20 +59,14 @@ def validate_batch_file_key(file_key: str) -> tuple[str, str]:
5159
if not match(r"^[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*", file_key):
5260
raise InvalidFileKeyError("Initial file validation failed: invalid file key format")
5361

54-
file_key = file_key.upper()
55-
file_name_and_extension = file_key.rsplit(".", 1)
56-
57-
if len(file_name_and_extension) != 2:
58-
raise InvalidFileKeyError("Initial file validation failed: missing file extension")
59-
60-
file_key_parts_without_extension = file_name_and_extension[0].split("_")
62+
file_key_parts_without_extension, file_name_and_extension = split_file_key(file_key)
6163

6264
vaccine_type = file_key_parts_without_extension[0]
6365
vaccination = file_key_parts_without_extension[1]
6466
version = file_key_parts_without_extension[2]
6567
ods_code = file_key_parts_without_extension[3]
6668
timestamp = file_key_parts_without_extension[4]
67-
extension = file_name_and_extension[1]
69+
extension = file_name_and_extension
6870
supplier = get_supplier_system_from_cache(ods_code)
6971

7072
valid_vaccine_types = get_valid_vaccine_types_from_cache()
@@ -81,3 +83,13 @@ def validate_batch_file_key(file_key: str) -> tuple[str, str]:
8183
raise InvalidFileKeyError("Initial file validation failed: invalid file key")
8284

8385
return vaccine_type, supplier
86+
87+
88+
def split_file_key(file_key: str) -> tuple[list[str], str]:
    """
    Upper-cases the file key and separates it at the final "." into a file name and an extension.
    Returns the file name broken into its underscore-separated parts, together with the extension.
    Raises an InvalidFileKeyError if the file key contains no "." at all.
    """
    name, separator, extension = file_key.upper().rpartition(".")

    # rpartition reports an empty separator when there is no "." in the key
    if not separator:
        raise InvalidFileKeyError("Initial file validation failed: missing file extension")

    return name.split("_"), extension

lambdas/filenameprocessor/tests/test_file_key_validation.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from file_validation import (
1616
is_file_in_directory_root,
1717
is_valid_datetime,
18-
validate_file_key,
18+
validate_batch_file_key,
1919
)
2020
from models.errors import InvalidFileKeyError
2121

@@ -90,7 +90,7 @@ def test_validate_file_key(self, mock_get_redis_client):
9090
mock_redis.hkeys.return_value = ["FLU", "RSV"]
9191
mock_get_redis_client.return_value = mock_redis
9292

93-
self.assertEqual(validate_file_key(file_key), expected_result)
93+
self.assertEqual(validate_batch_file_key(file_key), expected_result)
9494
mock_redis.hkeys.assert_called_with("vacc_to_diseases")
9595
mock_redis.hget.assert_called_with("ods_code_to_supplier", ods_code)
9696

@@ -169,7 +169,7 @@ def test_validate_file_key_false(self, mock_get_redis_client):
169169
mock_get_redis_client.return_value = mock_redis
170170

171171
with self.assertRaises(InvalidFileKeyError) as context:
172-
validate_file_key(file_key)
172+
validate_batch_file_key(file_key)
173173
self.assertEqual(str(context.exception), expected_result)
174174
mock_redis.hkeys.assert_called_with("vacc_to_diseases")
175175

@@ -207,6 +207,6 @@ def test_validate_file_key_invalid(self, mock_get_redis_client):
207207
mock_get_redis_client.return_value = mock_redis
208208

209209
with self.assertRaises(InvalidFileKeyError) as context:
210-
validate_file_key(file_key)
210+
validate_batch_file_key(file_key)
211211
self.assertEqual(str(context.exception), expected_result)
212212
mock_redis.hkeys.assert_not_called()

lambdas/shared/src/common/aws_s3_utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
"""Non-imms Utility Functions"""
22

3+
import os
4+
35
from common.clients import get_s3_client, logger
46

7+
EXPECTED_BUCKET_OWNER_ACCOUNT = os.getenv("ACCOUNT_ID")
8+
DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")
9+
510

611
def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
712
"""Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
@@ -13,3 +18,19 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)
1318
)
1419
s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)
1520
logger.info("File moved from %s to %s", source_file_key, destination_file_key)
21+
22+
23+
def move_file_outside_bucket(
    source_bucket: str,
    source_key: str,
    destination_bucket: str,
    destination_key: str,
) -> None:
    """
    Moves a file from one S3 bucket to another by copying it to the destination and then deleting the source.
    EXPECTED_BUCKET_OWNER_ACCOUNT is passed as the expected bucket owner on both the copy and the delete.
    The source object is only deleted once the copy call has returned without raising.
    """
    s3_client = get_s3_client()
    s3_client.copy_object(
        CopySource={"Bucket": source_bucket, "Key": source_key},
        Bucket=destination_bucket,
        Key=destination_key,
        ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
        ExpectedSourceBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
    )
    s3_client.delete_object(
        Bucket=source_bucket,
        Key=source_key,
        ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
    )
    # Log the completed move, as move_file does, including the buckets since they differ here
    logger.info(
        "File moved from %s/%s to %s/%s",
        source_bucket,
        source_key,
        destination_bucket,
        destination_key,
    )

0 commit comments

Comments
 (0)