@@ -1,21 +1,24 @@
+# test rebuild lambdas
+
 """
 Script to call the GovNotify API to retrieve data and write to S3.
 Retrieved data is written to S3 Landing as a json string and parquet file.
 Data is then normalised and written to s3 Raw for use by analysts.
 Both zones are crawled so that data is exposed in the Glue data catalog.
 """

-from datetime import datetime
-from io import BytesIO
 import json
 import logging
+from datetime import datetime
+from io import BytesIO
 from os import getenv

-from botocore.exceptions import ClientError
 import boto3
-from notifications_python_client.notifications import NotificationsAPIClient
-from notifications_python_client.errors import HTTPError
 import pandas as pd
+from botocore.exceptions import ClientError
+from notifications_python_client.errors import HTTPError
+from notifications_python_client.notifications import NotificationsAPIClient
+

 # Set up logging
 logging.basicConfig(level=logging.INFO)
@@ -29,7 +32,7 @@ def initialize_s3_client():
     Returns:
         boto3.client: S3 client instance.
     """
-    return boto3.client('s3')
+    return boto3.client("s3")


 def get_api_secret(api_secret_name, region_name):
@@ -58,9 +61,7 @@ def get_response(query):
     try:
         response = query
     except HTTPError as e:
-        logger.error(
-            f"Error requesting response from {query}: {e}"
-        )
+        logger.error(f"Error requesting response from {query}: {e}")
         raise
     return response

@@ -98,7 +99,7 @@ def json_to_parquet(response, label):
     """
     df = pd.DataFrame.from_dict(response[label])
     parquet_buffer = BytesIO()
-    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
+    df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
     return parquet_buffer


@@ -113,12 +114,12 @@ def json_to_parquet_normalised(response, label):
     data = json.loads(response)
     df = pd.json_normalize(data[label], max_level=1)
     parquet_buffer = BytesIO()
-    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
+    df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
     return parquet_buffer


 def prepare_json(response):
-    return json.dumps(response).encode('utf-8')
+    return json.dumps(response).encode("utf-8")


 def add_date_partition_key_to_s3_prefix(s3_prefix):
@@ -129,8 +130,8 @@ def add_date_partition_key_to_s3_prefix(s3_prefix):

 def lambda_handler(event, context):
     logger.info("Set up S3 client...")
-    s3_client = boto3.client('s3')
-    glue_client = boto3.client('glue')
+    s3_client = boto3.client("s3")
+    glue_client = boto3.client("glue")

     api_secret_name = getenv("API_SECRET_NAME")
     region_name = getenv("AWS_REGION")
@@ -148,41 +149,62 @@ def lambda_handler(event, context):
     client = initialise_notification_client(api_key)

     # GovNotify queries to retrieve
-    api_queries = ['notifications', 'received_text_messages']
+    api_queries = ["notifications", "received_text_messages"]
     api_queries_dict = {
-        'notifications': {'query': client.get_all_notifications(include_jobs=True),
-                          'file_name': 'notifications'},
-        'received_text_messages': {'query': client.get_received_texts(),
-                                   'file_name': 'received_text_messages'}
+        "notifications": {
+            "query": client.get_all_notifications(include_jobs=True),
+            "file_name": "notifications",
+        },
+        "received_text_messages": {
+            "query": client.get_received_texts(),
+            "file_name": "received_text_messages",
+        },
     }

     logger.info("Get API responses...")
     for api_query in api_queries:
-        query = api_queries_dict.get(api_query).get('query')
+        query = api_queries_dict.get(api_query).get("query")
         response = get_response(query)
-        file_name = api_queries_dict.get(api_query).get('file_name')
+        file_name = api_queries_dict.get(api_query).get("file_name")

-        output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
-        output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')
+        output_folder_json = add_date_partition_key_to_s3_prefix(
+            f"{output_folder}{file_name}/json/"
+        )
+        output_folder_parquet = add_date_partition_key_to_s3_prefix(
+            f"{output_folder}{file_name}/parquet/"
+        )

         # convert response to json formatted string
         json_str = prepare_json(response=response)

         # Upload the json string to landing only
-        upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')
+        upload_to_s3(
+            output_s3_bucket_landing,
+            s3_client,
+            json_str,
+            f"{output_folder_json}{file_name}.json",
+        )

         # Upload parquet buffer to both S3 landing and raw; run crawler
         parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
         parquet_buffer_landing.seek(0)
-        s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
-                                 f'{output_folder_parquet}{file_name}.parquet')
-        glue_client.start_crawler(Name=f'{crawler_landing}{file_name}')
+        s3_client.upload_fileobj(
+            parquet_buffer_landing,
+            output_s3_bucket_landing,
+            f"{output_folder_parquet}{file_name}.parquet",
+        )
+        glue_client.start_crawler(Name=f"{crawler_landing}{file_name}")

-        parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
+        parquet_buffer_raw = json_to_parquet_normalised(
+            response=json_str, label=file_name
+        )
         parquet_buffer_raw.seek(0)
-        s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
-                                 f'{output_folder_parquet}{file_name}.parquet')
-        glue_client.start_crawler(Name=f'{crawler_raw}{file_name}')
+        s3_client.upload_fileobj(
+            parquet_buffer_raw,
+            output_s3_bucket_raw,
+            f"{output_folder_parquet}{file_name}.parquet",
+        )
+        glue_client.start_crawler(Name=f"{crawler_raw}{file_name}")

     logger.info("Job finished")

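For orientation, a minimal, self-contained sketch of the serialise-and-normalise path the handler runs for each query, mirroring prepare_json() and json_to_parquet_normalised() above. The sample payload and its field names are illustrative only, not the real GovNotify response schema:

```python
import json
from io import BytesIO

import pandas as pd

# Illustrative payload shaped like a "notifications" response; the real
# GovNotify API returns far more fields than this.
response = {
    "notifications": [
        {"id": "abc-123", "type": "sms", "status": "delivered"},
        {"id": "def-456", "type": "email", "status": "created"},
    ]
}

# Landing zone: the response serialised as a UTF-8 JSON string (prepare_json).
json_str = json.dumps(response).encode("utf-8")

# Raw zone: flatten nested fields one level deep and write Parquet into an
# in-memory buffer (json_to_parquet_normalised).
data = json.loads(json_str)
df = pd.json_normalize(data["notifications"], max_level=1)
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
parquet_buffer.seek(0)  # rewind so upload_fileobj would read from the start
```

The seek(0) matters: to_parquet leaves the buffer positioned at its end, so uploading without rewinding would send an empty object, which is why the handler rewinds both buffers before calling upload_fileobj.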