
Commit 91875c5

- create lambda for retrieving sent text messages and notifications from the Customer Services GovNotify account.

1 parent 18eb120 commit 91875c5

File tree

2 files changed: +451 -0 lines changed
Lines changed: 192 additions & 0 deletions
"""
Script to call the GovNotify API to retrieve data from the
Customer Services account and write to S3.
Retrieved data is written to S3 Landing as a json string and parquet file.
Data is then normalised and written to S3 Raw for use by analysts.
Both zones are crawled so that data is exposed in the Glue data catalog.
"""

from datetime import datetime
from io import BytesIO
import json
import logging
from os import getenv

from botocore.exceptions import ClientError
import boto3
from notifications_python_client.notifications import NotificationsAPIClient
from notifications_python_client.errors import HTTPError
import pandas as pd

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def initialize_s3_client():
    """
    Initialise and return an AWS S3 client using default credentials.

    Returns:
        boto3.client: S3 client instance.
    """
    return boto3.client('s3')


def get_api_secret(api_secret_name, region_name):
    """
    Retrieve a secret string from AWS Secrets Manager.

    Args:
        api_secret_name (str): Name of the secret in Secrets Manager.
        region_name (str): AWS region hosting the secret.

    Returns:
        str: The secret value as a JSON-formatted string.
    """
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)
    try:
        get_secret_value_response = client.get_secret_value(SecretId=api_secret_name)
    except ClientError as e:
        logger.error(f"Error retrieving secret {api_secret_name}: {e}")
        raise
    return get_secret_value_response["SecretString"]
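
# Note: the secret string is expected to be a JSON document holding the
# GovNotify key, e.g. {"api_key_live": "<key>"} (see lambda_handler below).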


def initialise_notification_client(api_key):
    """
    Initialise and return a GovNotify Python API client using the API key (secret).

    Args:
        api_key (str)

    Returns:
        GovNotify Python API client instance.
    """
    return NotificationsAPIClient(api_key)


def get_response(query):
    """
    Execute a GovNotify API call and return its response.

    The query is passed in as a zero-argument callable so that the API is
    only invoked here, inside the try block, and HTTP errors are caught.

    Args:
        query (callable): Callable wrapping a GovNotify API call.

    Returns:
        dict: Response from the API.
    """
    try:
        response = query()
    except HTTPError as e:
        logger.error(
            f"Error requesting response from {query}: {e}"
        )
        raise
    return response
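
# Usage: response = get_response(lambda: client.get_received_texts())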


def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
    """
    Upload file content to AWS S3.

    Args:
        s3_bucket_name (str): Name of S3 bucket.
        s3_client (boto3.client): S3 client instance.
        file_content (bytes): File content as bytes.
        file_name (str): Name of the file in S3.

    Returns:
        None
    """
    try:
        s3_client.put_object(Bucket=s3_bucket_name, Key=file_name, Body=file_content)
        logger.info(f"Uploaded {file_name} to S3")
    except Exception as e:
        logger.error(f"Error uploading {file_name} to S3: {str(e)}")


def json_to_parquet(response, label):
    """
    Convert one endpoint's records from the API response to a parquet buffer.

    Args:
        response (dict): dict containing response from API.
        label (str): Name of the api endpoint 'table' retrieved.

    Returns:
        BytesIO: parquet buffer object.
    """
    df = pd.DataFrame.from_dict(response[label])
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
    return parquet_buffer
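
# Note: both parquet helpers leave the buffer positioned at the end of the
# written bytes, so callers must rewind with .seek(0) before uploading
# (as lambda_handler does below).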


def json_to_parquet_normalised(response, label):
    """
    Normalise nested fields in the API response and convert to a parquet buffer.

    Args:
        response (str): JSON string containing the response from the API.
        label (str): Name of the api endpoint 'table' retrieved.

    Returns:
        BytesIO: parquet buffer object.
    """
    data = json.loads(response)
    df = pd.json_normalize(data[label], max_level=1)
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
    return parquet_buffer
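
# For example, pd.json_normalize([{"id": 1, "template": {"id": "t1"}}], max_level=1)
# flattens one level of nesting into the columns 'id' and 'template.id'.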


def prepare_json(response):
    """Serialise the API response to a utf-8 encoded JSON byte string."""
    return json.dumps(response).encode('utf-8')


def add_date_partition_key_to_s3_prefix(s3_prefix):
    """Append Hive-style import date partition keys for today to an S3 prefix."""
    t = datetime.today()
    partition_key = f"import_year={t.strftime('%Y')}/import_month={t.strftime('%m')}/import_day={t.strftime('%d')}/import_date={t.strftime('%Y%m%d')}/"  # noqa
    return f"{s3_prefix}{partition_key}"
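
# Example (illustrative date): on 31 January 2024, 'landing/notifications/parquet/' becomes
# 'landing/notifications/parquet/import_year=2024/import_month=01/import_day=31/import_date=20240131/'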


def lambda_handler(event, context):
    logger.info("Set up S3 client...")
    s3_client = initialize_s3_client()
    glue_client = boto3.client('glue')

    api_secret_name = getenv("API_SECRET_NAME")
    region_name = getenv("AWS_REGION")

    output_s3_bucket_landing = getenv("TARGET_S3_BUCKET_LANDING")
    output_s3_bucket_raw = getenv("TARGET_S3_BUCKET_RAW")
    output_folder = getenv("TARGET_S3_FOLDER")
    crawler_landing = getenv("CRAWLER_NAME_LANDING")
    crawler_raw = getenv("CRAWLER_NAME_RAW")

    logger.info("Get API secret...")
    api_secret_string = get_api_secret(api_secret_name, region_name)
    api_secret_json = json.loads(api_secret_string)
    api_key = api_secret_json.get("api_key_live")
    client = initialise_notification_client(api_key)

    # GovNotify queries to retrieve. Each query is wrapped in a lambda so the
    # API call is deferred until get_response, where HTTP errors are caught.
    api_queries_dict = {
        'notifications': {'query': lambda: client.get_all_notifications(include_jobs=True),
                          'file_name': 'notifications'},
        'received_text_messages': {'query': lambda: client.get_received_texts(),
                                   'file_name': 'received_text_messages'}
    }

    logger.info("Get API responses...")
    for query_details in api_queries_dict.values():
        response = get_response(query_details['query'])
        file_name = query_details['file_name']

        output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
        output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')

        # Convert response to a json-formatted string
        json_str = prepare_json(response=response)

        # Upload the json string to landing only
        upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')

        # Upload parquet buffers to both S3 landing and raw; run the crawlers
        parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
        parquet_buffer_landing.seek(0)
        s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
                                 f'{output_folder_parquet}{file_name}.parquet')
        glue_client.start_crawler(Name=f'{crawler_landing} {file_name}')

        parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
        parquet_buffer_raw.seek(0)
        s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
                                 f'{output_folder_parquet}{file_name}.parquet')
        glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')

    logger.info("Job finished")


if __name__ == "__main__":
    lambda_handler("event", "lambda_context")
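
For local testing outside Lambda, a minimal sketch along these lines can exercise the handler once its environment variables point at real resources; every value below is an illustrative placeholder, not taken from this commit:

# Local smoke test (all names hypothetical; replace with real resources)
import os

for var, value in {
    "API_SECRET_NAME": "govnotify/customer-services/api-key",  # hypothetical
    "AWS_REGION": "eu-west-2",
    "TARGET_S3_BUCKET_LANDING": "example-landing-bucket",  # hypothetical
    "TARGET_S3_BUCKET_RAW": "example-raw-bucket",  # hypothetical
    "TARGET_S3_FOLDER": "govnotify/",  # hypothetical
    "CRAWLER_NAME_LANDING": "govnotify-landing",  # hypothetical
    "CRAWLER_NAME_RAW": "govnotify-raw",  # hypothetical
}.items():
    os.environ.setdefault(var, value)

lambda_handler({}, None)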
