|
1 | 1 | from wireup import service |
| 2 | +import boto3 |
| 3 | +from botocore.client import BaseClient |
| 4 | +from botocore.exceptions import ClientError |
| 5 | +from wireup import Inject, service |
| 6 | +from typing import Annotated |
2 | 7 |
|
3 | | -@service(qualifier="nhs_hmac_key") |
4 | | -def nhs_hmac_key_factory() -> bytes: |
5 | | - return b"abc123" # salt |
| 8 | +from boto3 import Session |
6 | 9 |
|
7 | | -# import boto3 |
8 | | -# from botocore.exceptions import ClientError |
| 10 | +# @service(qualifier="nhs_hmac_key") |
| 11 | +# def nhs_hmac_key_factory() -> bytes: # -> dict[str, bytes]: |
| 12 | +# # return b"abc123" |
| 13 | +# keys: dict[str, bytes] = {} |
| 14 | +# #return keys |
9 | 15 | # |
10 | | -# secret_name = "eligibility-signposting-api-dev/hashing_secret" |
11 | | -# region_name = "eu-west-2" |
| 16 | +# # return { |
| 17 | +# # "AWSCURRENT": b"current_dummy_key", |
| 18 | +# # "AWSPREVIOUS": b"previous_dummy_key" |
| 19 | +# # } |
12 | 20 | # |
13 | | -# # Create a Secrets Manager client |
14 | | -# session = boto3.session.Session() |
15 | | -# client = session.client( |
16 | | -# service_name='secretsmanager', |
17 | | -# region_name=region_name |
18 | | -# ) |
| 21 | +# for stage in ("AWSCURRENT", "AWSPREVIOUS"): |
| 22 | +# value = get_secret_by_stage() |
| 23 | +# if value: |
| 24 | +# keys[stage] = value.encode("utf-8") |
19 | 25 | # |
20 | | -# try: |
21 | | -# get_secret_value_response = client.get_secret_value( |
22 | | -# SecretId=secret_name |
23 | | -# ) |
24 | | -# except ClientError as e: |
25 | | -# # For a list of exceptions thrown, see |
26 | | -# # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html |
27 | | -# raise e |
| 26 | +# # if keys = {}, then raise error |
| 27 | +# if not keys: |
| 28 | +# raise RuntimeError("No valid NHS HMAC secret keys available") |
28 | 29 | # |
29 | | -# secret = get_secret_value_response['SecretString'] |
| 30 | +# return keys |
| 31 | + |
| 32 | +@service(qualifier="secretsmanager") |
| 33 | +def secretsmanager_client_factory(session: Session) -> BaseClient: |
| 34 | + return session.client("secretsmanager") |
| 35 | + |
| 36 | + |
| 37 | +@service(qualifier="nhs_hmac_key") |
| 38 | +def nhs_hmac_key_factory( |
| 39 | + secrets_client: Annotated[BaseClient, Inject(qualifier="secretsmanager")], |
| 40 | +) -> bytes: |
| 41 | + keys: dict[str, bytes] = {} |
| 42 | + |
| 43 | + for stage in ("AWSCURRENT", "AWSPREVIOUS"): |
| 44 | + value = get_secret_by_stage(secrets_client) |
| 45 | + if value: |
| 46 | + keys[stage] = value.encode("utf-8") |
| 47 | + |
| 48 | + if not keys: |
| 49 | + raise RuntimeError("No valid NHS HMAC secret keys available") |
| 50 | + |
| 51 | + return keys |
| 52 | + |
| 53 | + |
| 54 | +def get_secret_by_stage(secrets_client: BaseClient |
| 55 | + #secrets_client: Annotated[BaseClient, Inject(qualifier="secretsmanager")], |
| 56 | + ) -> str: |
| 57 | + secret_name = "eligibility-signposting-api-dev/hashing_secret" |
| 58 | + region_name = "eu-west-2" |
| 59 | + # client = boto3.client("secretsmanager", region_name=region_name) |
| 60 | + stage="AWSCURRENT" |
| 61 | + try: |
| 62 | + secrets_client.describe_secret(SecretId=secret_name) |
| 63 | + except ClientError as e: |
| 64 | + raise RuntimeError(f"Secret '{secret_name}' does not exist") from e |
| 65 | + |
| 66 | + version_stages = ["AWSCURRENT", "AWSPREVIOUS"] # add "AWSPENDING" later |
| 67 | + |
| 68 | + #for stage in version_stages: |
| 69 | + try: |
| 70 | + response = secrets_client.get_secret_value( |
| 71 | + SecretId=secret_name, |
| 72 | + VersionStage=stage |
| 73 | + ) |
| 74 | + secret = response.get("SecretString") # response.get("SecretBinary") |
| 75 | + if secret: |
| 76 | + return secret |
| 77 | + # except ClientError as e: |
| 78 | + # raise e # Needs expanding as it would raise after 1 iteration currently |
| 79 | + |
| 80 | + except ClientError as e: |
| 81 | + if e.response["Error"]["Code"] not in ( |
| 82 | + "ResourceNotFoundException", |
| 83 | + "ResourceNotFound", |
| 84 | + "ValidationException", |
| 85 | + ): |
| 86 | + raise e |
| 87 | + |
| 88 | + raise RuntimeError(f"No valid version found for secret '{secret_name}'") |
| 89 | + |
| 90 | + |
0 commit comments