
Commit 413ff1e

Amend Lambda script for LBH Communal Repairs (GovNotify) to ensure that all notifications (sent text messages) are retrieved.
1 parent 778f411 commit 413ff1e

File tree

1 file changed: +46 -61 lines changed
  • lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs


lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py

Lines changed: 46 additions & 61 deletions
@@ -1,16 +1,17 @@
 """
 Script to call the GovNotify API to retrieve data from the
 Housing LBH Communal Repairs account and write to S3.
-Retrieved data is written to S3 Landing as a json string and parquet file.
+Retrieved data is written to S3 Landing as a json string.
 Data is then normalised and written to s3 Raw for use by analysts.
-Both zones are crawled so that data is exposed in the Glue data catalog.
+Raw zone is crawled so that data is exposed in the Glue data catalog.
 """

 from datetime import datetime
 from io import BytesIO
 import json
 import logging
 from os import getenv
+import re

 from botocore.exceptions import ClientError
 import boto3
@@ -52,7 +53,21 @@ def initialise_notification_client(api_key):
     Returns:
     GovNotify Python API client instance
     """
-    return NotificationsAPIClient(api_key)
+    return NotificationsAPIClientAllJobs(api_key)
+
+
+class NotificationsAPIClientAllJobs(NotificationsAPIClient):
+
+    def get_all_notifications_iterator_all_jobs(self, status=None, template_type=None, reference=None, older_than=None,
+                                                include_jobs=None):
+        result = self.get_all_notifications(status, template_type, reference, older_than, include_jobs)
+        notifications = result.get("notifications")
+        while notifications:
+            yield from notifications
+            next_link = result["links"].get("next")
+            notification_id = re.search("[0-F]{8}-[0-F]{4}-[0-F]{4}-[0-F]{4}-[0-F]{12}", next_link, re.I).group(0)
+            result = self.get_all_notifications(status, template_type, reference, notification_id, include_jobs)
+            notifications = result.get("notifications")


 def get_response(query):
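
Note on the new subclass: as far as the diff shows, a single get_all_notifications call returns one page of notifications plus a "links" block, so the iterator keeps requesting older pages by pulling the id of the last notification out of the "next" link and passing it back as older_than, stopping when a page comes back empty. A minimal sketch of that id extraction, using an invented next link for illustration:

import re

# Hypothetical "next" link of the shape found in result["links"] (value invented for illustration)
next_link = "https://api.notifications.service.gov.uk/v2/notifications?older_than=740e5834-3a29-46b4-9a6f-16142fde533a"

# Same UUID pattern as in the commit: 8-4-4-4-12 hex groups, matched case-insensitively
match = re.search("[0-F]{8}-[0-F]{4}-[0-F]{4}-[0-F]{4}-[0-F]{12}", next_link, re.I)
older_than_id = match.group(0) if match else None
# older_than_id is then passed as the older_than argument of the next get_all_notifications call
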
@@ -66,6 +81,10 @@ def get_response(query):
     return response


+def prepare_json(response):
+    return json.dumps(response).encode('utf-8')
+
+
 def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
     """
     Upload file content to AWS S3.
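
prepare_json is unchanged in behaviour but moves above upload_to_s3; it simply serialises whatever it is given to a UTF-8 JSON byte string for the Landing upload. For example, with a made-up notification id:

prepare_json([{"id": "740e5834-3a29-46b4-9a6f-16142fde533a", "type": "sms"}])
# -> b'[{"id": "740e5834-3a29-46b4-9a6f-16142fde533a", "type": "sms"}]'
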
@@ -86,23 +105,6 @@ def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
         logger.error(f"Error uploading {file_name} to S3: {str(e)}")


-def json_to_parquet(response, label):
-    """
-
-    Args:
-    response (dict): dict containing response from API
-    label (str): Name of the api endpoint 'table' retrieved.
-
-    Returns:
-    parquet buffer object
-
-    """
-    df = pd.DataFrame.from_dict(response[label])
-    parquet_buffer = BytesIO()
-    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
-    return parquet_buffer
-
-
 def json_to_parquet_normalised(response, label):
     """
     Args:
@@ -111,17 +113,12 @@ def json_to_parquet_normalised(response, label):
     return:
     parquet buffer object
     """
-    data = json.loads(response)
-    df = pd.json_normalize(data[label], max_level=1)
+    df = pd.json_normalize(response)
     parquet_buffer = BytesIO()
     df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
     return parquet_buffer


-def prepare_json(response):
-    return json.dumps(response).encode('utf-8')
-
-
 def add_date_partition_key_to_s3_prefix(s3_prefix):
     t = datetime.today()
     partition_key = f"import_year={t.strftime('%Y')}/import_month={t.strftime('%m')}/import_day={t.strftime('%d')}/import_date={t.strftime('%Y%m%d')}/"  # noqa
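
Because the handler now passes the list of notification dicts straight from the iterator, json_to_parquet_normalised no longer needs to json.loads a string or index it by label; pd.json_normalize(response) turns each notification into one row, flattening nested objects into dotted columns. A minimal sketch with invented records of roughly the shape GovNotify returns:

import pandas as pd

notifications = [  # hypothetical records, not real data
    {"id": "740e5834-3a29-46b4-9a6f-16142fde533a", "type": "sms", "template": {"id": "tmpl-1", "version": 2}},
    {"id": "851f6945-4b3a-47c5-8b70-27253eef644b", "type": "sms", "template": {"id": "tmpl-1", "version": 2}},
]

df = pd.json_normalize(notifications)
# columns include "id", "type", "template.id" and "template.version"
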
@@ -135,11 +132,11 @@ def lambda_handler(event, context):

     api_secret_name = getenv("API_SECRET_NAME")
     region_name = getenv("AWS_REGION")
+    file_name = 'notifications'

     output_s3_bucket_landing = getenv("TARGET_S3_BUCKET_LANDING")
     output_s3_bucket_raw = getenv("TARGET_S3_BUCKET_RAW")
     output_folder = getenv("TARGET_S3_FOLDER")
-    crawler_landing = getenv("CRAWLER_NAME_LANDING")
     crawler_raw = getenv("CRAWLER_NAME_RAW")

     logger.info("Get API secret...")
@@ -148,40 +145,28 @@
     api_key = api_secret_json.get("api_key_live")
     client = initialise_notification_client(api_key)

-    # GovNotify queries to retrieve
-    api_queries = ['notifications']
-    api_queries_dict = {
-        'notifications': {'query': client.get_all_notifications(include_jobs=True),
-                          'file_name': 'notifications'}
-    }
-
-    logger.info("Get API responses...")
-    for api_query in api_queries:
-        query = api_queries_dict.get(api_query).get('query')
-        response = get_response(query)
-        file_name = api_queries_dict.get(api_query).get('file_name')
-
-        output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
-        output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')
-
-        # convert response to json formatted string
-        json_str = prepare_json(response=response)
-
-        # Upload the json string to landing only
-        upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')
-
-        # Upload parquet buffer to both S3 landing and raw; run crawler
-        parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
-        parquet_buffer_landing.seek(0)
-        s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
-                                 f'{output_folder_parquet}{file_name}.parquet')
-        glue_client.start_crawler(Name=f'{crawler_landing} {file_name}')
-
-        parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
-        parquet_buffer_raw.seek(0)
-        s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
-                                 f'{output_folder_parquet}{file_name}.parquet')
-        glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')
+    logger.info("Get all notifications through iterator...")
+
+    response = client.get_all_notifications_iterator_all_jobs(template_type='sms', include_jobs=True)
+
+    # write iterator items to a list as items will be finite
+    response_list = list(response)
+
+    output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
+    output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')
+
+    # convert response to json formatted string
+    json_str = prepare_json(response=response_list)
+
+    # Upload the json string to landing only
+    upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')
+
+    # Upload parquet buffer to S3 raw; run crawler
+    parquet_buffer_raw = json_to_parquet_normalised(response=response_list, label=file_name)
+    parquet_buffer_raw.seek(0)
+    s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
+                             f'{output_folder_parquet}{file_name}.parquet')
+    glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')

     logger.info("Job finished")

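In the amended handler, the single get_all_notifications(include_jobs=True) call is replaced by the paginating iterator filtered to template_type='sms' (the sent text messages the commit message refers to), and only the Raw crawler is started, since parquet is no longer written to Landing. For reference, add_date_partition_key_to_s3_prefix (unchanged by this commit) gives each output a Hive-style date partition; a small illustration of the resulting prefix, assuming TARGET_S3_FOLDER is 'govnotify/' (a placeholder, not the real configuration):

from datetime import datetime

# Illustrative only: the JSON output prefix produced on a hypothetical run date
t = datetime(2024, 5, 14)
prefix = (
    "govnotify/notifications/json/"
    f"import_year={t.strftime('%Y')}/import_month={t.strftime('%m')}/"
    f"import_day={t.strftime('%d')}/import_date={t.strftime('%Y%m%d')}/"
)
# -> "govnotify/notifications/json/import_year=2024/import_month=05/import_day=14/import_date=20240514/"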