|
| 1 | +""" |
| 2 | +Script to call the GovNotify API to retrieve data from the |
| 3 | +Housing LBH Communal Repairs account and write to S3. |
| 4 | +Retrieved data is written to S3 Landing as a json string and parquet file. |
| 5 | +Data is then normalised and written to s3 Raw for use by analysts. |
| 6 | +Both zones are crawled so that data is exposed in the Glue data catalog. |
| 7 | +""" |

from datetime import datetime
from io import BytesIO
import json
import logging
from os import getenv

import boto3
from botocore.exceptions import ClientError
from notifications_python_client.errors import HTTPError
from notifications_python_client.notifications import NotificationsAPIClient
import pandas as pd

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def initialize_s3_client():
    """
    Initialise and return an AWS S3 client using default credentials.

    Returns:
        boto3.client: S3 client instance.
    """
    return boto3.client('s3')


def get_api_secret(api_secret_name, region_name):
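    """
    Retrieve the GovNotify API key secret from AWS Secrets Manager.

    Args:
        api_secret_name (str): Name of the secret in Secrets Manager.
        region_name (str): AWS region the secret is stored in.

    Returns:
        str: The secret value as a JSON-formatted string.
    """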
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)
    try:
        get_secret_value_response = client.get_secret_value(SecretId=api_secret_name)
    except ClientError as e:
        logger.error(f"Error retrieving secret {api_secret_name}: {e}")
        raise
    return get_secret_value_response["SecretString"]


def initialise_notification_client(api_key):
    """
    Initialise and return a GovNotify Python API client using the API key (secret).

    Args:
        api_key (str): GovNotify API key.

    Returns:
        NotificationsAPIClient: GovNotify Python API client instance.
    """
    return NotificationsAPIClient(api_key)


def get_response(query):
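    """
    Execute a GovNotify API query and return its response.

    Args:
        query (callable): Zero-argument callable that performs the API request.

    Returns:
        dict: Response returned by the GovNotify API.
    """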
    try:
        response = query()
    except HTTPError as e:
        logger.error(
            f"Error requesting response from GovNotify API: {e}"
        )
        raise
    return response


def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
    """
    Upload file content to AWS S3.

    Args:
        s3_bucket_name (str): Name of the S3 bucket.
        s3_client (boto3.client): S3 client instance.
        file_content (bytes): File content as bytes.
        file_name (str): Key of the file in S3.

    Returns:
        None
    """
    try:
        s3_client.put_object(Bucket=s3_bucket_name, Key=file_name, Body=file_content)
        logger.info(f"Uploaded {file_name} to S3")
    except Exception as e:
        logger.error(f"Error uploading {file_name} to S3: {str(e)}")


def json_to_parquet(response, label):
    """
    Convert an API response dict to a parquet buffer.

    Args:
        response (dict): Dict containing the response from the API.
        label (str): Name of the API endpoint 'table' retrieved.

    Returns:
        BytesIO: Parquet buffer object.
    """
    df = pd.DataFrame.from_dict(response[label])
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
    return parquet_buffer


def json_to_parquet_normalised(response, label):
    """
    Normalise an API response (one level deep) and convert it to a parquet buffer.

    Args:
        response (str): JSON string containing the response from the API.
        label (str): Name of the API endpoint 'table' retrieved.

    Returns:
        BytesIO: Parquet buffer object.
    """
    data = json.loads(response)
    df = pd.json_normalize(data[label], max_level=1)
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
    return parquet_buffer


def prepare_json(response):
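    """
    Serialise an API response dict to a UTF-8 encoded JSON string.

    Args:
        response (dict): Dict containing the response from the API.

    Returns:
        bytes: UTF-8 encoded JSON string.
    """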
    return json.dumps(response).encode('utf-8')


def add_date_partition_key_to_s3_prefix(s3_prefix):
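    """
    Append Hive-style date partition keys (year/month/day/date) for today to an S3 prefix.

    Args:
        s3_prefix (str): Base S3 prefix.

    Returns:
        str: Prefix with the date partition keys appended.
    """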
    t = datetime.today()
    partition_key = f"import_year={t.strftime('%Y')}/import_month={t.strftime('%m')}/import_day={t.strftime('%d')}/import_date={t.strftime('%Y%m%d')}/"  # noqa
    return f"{s3_prefix}{partition_key}"


def lambda_handler(event, context):
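    """
    Lambda entry point: retrieve the GovNotify API key, pull data from the
    configured API endpoints, and write JSON and parquet outputs to the S3
    Landing and Raw zones before starting the corresponding Glue crawlers.
    """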
    logger.info("Set up S3 client...")
    s3_client = initialize_s3_client()
    glue_client = boto3.client('glue')

    api_secret_name = getenv("API_SECRET_NAME")
    region_name = getenv("AWS_REGION")

    output_s3_bucket_landing = getenv("TARGET_S3_BUCKET_LANDING")
    output_s3_bucket_raw = getenv("TARGET_S3_BUCKET_RAW")
    output_folder = getenv("TARGET_S3_FOLDER")
    crawler_landing = getenv("CRAWLER_NAME_LANDING")
    crawler_raw = getenv("CRAWLER_NAME_RAW")

    logger.info("Get API secret...")
    api_secret_string = get_api_secret(api_secret_name, region_name)
    api_secret_json = json.loads(api_secret_string)
    api_key = api_secret_json.get("api_key_live")
    client = initialise_notification_client(api_key)

    # GovNotify queries to retrieve; each query is a zero-argument callable so the
    # API call is made (and any HTTPError handled) inside get_response
    api_queries = ['notifications']
    api_queries_dict = {
        'notifications': {'query': lambda: client.get_all_notifications(include_jobs=True),
                          'file_name': 'notifications'}
    }

    logger.info("Get API responses...")
    for api_query in api_queries:
        query = api_queries_dict.get(api_query).get('query')
        response = get_response(query)
        file_name = api_queries_dict.get(api_query).get('file_name')

        output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
        output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')

        # Convert response to a JSON-formatted string
        json_str = prepare_json(response=response)

        # Upload the JSON string to landing only
        upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')

        # Upload parquet buffers to both S3 landing and raw; run the crawlers
        parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
        parquet_buffer_landing.seek(0)
        s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
                                 f'{output_folder_parquet}{file_name}.parquet')
        glue_client.start_crawler(Name=f'{crawler_landing} {file_name}')

        parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
        parquet_buffer_raw.seek(0)
        s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
                                 f'{output_folder_parquet}{file_name}.parquet')
        glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')

    logger.info("Job finished")


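# Allow the handler to be run locally outside of Lambda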
if __name__ == "__main__":
    lambda_handler("event", "lambda_context")