Skip to content

Commit d0f081f

Browse files
committed
add upserts for EA files
1 parent 817b831 commit d0f081f

File tree

6 files changed

+64
-11
lines changed

6 files changed

+64
-11
lines changed

lambdas/filenameprocessor/src/audit_table.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def upsert_audit_table(
1515
queue_name: str,
1616
file_status: str,
1717
error_details: Optional[str] = None,
18+
condition_expression: str = ""
1819
) -> None:
1920
"""
2021
Updates the audit table with the file details
@@ -36,7 +37,7 @@ def upsert_audit_table(
3637
dynamodb_client.put_item(
3738
TableName=AUDIT_TABLE_NAME,
3839
Item=audit_item,
39-
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
40+
ConditionExpression=condition_expression,
4041
)
4142
logger.info(
4243
"%s file, with message id %s, successfully added to audit table",

lambdas/filenameprocessor/src/file_name_processor.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,16 @@
77
"""
88

99
import argparse
10+
from botocore.exceptions import ClientError
1011
from uuid import uuid4
1112

1213
from audit_table import upsert_audit_table
13-
from common.aws_s3_utils import move_file, move_file_outside_bucket
14+
from common.aws_s3_utils import (
15+
copy_file_outside_bucket,
16+
delete_file,
17+
is_file_in_bucket,
18+
move_file,
19+
)
1420
from common.clients import STREAM_NAME, get_s3_client, logger
1521
from common.log_decorator import logging_decorator
1622
from common.models.errors import UnhandledAuditTableError
@@ -174,6 +180,7 @@ def handle_batch_file(file_key, bucket_name, message_id, created_at_formatted_st
174180
expiry_timestamp,
175181
queue_name,
176182
FileStatus.QUEUED,
183+
condition_expression="attribute_not_exists(message_id)", # Prevents accidental overwrites
177184
)
178185
make_and_send_sqs_message(
179186
file_key,
@@ -242,9 +249,17 @@ def handle_extended_attributes_file(
242249
Processes a single record for extended attributes file.
243250
Returns a dictionary containing information to be included in the logs.
244251
"""
252+
253+
# here: the sequence of events should be
254+
# 1. upsert 'processing'
255+
# 2. move the file to the dest bucket
256+
# 3. check the file is present in the dest bucket
257+
# 4. if it is, delete it from the src bucket, upsert 'processed'
258+
# 5. if it isn't, move it to the archive/ folder, upsert 'failed'
259+
# NB for this to work we have to retool upsert so it accepts overwrites, i.e. ignore the ConditionExpression
260+
245261
try:
246262
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
247-
move_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
248263
queue_name = extended_attribute_identifier
249264

250265
upsert_audit_table(
@@ -255,14 +270,30 @@ def handle_extended_attributes_file(
255270
queue_name,
256271
FileStatus.PROCESSING,
257272
)
273+
274+
copy_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
275+
is_file_in_bucket(DPS_DESTINATION_BUCKET_NAME, file_key)
276+
delete_file(DPS_DESTINATION_BUCKET_NAME, file_key)
277+
278+
upsert_audit_table(
279+
message_id,
280+
file_key,
281+
created_at_formatted_string,
282+
expiry_timestamp,
283+
queue_name,
284+
FileStatus.PROCESSED,
285+
)
286+
258287
return {
259288
"statusCode": 200,
260289
"message": "Extended Attributes file successfully processed",
261290
"file_key": file_key,
262291
"message_id": message_id,
263292
"queue_name": queue_name,
264293
}
294+
265295
except ( # pylint: disable=broad-exception-caught
296+
ClientError,
266297
VaccineTypePermissionsError,
267298
InvalidFileKeyError,
268299
UnhandledAuditTableError,
@@ -275,13 +306,16 @@ def handle_extended_attributes_file(
275306
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
276307
queue_name = extended_attribute_identifier
277308

309+
# Move file to archive
310+
move_file(bucket_name, file_key, f"archive/{file_key}")
311+
278312
upsert_audit_table(
279313
message_id,
280314
file_key,
281315
created_at_formatted_string,
282316
expiry_timestamp,
283317
extended_attribute_identifier,
284-
file_status,
318+
queue_name,
285319
error_details=str(error),
286320
)
287321

lambdas/filenameprocessor/tests/test_audit_table.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def test_upsert_audit_table_with_duplicate_message_id_raises_exception(self):
7373
queue_name=ravs_rsv_test_file.queue_name,
7474
file_status=FileStatus.PROCESSED,
7575
expiry_timestamp=ravs_rsv_test_file.expires_at,
76+
condition_expression="attribute_not_exists(message_id)",
7677
)
7778

7879
assert_audit_table_entry(ravs_rsv_test_file, FileStatus.PROCESSED)
@@ -85,4 +86,5 @@ def test_upsert_audit_table_with_duplicate_message_id_raises_exception(self):
8586
queue_name=ravs_rsv_test_file.queue_name,
8687
file_status=FileStatus.PROCESSED,
8788
expiry_timestamp=ravs_rsv_test_file.expires_at,
89+
condition_expression="attribute_not_exists(message_id)",
8890
)

lambdas/filenameprocessor/tests/test_lambda_handler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,13 @@ def test_lambda_handler_extended_attributes_success(self):
269269
return_value=test_cases[0].ods_code + "_COVID",
270270
),
271271
patch(
272-
"file_name_processor.move_file_outside_bucket",
272+
"file_name_processor.copy_file_outside_bucket",
273273
side_effect=lambda src_bucket, key, dst_bucket, dst_key: (
274274
s3_client.put_object(
275275
Bucket=BucketNames.DESTINATION,
276276
Key=dst_key,
277277
Body=s3_client.get_object(Bucket=src_bucket, Key=key)["Body"].read(),
278278
),
279-
s3_client.delete_object(Bucket=src_bucket, Key=key),
280279
),
281280
),
282281
):

lambdas/shared/src/common/aws_s3_utils.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from common.clients import get_s3_client, logger
66

77
EXPECTED_BUCKET_OWNER_ACCOUNT = os.getenv("ACCOUNT_ID")
8-
DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")
98

109

1110
def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
@@ -20,7 +19,7 @@ def move_file(bucket_name: str, source_file_key: str, destination_file_key: str)
2019
logger.info("File moved from %s to %s", source_file_key, destination_file_key)
2120

2221

23-
def move_file_outside_bucket(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:
22+
def copy_file_outside_bucket(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:
2423
s3_client = get_s3_client()
2524
s3_client.copy_object(
2625
CopySource={"Bucket": source_bucket, "Key": source_key},
@@ -29,8 +28,16 @@ def move_file_outside_bucket(source_bucket: str, source_key: str, destination_bu
2928
ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
3029
ExpectedSourceBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
3130
)
31+
32+
def delete_file(source_bucket: str, source_key: str) -> None:
33+
s3_client = get_s3_client()
3234
s3_client.delete_object(
3335
Bucket=source_bucket,
3436
Key=source_key,
3537
ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
3638
)
39+
40+
def is_file_in_bucket(bucket_name: str, file_key: str) -> None:
41+
s3_client = get_s3_client()
42+
s3_client.head_object(Bucket=bucket_name, Key=file_key)
43+

lambdas/shared/tests/test_common/test_s3_utils.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def test_move_file_within_bucket(self):
7575
self.mock_logger_info.assert_called_with("File moved from %s to %s", file_key, dest_key)
7676

7777
def test_move_file_outside_bucket_copies_then_deletes(self):
78-
"""File should be copied to destination bucket under destination_key and removed from source bucket."""
78+
"""File should be copied to destination bucket under destination_key and deleted from source bucket"""
7979
source_key = "RSV_Vaccinations_v5_X8E5B_20000101T00000001.csv"
8080
destination_key = f"archive/{source_key}"
8181

@@ -88,8 +88,8 @@ def test_move_file_outside_bucket_copies_then_deletes(self):
8888
with self.assertRaises(self.s3.exceptions.NoSuchKey):
8989
self.s3.get_object(Bucket=self.destination_bucket, Key=destination_key)
9090

91-
# Execute move across buckets
92-
aws_s3_utils.move_file_outside_bucket(
91+
# Execute copy across buckets
92+
aws_s3_utils.copy_file_outside_bucket(
9393
source_bucket=self.source_bucket,
9494
source_key=source_key,
9595
destination_bucket=self.destination_bucket,
@@ -100,6 +100,16 @@ def test_move_file_outside_bucket_copies_then_deletes(self):
100100
dest_obj = self.s3.get_object(Bucket=self.destination_bucket, Key=destination_key)
101101
self.assertEqual(dest_obj["Body"].read(), body_content)
102102

103+
# Assert source object was not deleted
104+
src_obj = self.s3.get_object(Bucket=self.source_bucket, Key=source_key)
105+
self.assertEqual(src_obj["Body"].read(), body_content)
106+
107+
# Execute delete file
108+
aws_s3_utils.delete_file(
109+
source_bucket=self.source_bucket,
110+
source_key=source_key,
111+
)
112+
103113
# Assert source object was deleted
104114
with self.assertRaises(self.s3.exceptions.NoSuchKey):
105115
self.s3.get_object(Bucket=self.source_bucket, Key=source_key)

0 commit comments

Comments (0)