from pyspark.sql.functions import udf, col, explode, lit
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql import Row
- import hmac, hashlib;
- import base64;
+ import hmac
+ import hashlib
+ import base64
from datetime import datetime, timedelta
from dateutil import tz
- from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from math import ceil
- from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, table_exists_in_catalog
+ from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, \
+     table_exists_in_catalog
import json
from distutils.util import strtobool

+
def authenticate_tascomi(headers, public_key, private_key):
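    # Returns the headers dict augmented with the X-Public key and a freshly calculated X-Hash auth token.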
    auth_hash = calculate_auth_hash(public_key, private_key)
    headers['X-Public'] = public_key
    headers['X-Hash'] = auth_hash
    return headers

+
def not_today(date_str):
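    # True when last_updated is missing or falls on any day other than today; today's records are filtered out of the import.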
    if not date_str:
        return True
    date = datetime.strptime(date_str[:19], "%Y-%m-%d %H:%M:%S")
    return date.date() != datetime.now().date()

+
def get_tascomi_resource(page_number, url, body):
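    # Runs as a Spark UDF: fetches a single page of the resource and returns (records, url, status_code, error_message).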
    global public_key
    global private_key
@@ -46,20 +50,23 @@ def get_tascomi_resource(page_number, url, body):
    res = {}
    try:
        res = requests.get(url, data=body, headers=headers)
-         if not res.text or json.loads(res.text) == None:
+         if not res.text or json.loads(res.text) is None:
            print(f"Null data response, with status code {res.status_code} for page {page_number}")
            return ([""], url, res.status_code, "Null data response.")
        records = json.loads(res.text)

-         serialized_records = [json.dumps(remove_gis_image(record)) for record in records if not_today(record['last_updated'])]
+         serialized_records = [json.dumps(remove_gis_image(record)) for record in records if
+                               not_today(record['last_updated'])]

        return (serialized_records, url, res.status_code, "")

    except Exception as e:
        exception = str(e)
-         print(f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
+         print(
+             f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
        return ([""], url, res.status_code, exception)

+
def calculate_auth_hash(public_key, private_key):
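    # Signs the time-based token with the private key using HMAC-SHA256, then base64-encodes the hex digest for the X-Hash header.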
    tz_ldn = tz.gettz('Europe/London')
    now = datetime.now(tz_ldn)
@@ -68,7 +75,8 @@ def calculate_auth_hash(public_key, private_key):
    token = crypt.hexdigest().encode('utf-8')
    return base64.b64encode(hmac.new(private_key.encode('utf-8'), token, hashlib.sha256).hexdigest().encode('utf-8'))

- def get_number_of_pages(resource, query = ""):
+
+ def get_number_of_pages(resource, query=""):
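+     # Derives the page count from the X-Number-Of-Results and X-Results-Per-Page response headers.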
    global public_key
    global private_key

@@ -79,45 +87,53 @@ def get_number_of_pages(resource, query = ""):

    headers = authenticate_tascomi(headers, public_key, private_key)

-     url = f'https://hackney-planning.tascomi.com/rest/v1/{resource}{query}'
+     url = f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}{query}'
    res = requests.get(url, data="", headers=headers)
    if res.status_code == 202:
        logger.info(f"received status code 202, whilst getting number of pages for {resource}, with query {query}")
-         return { 'success': True, 'number_of_pages': 0 }
+         return {'success': True, 'number_of_pages': 0}
    if res.status_code == 200:
        number_of_results = res.headers['X-Number-Of-Results']
        results_per_page = res.headers['X-Results-Per-Page']

-         return { 'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page)) }
+         return {'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page))}
    error_message = f"Received status code {res.status_code} whilst trying to get number of pages for {resource}, {query}"
    logger.info(error_message)
-     return { 'success': False, 'error_message': error_message }
+     return {'success': False, 'error_message': error_message}
+

def get_days_since_last_import(last_import_date):
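    # Lists the dates (YYYY-MM-DD) from the last import date through yesterday, sorted ascending.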
    yesterday = datetime.now()
    last_import_datetime = datetime.strptime(last_import_date, "%Y%m%d")
    number_days_to_query = (yesterday - last_import_datetime).days
-     days = [ datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in range(1, number_days_to_query + 1)]
+     days = [datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in
+             range(1, number_days_to_query + 1)]
    days.sort()
    return days

- def get_last_import_date(glue_context, database, resource):

+ def get_last_import_date(glue_context, database, resource):
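+     # Returns the most recent import_date that recorded a 200 status, or None when the response table is not yet in the catalog.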
    if not table_exists_in_catalog(glue_context, f"api_response_{resource}", database):
        logger.info(f"Couldn't find table api_response_{resource} in database {database}.")
        return None

    logger.info(f"found table for {resource} api response in {database}")
-     return glue_context.sql(f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(1)[0].max_import_date
+     return glue_context.sql(
+         f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(
+         1)[0].max_import_date
+

def throw_if_unsuccessful(success_state, message):
    if not success_state:
        raise Exception(message)

+
def get_failures_from_last_import(database, resource, last_import_date):
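    # Rebuilds request rows (page_number, url, empty body) for every page that returned a non-200 status on the given import date.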
-     requests_df = glue_context.sql(f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
-     return { "requests": requests_df, "count": requests_df.count() }
-
+     requests_df = glue_context.sql(
+         f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
+     return {"requests": requests_df, "count": requests_df.count()}
+
+
def get_requests_since_last_import(resource, last_import_date):
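    # Builds one request per page per day since the last import and parallelises them into a DataFrame; count is 0 when there is nothing to fetch.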
    requests_list = []
    for day in get_days_since_last_import(last_import_date):
@@ -127,13 +143,16 @@ def get_requests_since_last_import(resource, last_import_date):

        number_of_pages = number_of_pages_reponse["number_of_pages"]
        logger.info(f"Number of pages to retrieve for {day}: {number_of_pages}")
-         requests_list += [ RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}&last_updated={day}', "") for page_number in range(1, number_of_pages + 1)]
+         requests_list += [RequestRow(page_number,
+                                      f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}&last_updated={day}',
+                                      "") for page_number in range(1, number_of_pages + 1)]
    number_of_requests = len(requests_list)
    if number_of_requests == 0:
-         return { "requests": [], "count": 0 }
+         return {"requests": [], "count": 0}
    requests_list = sc.parallelize(requests_list)
    requests_list = glue_context.createDataFrame(requests_list)
-     return { "requests": requests_list, "count": number_of_requests }
+     return {"requests": requests_list, "count": number_of_requests}
+

def get_requests_for_full_load(resource):
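    # Builds a request for every page of the resource, with no last_updated filter, for an initial full load.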
    number_of_pages_reponse = get_number_of_pages(resource)
@@ -142,22 +161,25 @@ def get_requests_for_full_load(resource):

    number_of_pages = number_of_pages_reponse["number_of_pages"]
    logger.info(f"Number of pages to retrieve: {number_of_pages}")
-     requests_list = [RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}', "") for page_number in range(1, number_of_pages + 1)]
+     requests_list = [
+         RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}', "") for
+         page_number in range(1, number_of_pages + 1)]
    number_of_requests = len(requests_list)
    requests_list = sc.parallelize(requests_list)
    requests_list = glue_context.createDataFrame(requests_list)
-     return { "requests": requests_list, "count": number_of_requests }
+     return {"requests": requests_list, "count": number_of_requests}


def get_requests(last_import_date, resource, database):
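    # Dispatches to a full load (no previous import), a retry of the previous import's failures, or an incremental load.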
    retry_arg_value = get_glue_env_var('retry_failure_from_previous_import', 'false')
    try:
        retry_failures = strtobool(retry_arg_value)
    except ValueError:
-         raise Exception(f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")
+         raise Exception(
+             f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")

    if not last_import_date:
-         logger.info(f"Retrieving full load of data")
+         logger.info("Retrieving full load of data")
        return get_requests_for_full_load(resource)
    if retry_failures:
        logger.info(f"Getting failed requests from import on date {last_import_date}")
@@ -175,20 +197,22 @@ def calculate_number_of_partitions(number_of_requests, number_of_workers):
    else:
        return max_partitions

+
def remove_gis_image(records):
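    # Strips the base64-encoded GIS map image from a record before it is serialised.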
    records.pop("gis_map_image_base64", None)
    return records

+
def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list, partitions):
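    # Repartitions the requests, calls the API through the UDF, explodes each page into one row per record, then stamps
    # import-time columns; per the function name, the write to s3_target_url happens in the lines elided from this hunk.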
    request_df = requests_list.repartition(partitions)
-     response_df = request_df.withColumn("response", get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))
+     response_df = request_df.withColumn("response",
+                                         get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))

-     tascomi_responses_df = response_df.select( \
-         col("page_number"),
-         explode(col("response.response_data")).alias(f"{resource}"), \
-         col("response.import_api_url_requested").alias("import_api_url_requested"), \
-         col("response.import_api_status_code").alias("import_api_status_code"), \
-         col("response.import_exception_thrown").alias("import_exception_thrown"))
+     tascomi_responses_df = response_df.select(col("page_number"),
+                                               explode(col("response.response_data")).alias(f"{resource}"),
+                                               col("response.import_api_url_requested").alias("import_api_url_requested"),
+                                               col("response.import_api_status_code").alias("import_api_status_code"),
+                                               col("response.import_exception_thrown").alias("import_exception_thrown"))

    tascomi_responses_df = add_import_time_columns(tascomi_responses_df)

@@ -201,10 +225,13 @@ def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, reque

    return tascomi_responses_df

+
def get_failed_requests(data_frame):
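    # Selects requests whose responses were neither 200 nor 202, shaped as (page_number, url, empty body) for re-submission.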
-     return data_frame \
-         .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202'))\
-         .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"), lit("").alias("body"))
+     return data_frame \
+         .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202')) \
+         .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"),
+                 lit("").alias("body"))
+

if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
@@ -247,7 +274,8 @@ def get_failed_requests(data_frame):
        partitions = calculate_number_of_partitions(requests_list["count"], number_of_workers)
        logger.info(f"Using {partitions} partitions to repartition the RDD.")

-         tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list["requests"], partitions)
+         tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource,
+                                                             requests_list["requests"], partitions)
    else:
        logger.info("No requests, exiting job")
