Skip to content

Commit a0feb20

Browse files
authored
VED-901: Extended Attribute (#1015)
* Refactor extended attributes
1 parent 9685aa9 commit a0feb20

File tree

12 files changed

+441
-110
lines changed

12 files changed

+441
-110
lines changed

infrastructure/instance/file_name_processor.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ resource "aws_lambda_function" "file_processor_lambda" {
277277

278278
environment {
279279
variables = {
280+
ACCOUNT_ID = var.immunisation_account_id
280281
SOURCE_BUCKET_NAME = aws_s3_bucket.batch_data_source_bucket.bucket
281282
ACK_BUCKET_NAME = aws_s3_bucket.batch_data_destination_bucket.bucket
282283
QUEUE_URL = aws_sqs_queue.batch_file_created.url

lambdas/filenameprocessor/src/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,18 @@
1111
)
1212

1313
SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
14+
15+
# We have used an internal temporary bucket here and an acutal dps bucket will replace this
16+
DPS_DESTINATION_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
17+
EXPECTED_BUCKET_OWNER_ACCOUNT = os.getenv("ACCOUNT_ID")
1418
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
1519
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS")
1620
VALID_VERSIONS = ["V5"]
1721

1822
VACCINE_TYPE_TO_DISEASES_HASH_KEY = "vacc_to_diseases"
1923
ODS_CODE_TO_SUPPLIER_SYSTEM_HASH_KEY = "ods_code_to_supplier"
24+
EXTENDED_ATTRIBUTES_FILE_PREFIX = "Vaccination_Extended_Attributes"
25+
DPS_DESTINATION_PREFIX = "dps_destination/"
2026

2127
ERROR_TYPE_TO_STATUS_CODE_MAP = {
2228
VaccineTypePermissionsError: 403,

lambdas/filenameprocessor/src/file_name_processor.py

Lines changed: 142 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,20 @@
1010
from uuid import uuid4
1111

1212
from audit_table import upsert_audit_table
13-
from common.aws_s3_utils import move_file
13+
from common.aws_s3_utils import move_file, move_file_to_external_bucket
1414
from common.clients import STREAM_NAME, get_s3_client, logger
1515
from common.log_decorator import logging_decorator
1616
from common.models.errors import UnhandledAuditTableError
1717
from constants import (
18+
DPS_DESTINATION_BUCKET_NAME,
19+
DPS_DESTINATION_PREFIX,
1820
ERROR_TYPE_TO_STATUS_CODE_MAP,
21+
EXTENDED_ATTRIBUTES_FILE_PREFIX,
1922
SOURCE_BUCKET_NAME,
2023
FileNotProcessedReason,
2124
FileStatus,
2225
)
23-
from file_validation import is_file_in_directory_root, validate_file_key
26+
from file_validation import is_file_in_directory_root, validate_batch_file_key, validate_extended_attributes_file_key
2427
from make_and_upload_ack_file import make_and_upload_the_ack_file
2528
from models.errors import (
2629
InvalidFileKeyError,
@@ -53,8 +56,6 @@ def handle_record(record) -> dict:
5356
"error": str(error),
5457
}
5558

56-
vaccine_type = "unknown"
57-
supplier = "unknown"
5859
expiry_timestamp = "unknown"
5960

6061
if bucket_name != SOURCE_BUCKET_NAME:
@@ -72,15 +73,103 @@ def handle_record(record) -> dict:
7273
message_id = "Message id was not created"
7374
created_at_formatted_string = "created_at_time not identified"
7475

76+
message_id = str(uuid4())
77+
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
78+
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)
79+
80+
if file_key.startswith(EXTENDED_ATTRIBUTES_FILE_PREFIX):
81+
return handle_extended_attributes_file(
82+
file_key,
83+
bucket_name,
84+
message_id,
85+
created_at_formatted_string,
86+
expiry_timestamp,
87+
)
88+
else:
89+
return handle_batch_file(
90+
file_key,
91+
bucket_name,
92+
message_id,
93+
created_at_formatted_string,
94+
expiry_timestamp,
95+
)
96+
97+
98+
def get_file_status_for_error(error: Exception) -> str:
99+
"""Creates a file status based on the type of error that was thrown"""
100+
if isinstance(error, VaccineTypePermissionsError):
101+
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
102+
103+
return FileStatus.FAILED
104+
105+
106+
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
107+
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
108+
config and overarching design"""
75109
try:
76-
message_id = str(uuid4())
77-
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
78-
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)
110+
if file_key.startswith(EXTENDED_ATTRIBUTES_FILE_PREFIX):
111+
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
112+
logger.error(
113+
"Unable to process file %s due to unexpected bucket name %s",
114+
file_key,
115+
bucket_name,
116+
)
117+
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
118+
return {
119+
"statusCode": 500,
120+
"message": message,
121+
"file_key": file_key,
122+
"vaccine_supplier_info": extended_attribute_identifier,
123+
}
124+
else:
125+
vaccine_type, supplier = validate_batch_file_key(file_key)
126+
logger.error(
127+
"Unable to process file %s due to unexpected bucket name %s",
128+
file_key,
129+
bucket_name,
130+
)
131+
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
132+
133+
return {
134+
"statusCode": 500,
135+
"message": message,
136+
"file_key": file_key,
137+
"vaccine_type": vaccine_type,
138+
"supplier": supplier,
139+
}
79140

80-
vaccine_type, supplier = validate_file_key(file_key)
81-
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
141+
except Exception as error:
142+
logger.error(
143+
"Unable to process file due to unexpected bucket name %s and file key %s",
144+
bucket_name,
145+
file_key,
146+
)
147+
message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"
148+
149+
return {
150+
"statusCode": 500,
151+
"message": message,
152+
"file_key": file_key,
153+
"vaccine_type": "unknown",
154+
"supplier": "unknown",
155+
"error": str(error),
156+
}
82157

158+
159+
def handle_batch_file(
160+
file_key: str, bucket_name: str, message_id: str, created_at_formatted_string: str, expiry_timestamp: str
161+
) -> dict:
162+
"""
163+
Processes a single record for batch file.
164+
Returns a dictionary containing information to be included in the logs.
165+
"""
166+
vaccine_type = "unknown"
167+
supplier = "unknown"
168+
try:
169+
vaccine_type, supplier = validate_batch_file_key(file_key)
170+
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
83171
queue_name = f"{supplier}_{vaccine_type}"
172+
84173
upsert_audit_table(
85174
message_id,
86175
file_key,
@@ -97,19 +186,17 @@ def handle_record(record) -> dict:
97186
supplier,
98187
created_at_formatted_string,
99188
)
100-
101189
logger.info("Lambda invocation successful for file '%s'", file_key)
102190

103-
# Return details for logs
104191
return {
105192
"statusCode": 200,
106193
"message": "Successfully sent to SQS for further processing",
107194
"file_key": file_key,
108195
"message_id": message_id,
109196
"vaccine_type": vaccine_type,
110197
"supplier": supplier,
198+
"queue_name": queue_name,
111199
}
112-
113200
except ( # pylint: disable=broad-exception-caught
114201
VaccineTypePermissionsError,
115202
InvalidFileKeyError,
@@ -119,8 +206,8 @@ def handle_record(record) -> dict:
119206
) as error:
120207
logger.error("Error processing file '%s': %s", file_key, str(error))
121208

122-
queue_name = f"{supplier}_{vaccine_type}"
123209
file_status = get_file_status_for_error(error)
210+
queue_name = f"{supplier}_{vaccine_type}"
124211

125212
upsert_audit_table(
126213
message_id,
@@ -151,48 +238,61 @@ def handle_record(record) -> dict:
151238
}
152239

153240

154-
def get_file_status_for_error(error: Exception) -> str:
155-
"""Creates a file status based on the type of error that was thrown"""
156-
if isinstance(error, VaccineTypePermissionsError):
157-
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
158-
159-
return FileStatus.FAILED
160-
161-
162-
def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
163-
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
164-
config and overarching design"""
241+
def handle_extended_attributes_file(
242+
file_key: str, bucket_name: str, message_id: str, created_at_formatted_string: str, expiry_timestamp: str
243+
) -> dict:
244+
"""
245+
Processes a single record for extended attributes file.
246+
Returns a dictionary containing information to be included in the logs.
247+
"""
165248
try:
166-
vaccine_type, supplier = validate_file_key(file_key)
167-
logger.error(
168-
"Unable to process file %s due to unexpected bucket name %s",
169-
file_key,
170-
bucket_name,
249+
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
250+
move_file_to_external_bucket(
251+
bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"{DPS_DESTINATION_PREFIX}{file_key}"
171252
)
172-
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
173253

254+
upsert_audit_table(
255+
message_id,
256+
file_key,
257+
created_at_formatted_string,
258+
expiry_timestamp,
259+
extended_attribute_identifier,
260+
FileStatus.PROCESSING,
261+
)
174262
return {
175-
"statusCode": 500,
176-
"message": message,
263+
"statusCode": 200,
264+
"message": "Extended Attributes file successfully processed",
177265
"file_key": file_key,
178-
"vaccine_type": vaccine_type,
179-
"supplier": supplier,
266+
"message_id": message_id,
267+
"queue_name": extended_attribute_identifier,
180268
}
269+
except ( # pylint: disable=broad-exception-caught
270+
VaccineTypePermissionsError,
271+
InvalidFileKeyError,
272+
UnhandledAuditTableError,
273+
UnhandledSqsError,
274+
Exception,
275+
) as error:
276+
logger.error("Error processing file '%s': %s", file_key, str(error))
181277

182-
except Exception as error:
183-
logger.error(
184-
"Unable to process file due to unexpected bucket name %s and file key %s",
185-
bucket_name,
278+
file_status = get_file_status_for_error(error)
279+
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
280+
281+
upsert_audit_table(
282+
message_id,
186283
file_key,
284+
created_at_formatted_string,
285+
expiry_timestamp,
286+
extended_attribute_identifier,
287+
file_status,
288+
error_details=str(error),
187289
)
188-
message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"
189290

190291
return {
191292
"statusCode": 500,
192-
"message": message,
293+
"message": f"Failed to process extended attributes file {file_key} from bucket {bucket_name}",
193294
"file_key": file_key,
194-
"vaccine_type": "unknown",
195-
"supplier": "unknown",
295+
"message_id": message_id,
196296
"error": str(error),
197297
}
198298

lambdas/filenameprocessor/src/file_validation.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,20 @@ def is_valid_datetime(timestamp: str) -> bool:
3737
return True
3838

3939

40-
def validate_file_key(file_key: str) -> tuple[str, str]:
40+
def validate_extended_attributes_file_key(file_key: str) -> str:
41+
"""
42+
Checks that all elements of the file key are valid, raises an exception otherwise.
43+
Returns a string containing the organization code and COVID vaccine type needed in the audit table.
44+
"""
45+
if not match(r"^[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*", file_key):
46+
raise InvalidFileKeyError("Initial file validation failed: invalid extended attributes file key format")
47+
48+
file_key_parts_without_extension, _ = split_file_key(file_key)
49+
organization_code = file_key_parts_without_extension[5]
50+
return f"{organization_code}_COVID"
51+
52+
53+
def validate_batch_file_key(file_key: str) -> tuple[str, str]:
4154
"""
4255
Checks that all elements of the file key are valid, raises an exception otherwise.
4356
Returns a tuple containing the vaccine_type and supplier (both converted to upper case).
@@ -46,20 +59,14 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
4659
if not match(r"^[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*", file_key):
4760
raise InvalidFileKeyError("Initial file validation failed: invalid file key format")
4861

49-
file_key = file_key.upper()
50-
file_name_and_extension = file_key.rsplit(".", 1)
51-
52-
if len(file_name_and_extension) != 2:
53-
raise InvalidFileKeyError("Initial file validation failed: missing file extension")
54-
55-
file_key_parts_without_extension = file_name_and_extension[0].split("_")
62+
file_key_parts_without_extension, file_name_and_extension = split_file_key(file_key)
5663

5764
vaccine_type = file_key_parts_without_extension[0]
5865
vaccination = file_key_parts_without_extension[1]
5966
version = file_key_parts_without_extension[2]
6067
ods_code = file_key_parts_without_extension[3]
6168
timestamp = file_key_parts_without_extension[4]
62-
extension = file_name_and_extension[1]
69+
extension = file_name_and_extension
6370
supplier = get_supplier_system_from_cache(ods_code)
6471

6572
valid_vaccine_types = get_valid_vaccine_types_from_cache()
@@ -76,3 +83,13 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
7683
raise InvalidFileKeyError("Initial file validation failed: invalid file key")
7784

7885
return vaccine_type, supplier
86+
87+
88+
def split_file_key(file_key: str) -> tuple[list[str], str]:
89+
file_key = file_key.upper()
90+
file_name_and_extension = file_key.rsplit(".", 1)
91+
92+
if len(file_name_and_extension) != 2:
93+
raise InvalidFileKeyError("Initial file validation failed: missing file extension")
94+
95+
return file_name_and_extension[0].split("_"), file_name_and_extension[1]

0 commit comments

Comments
 (0)