|
1 | | -import hashlib |
2 | 1 | import re |
3 | 2 |
|
4 | 3 | from botocore.exceptions import ClientError |
5 | 4 | from enums.lambda_error import LambdaError |
6 | 5 | from services.base.dynamo_service import DynamoDBService |
7 | | -from services.base.s3_service import S3Service |
8 | 6 | from services.base.ssm_service import SSMService |
9 | 7 | from utils.audit_logging_setup import LoggingService |
10 | 8 | from utils.lambda_exceptions import CloudFrontEdgeException |
|
15 | 13 | class EdgePresignService: |
16 | 14 | def __init__(self): |
17 | 15 | self.dynamo_service = DynamoDBService() |
18 | | - self.s3_service = S3Service() |
19 | 16 | self.ssm_service = SSMService() |
20 | 17 | self.table_name_ssm_param = "EDGE_REFERENCE_TABLE" |
21 | 18 |
|
22 | | - def use_presign(self, request_values: dict): |
23 | | - uri: str = request_values["uri"] |
24 | | - querystring: str = request_values["querystring"] |
25 | | - domain_name: str = request_values["domain_name"] |
| 19 | + def use_presigned(self, request_values: dict) -> dict: |
| 20 | + request_id = self._extract_request_id(request_values) |
| 21 | + domain_name = self._extract_domain_name(request_values) |
26 | 22 |
|
27 | | - presign_string: str = f"{uri}?{querystring}" |
28 | | - encoded_presign_string = presign_string.encode("utf-8") |
29 | | - presign_credentials_hash: str = hashlib.md5(encoded_presign_string).hexdigest() |
| 23 | + presigned_url = self._attempt_presigned_ingestion(request_id, domain_name) |
| 24 | + self._update_request_with_presigned_url(request_values, presigned_url) |
30 | 25 |
|
31 | | - self.attempt_presign_ingestion( |
32 | | - uri_hash=presign_credentials_hash, |
33 | | - domain_name=domain_name, |
34 | | - ) |
| 26 | + return request_values |
35 | 27 |
|
36 | | - def attempt_presign_ingestion(self, uri_hash: str, domain_name: str) -> None: |
| 28 | + def _attempt_presigned_ingestion(self, request_id: str, domain_name: str) -> str: |
37 | 29 | try: |
38 | | - environment = self.filter_domain_for_env(domain_name) |
39 | | - logger.info(f"Environment found: {environment}") |
40 | | - base_table_name: str = self.ssm_service.get_ssm_parameter( |
41 | | - self.table_name_ssm_param |
42 | | - ) |
43 | | - formatted_table_name: str = self.extend_table_name( |
44 | | - base_table_name, environment |
45 | | - ) |
46 | | - logger.info(f"Table: {formatted_table_name}") |
47 | | - self.dynamo_service.update_item( |
48 | | - table_name=formatted_table_name, |
49 | | - key_pair={"ID": uri_hash}, |
50 | | - updated_fields={"IsRequested": True}, |
51 | | - condition_expression="attribute_not_exists(IsRequested) OR IsRequested = :false", |
52 | | - expression_attribute_values={":false": False}, |
53 | | - ) |
| 30 | + environment = self._filter_domain_for_env(domain_name) |
| 31 | + table_name = self._get_formatted_table_name(environment) |
| 32 | + updated_item = self._update_dynamo_item(table_name, request_id) |
| 33 | + return self._extract_presigned_url(updated_item) |
54 | 34 | except ClientError as e: |
55 | 35 | logger.error(f"{str(e)}", {"Result": LambdaError.EdgeNoClient.to_str()}) |
56 | 36 | raise CloudFrontEdgeException(400, LambdaError.EdgeNoClient) |
57 | 37 |
|
58 | | - @staticmethod |
59 | | - def update_s3_headers(request: dict, request_values: dict): |
60 | | - domain_name = request_values["domain_name"] |
61 | | - if "authorization" in request["headers"]: |
62 | | - del request["headers"]["authorization"] |
| 38 | + def update_s3_headers(self, request: dict) -> dict: |
| 39 | + domain_name = self._extract_domain_name(request) |
| 40 | + request["headers"].pop("authorization", None) |
63 | 41 | request["headers"]["host"] = [{"key": "Host", "value": domain_name}] |
64 | | - |
65 | 42 | return request |
66 | 43 |
|
67 | | - @staticmethod |
68 | | - def filter_request_values(request: dict) -> dict: |
69 | | - try: |
70 | | - uri: str = request["uri"] |
71 | | - querystring: str = request["querystring"] |
72 | | - headers: dict = request["headers"] |
73 | | - origin: dict = request.get("origin", {}) |
74 | | - domain_name: str = origin["s3"]["domainName"] |
75 | | - except KeyError as e: |
76 | | - logger.error(f"Missing request component: {str(e)}") |
77 | | - raise CloudFrontEdgeException(500, LambdaError.EdgeNoOrigin) |
78 | | - |
79 | | - return { |
80 | | - "uri": uri, |
81 | | - "querystring": querystring, |
82 | | - "headers": headers, |
83 | | - "domain_name": domain_name, |
84 | | - } |
| 44 | + def _extract_request_id(self, request_values: dict) -> str: |
| 45 | + return request_values.get("uri", "").lstrip("/") |
85 | 46 |
|
86 | | - @staticmethod |
87 | | - def filter_domain_for_env(domain_name: str) -> str: |
| 47 | + def _extract_domain_name(self, request_values: dict) -> str: |
| 48 | + return request_values.get("origin", {}).get("s3", {}).get("domainName", "") |
| 49 | + |
| 50 | + def _update_request_with_presigned_url( |
| 51 | + self, request_values: dict, presigned_url: str |
| 52 | + ): |
| 53 | + question_mark_index = presigned_url.find("?") |
| 54 | + querystring = ( |
| 55 | + presigned_url[question_mark_index + 1 :] |
| 56 | + if question_mark_index != -1 |
| 57 | + else "" |
| 58 | + ) |
| 59 | + url_parts = ( |
| 60 | + presigned_url[:question_mark_index].split("/") |
| 61 | + if question_mark_index != -1 |
| 62 | + else presigned_url.split("/") |
| 63 | + ) |
| 64 | + request_values["querystring"] = querystring |
| 65 | + request_values["uri"] = "/" + "/".join(url_parts[3:]) |
| 66 | + |
| 67 | + def _filter_domain_for_env(self, domain_name: str) -> str: |
88 | 68 | match = re.match(r"^[^-]+(?:-[^-]+)?(?=-lloyd)", domain_name) |
89 | | - if match: |
90 | | - return match.group(0) |
91 | | - return "" |
| 69 | + return match.group(0) if match else "" |
| 70 | + |
| 71 | + def _get_formatted_table_name(self, environment: str) -> str: |
| 72 | + base_table_name = self.ssm_service.get_ssm_parameter(self.table_name_ssm_param) |
| 73 | + return f"{environment}_{base_table_name}" if environment else base_table_name |
| 74 | + |
| 75 | + def _update_dynamo_item(self, table_name: str, request_id: str) -> dict: |
| 76 | + return self.dynamo_service.update_item( |
| 77 | + table_name=table_name, |
| 78 | + key_pair={"ID": request_id}, |
| 79 | + updated_fields={"IsRequested": True}, |
| 80 | + condition_expression="attribute_not_exists(IsRequested) OR IsRequested = :false", |
| 81 | + expression_attribute_values={":false": False}, |
| 82 | + ) |
92 | 83 |
|
93 | 84 | @staticmethod |
94 | | - def extend_table_name(base_table_name: str, environment: str) -> str: |
95 | | - if environment: |
96 | | - return f"{environment}_{base_table_name}" |
97 | | - return base_table_name |
| 85 | + def _extract_presigned_url(updated_item: dict) -> str: |
| 86 | + return updated_item.get("Attributes", {}).get("presignedUrl", "") |
0 commit comments