from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from math import ceil
-from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, table_exists_in_catalog
+from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, \
+    table_exists_in_catalog
import json
from distutils.util import strtobool

@@ -49,18 +50,20 @@ def get_tascomi_resource(page_number, url, body):
    res = {}
    try:
        res = requests.get(url, data=body, headers=headers)
-        if not res.text or json.loads(res.text) == None:
+        if not res.text or json.loads(res.text) is None:
            print(f"Null data response, with status code {res.status_code} for page {page_number}")
            return ([""], url, res.status_code, "Null data response.")
        records = json.loads(res.text)

-        serialized_records = [json.dumps(remove_gis_image(record)) for record in records if not_today(record['last_updated'])]
+        serialized_records = [json.dumps(remove_gis_image(record)) for record in records if
+                              not_today(record['last_updated'])]

        return (serialized_records, url, res.status_code, "")

    except Exception as e:
        exception = str(e)
-        print(f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
+        print(
+            f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
        return ([""], url, res.status_code, exception)

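
# The 4-tuple returned by get_tascomi_resource above is later read back as a
# struct with fields response_data, import_api_url_requested,
# import_api_status_code and import_exception_thrown (see the select in
# retrieve_and_write_tascomi_data further down this diff).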
@@ -73,7 +76,7 @@ def calculate_auth_hash(public_key, private_key):
    return base64.b64encode(hmac.new(private_key.encode('utf-8'), token, hashlib.sha256).hexdigest().encode('utf-8'))

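
# Hedged usage sketch for calculate_auth_hash above (key values are
# placeholders, not from this codebase):
#   auth_hash = calculate_auth_hash("my-public-key", "my-private-key")
#   # -> bytes: base64 of the hex digest of an HMAC-SHA256 keyed with the
#   #    private key over `token` (built earlier in the function, not shown here)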
-def get_number_of_pages(resource, query = ""):
+def get_number_of_pages(resource, query=""):
    global public_key
    global private_key

@@ -88,22 +91,23 @@ def get_number_of_pages(resource, query = ""):
    res = requests.get(url, data="", headers=headers)
    if res.status_code == 202:
        logger.info(f"received status code 202, whilst getting number of pages for {resource}, with query {query}")
-        return { 'success': True, 'number_of_pages': 0 }
+        return {'success': True, 'number_of_pages': 0}
    if res.status_code == 200:
        number_of_results = res.headers['X-Number-Of-Results']
        results_per_page = res.headers['X-Results-Per-Page']

-        return { 'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page)) }
+        return {'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page))}
    error_message = f"Received status code {res.status_code} whilst trying to get number of pages for {resource}, {query}"
    logger.info(error_message)
-    return { 'success': False, 'error_message': error_message }
+    return {'success': False, 'error_message': error_message}
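
# Example: a 200 response with headers X-Number-Of-Results: 250 and
# X-Results-Per-Page: 100 yields ceil(250 / 100) = 3 pages to fetch.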


def get_days_since_last_import(last_import_date):
    yesterday = datetime.now()
    last_import_datetime = datetime.strptime(last_import_date, "%Y%m%d")
    number_days_to_query = (yesterday - last_import_datetime).days
-    days = [ datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in range(1, number_days_to_query + 1)]
+    days = [datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in
+            range(1, number_days_to_query + 1)]
    days.sort()
    return days

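# Example: with last_import_date = "20240310" and a run on 2024-03-13, this
# returns ["2024-03-10", "2024-03-11", "2024-03-12"], i.e. every day since the
# last import up to and including yesterday.
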
@@ -114,7 +118,9 @@ def get_last_import_date(glue_context, database, resource):
        return None

    logger.info(f"found table for {resource} api response in {database}")
-    return glue_context.sql(f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(1)[0].max_import_date
+    return glue_context.sql(
+        f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(
+        1)[0].max_import_date
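
# If no page has ever returned 200, max(import_date) is NULL and this returns
# None, which get_requests below treats as the signal to do a full load.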


def throw_if_unsuccessful(success_state, message):
@@ -123,8 +129,9 @@ def throw_if_unsuccessful(success_state, message):


def get_failures_from_last_import(database, resource, last_import_date):
-    requests_df = glue_context.sql(f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
-    return { "requests": requests_df, "count": requests_df.count() }
+    requests_df = glue_context.sql(
+        f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
+    return {"requests": requests_df, "count": requests_df.count()}
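
# All three request builders (retry, incremental, full load) return the same
# {"requests": ..., "count": ...} shape, so callers can branch on the count.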


def get_requests_since_last_import(resource, last_import_date):
@@ -136,13 +143,15 @@ def get_requests_since_last_import(resource, last_import_date):

        number_of_pages = number_of_pages_reponse["number_of_pages"]
        logger.info(f"Number of pages to retrieve for {day}: {number_of_pages}")
-        requests_list += [ RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}&last_updated={day}', "") for page_number in range(1, number_of_pages + 1)]
+        requests_list += [RequestRow(page_number,
+                                     f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}&last_updated={day}',
+                                     "") for page_number in range(1, number_of_pages + 1)]
    number_of_requests = len(requests_list)
    if number_of_requests == 0:
-        return { "requests": [], "count": 0 }
+        return {"requests": [], "count": 0}
    requests_list = sc.parallelize(requests_list)
    requests_list = glue_context.createDataFrame(requests_list)
-    return { "requests": requests_list, "count": number_of_requests }
+    return {"requests": requests_list, "count": number_of_requests}
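
# Hedged note: parallelizing the RequestRow list into a DataFrame lets the
# fetch UDF in retrieve_and_write_tascomi_data issue the page requests
# concurrently across Spark partitions rather than one page at a time.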


def get_requests_for_full_load(resource):
@@ -152,19 +161,22 @@ def get_requests_for_full_load(resource):

    number_of_pages = number_of_pages_reponse["number_of_pages"]
    logger.info(f"Number of pages to retrieve: {number_of_pages}")
-    requests_list = [RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}', "") for page_number in range(1, number_of_pages + 1)]
+    requests_list = [
+        RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}', "") for
+        page_number in range(1, number_of_pages + 1)]
    number_of_requests = len(requests_list)
    requests_list = sc.parallelize(requests_list)
    requests_list = glue_context.createDataFrame(requests_list)
-    return { "requests": requests_list, "count": number_of_requests }
+    return {"requests": requests_list, "count": number_of_requests}


def get_requests(last_import_date, resource, database):
    retry_arg_value = get_glue_env_var('retry_failure_from_previous_import', 'false')
    try:
        retry_failures = strtobool(retry_arg_value)
    except ValueError:
-        raise Exception(f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")
+        raise Exception(
+            f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")

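    # strtobool maps "y"/"yes"/"t"/"true"/"on"/"1" to 1 and
    # "n"/"no"/"f"/"false"/"off"/"0" to 0; any other value raises the
    # ValueError handled above.
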
    if not last_import_date:
        logger.info(f"Retrieving full load of data")
@@ -193,14 +205,14 @@ def remove_gis_image(records):

def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list, partitions):
    request_df = requests_list.repartition(partitions)
-    response_df = request_df.withColumn("response", get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))
+    response_df = request_df.withColumn("response",
+                                        get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))

-    tascomi_responses_df = response_df.select( \
-        col("page_number"),
-        explode(col("response.response_data")).alias(f"{resource}"), \
-        col("response.import_api_url_requested").alias("import_api_url_requested"), \
-        col("response.import_api_status_code").alias("import_api_status_code"), \
-        col("response.import_exception_thrown").alias("import_exception_thrown"))
+    tascomi_responses_df = response_df.select(col("page_number"),
+                                              explode(col("response.response_data")).alias(f"{resource}"),
+                                              col("response.import_api_url_requested").alias("import_api_url_requested"),
+                                              col("response.import_api_status_code").alias("import_api_status_code"),
+                                              col("response.import_exception_thrown").alias("import_exception_thrown"))

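    # explode() fans each page's list of serialized records out into one row
    # per record, so the table written below holds individual records, not pages.
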
    tascomi_responses_df = add_import_time_columns(tascomi_responses_df)

@@ -215,9 +227,10 @@ def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, reque


def get_failed_requests(data_frame):
-    return data_frame \
-        .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202'))\
-        .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"), lit("").alias("body"))
+    return data_frame \
+        .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202')) \
+        .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"),
+                lit("").alias("body"))
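
# 202 responses are excluded from the retry set to match get_number_of_pages,
# which treats 202 as a successful, zero-page response.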


if __name__ == "__main__":
@@ -261,7 +274,8 @@ def get_failed_requests(data_frame):
        partitions = calculate_number_of_partitions(requests_list["count"], number_of_workers)
        logger.info(f"Using {partitions} partitions to repartition the RDD.")

-        tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list["requests"], partitions)
+        tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource,
+                                                            requests_list["requests"], partitions)
    else:
        logger.info("No requests, exiting job")