 import argparse
 from uuid import uuid4
 
+from botocore.exceptions import ClientError
+
 from audit_table import upsert_audit_table
 from common.aws_s3_utils import move_file
 from common.clients import STREAM_NAME, get_s3_client, logger

 from supplier_permissions import validate_vaccine_type_permissions
 from utils_for_filenameprocessor import get_creation_and_expiry_times
 
+# PoC for VED-902:
+# 1. if a filename containing a certain string appears in our bucket,
+#    move it into a test bucket and upsert "in progress" to the audit table (this part is VED-901)
+# 2. check that the file has arrived in the test bucket: if so, upsert "completed"; if not, upsert an error
+# Thoughts: there will naturally be a delay on the file move, so when do we check?
+# We could implement a new lambda triggered on arrival (see the hedged sketch below the
+# constants), BUT if it's never triggered, we never get the upsert.
+
+TEST_EA_BUCKET = "902-test-ea-bucket"
+TEST_EA_FILENAME = "Vaccination_Extended_Attributes"
+
+
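+# A hedged sketch of the "new lambda" thought above, not wired up anywhere: an
+# s3:ObjectCreated notification on the test bucket would invoke a handler like this,
+# which could mark the audit row completed on arrival instead of us polling for it.
+# The handler name and the "message-id" object-metadata convention are assumptions.
+def handle_ea_arrival(event: dict, _context=None) -> None:
+    s3_client = get_s3_client()
+    for record in event.get("Records", []):
+        file_key = record["s3"]["object"]["key"]
+        # recover the message_id stashed on the object when it was copied (see the
+        # move_file_with_message_id sketch below), falling back to "unknown" if absent
+        head = s3_client.head_object(Bucket=TEST_EA_BUCKET, Key=file_key)
+        message_id = head.get("Metadata", {}).get("message-id", "unknown")
+        logger.info("EA file '%s' arrived in %s (message_id=%s)", file_key, TEST_EA_BUCKET, message_id)
+
+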
+# This is a copy of move_file from mesh_processor. Note that copy_object followed by
+# delete_object is not atomic: a failure between the two calls leaves the file in both buckets.
+def move_file_to_bucket(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:
+    s3_client = get_s3_client()
+    s3_client.copy_object(
+        CopySource={"Bucket": source_bucket, "Key": source_key},
+        Bucket=destination_bucket,
+        Key=destination_key,
+    )
+    s3_client.delete_object(
+        Bucket=source_bucket,
+        Key=source_key,
+    )
+
+
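+# A hedged companion sketch: if message_id travels with the object as user metadata,
+# the arrival handler above can recover it without querying the audit table. The
+# "message-id" metadata key is an assumption, not an existing convention here.
+def move_file_with_message_id(
+    source_bucket: str, source_key: str, destination_bucket: str, destination_key: str, message_id: str
+) -> None:
+    s3_client = get_s3_client()
+    s3_client.copy_object(
+        CopySource={"Bucket": source_bucket, "Key": source_key},
+        Bucket=destination_bucket,
+        Key=destination_key,
+        Metadata={"message-id": message_id},
+        MetadataDirective="REPLACE",  # without REPLACE, copy_object keeps the source object's metadata
+    )
+    s3_client.delete_object(Bucket=source_bucket, Key=source_key)
+
+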
+def is_file_in_bucket(bucket_name: str, file_key: str) -> bool:
+    s3_client = get_s3_client()
+    try:
+        s3_client.head_object(Bucket=bucket_name, Key=file_key)
+    except ClientError as error:
+        # head_object raises a 404 ClientError for a missing key; re-raise anything else
+        # (e.g. 403) so genuine failures are not silently reported as "file not found"
+        if error.response["Error"]["Code"] == "404":
+            return False
+        raise
+    return True
+
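+
+# A hedged sketch for the "when do we check?" concern above: rather than one immediate
+# head_object check, poll until the copy lands using boto3's built-in "object_exists"
+# S3 waiter. The 2-second delay and 5 attempts are guesses, not tuned values.
+from botocore.exceptions import WaiterError  # imported here only to keep the sketch self-contained
+
+
+def wait_for_file_in_bucket(bucket_name: str, file_key: str) -> bool:
+    s3_client = get_s3_client()
+    try:
+        s3_client.get_waiter("object_exists").wait(
+            Bucket=bucket_name, Key=file_key, WaiterConfig={"Delay": 2, "MaxAttempts": 5}
+        )
+    except WaiterError:
+        return False
+    return True
+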
 
 # NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because
 # the logging_decorator is for an individual record, whereas the lambda_handler could potentially be handling
@@ -80,35 +115,78 @@ def handle_record(record) -> dict:
         vaccine_type, supplier = validate_file_key(file_key)
         permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
 
-        queue_name = f"{supplier}_{vaccine_type}"
-        upsert_audit_table(
-            message_id,
-            file_key,
-            created_at_formatted_string,
-            expiry_timestamp,
-            queue_name,
-            FileStatus.QUEUED,
-        )
-        make_and_send_sqs_message(
-            file_key,
-            message_id,
-            permissions,
-            vaccine_type,
-            supplier,
-            created_at_formatted_string,
-        )
-
-        logger.info("Lambda invocation successful for file '%s'", file_key)
-
-        # Return details for logs
-        return {
-            "statusCode": 200,
-            "message": "Successfully sent to SQS for further processing",
-            "file_key": file_key,
-            "message_id": message_id,
-            "vaccine_type": vaccine_type,
-            "supplier": supplier,
-        }
+        # here: if it's an EA file, move it and upsert it to PROCESSING; use the bucket name as the queue name
+        if TEST_EA_FILENAME in file_key:
+            dest_bucket_name = TEST_EA_BUCKET
+
+            move_file_to_bucket(bucket_name, file_key, dest_bucket_name, file_key)
+
+            upsert_audit_table(
+                message_id,
+                file_key,
+                created_at_formatted_string,
+                expiry_timestamp,
+                dest_bucket_name,
+                FileStatus.PROCESSING,
+            )
+            logger.info("Lambda invocation successful for file '%s'", file_key)
+
+            # TODO: check the file is in the dest bucket, upsert again accordingly.
+            # NB: not clear yet whether we need to do this in an entirely new lambda.
+            if is_file_in_bucket(dest_bucket_name, file_key):
+                status_code = 200
+                message = f"Successfully sent to {dest_bucket_name} for further processing"
+                file_status = FileStatus.PROCESSED
+            else:
+                status_code = 400
+                message = f"Failed to send to {dest_bucket_name} for further processing"
+                file_status = FileStatus.FAILED
+
+            # upsert the final status in both branches, per the PoC notes above
+            upsert_audit_table(
+                message_id,
+                file_key,
+                created_at_formatted_string,
+                expiry_timestamp,
+                dest_bucket_name,
+                file_status,
+            )
+
+            # Return details for logs
+            return {
+                "statusCode": status_code,
+                "message": message,
+                "file_key": file_key,
+                "message_id": message_id,
+            }
+        else:
+            queue_name = f"{supplier}_{vaccine_type}"
+            upsert_audit_table(
+                message_id,
+                file_key,
+                created_at_formatted_string,
+                expiry_timestamp,
+                queue_name,
+                FileStatus.QUEUED,
+            )
+            make_and_send_sqs_message(
+                file_key,
+                message_id,
+                permissions,
+                vaccine_type,
+                supplier,
+                created_at_formatted_string,
+            )
+
+            logger.info("Lambda invocation successful for file '%s'", file_key)
+
+            # Return details for logs
+            return {
+                "statusCode": 200,
+                "message": "Successfully sent to SQS for further processing",
+                "file_key": file_key,
+                "message_id": message_id,
+                "vaccine_type": vaccine_type,
+                "supplier": supplier,
+            }
 
     except (  # pylint: disable=broad-exception-caught
         VaccineTypePermissionsError,