from pyspark.sql.functions import udf, col, explode, lit
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql import Row
-import hmac, hashlib;
-import base64;
+import hmac
+import hashlib
+import base64
from datetime import datetime, timedelta
from dateutil import tz
-from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
import json
from distutils.util import strtobool

+
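# Attach the Tascomi public key and the computed request hash to the outgoing headers.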
def authenticate_tascomi(headers, public_key, private_key):
    auth_hash = calculate_auth_hash(public_key, private_key)
    headers['X-Public'] = public_key
    headers['X-Hash'] = auth_hash
    return headers

+
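# True if date_str is empty or its date portion is not today's date.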
def not_today(date_str):
    if not date_str:
        return True
    date = datetime.strptime(date_str[:19], "%Y-%m-%d %H:%M:%S")
    return date.date() != datetime.now().date()

+
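# Fetch one page of a Tascomi resource, returning the body alongside the url, status code and any exception.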
def get_tascomi_resource(page_number, url, body):
    global public_key
    global private_key
@@ -60,6 +63,7 @@ def get_tascomi_resource(page_number, url, body):
        print(f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
        return ([""], url, res.status_code, exception)

+
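# Derive the X-Hash value from the current Europe/London time, signed with HMAC-SHA256 under the private key.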
def calculate_auth_hash(public_key, private_key):
    tz_ldn = tz.gettz('Europe/London')
    now = datetime.now(tz_ldn)
@@ -68,6 +72,7 @@ def calculate_auth_hash(public_key, private_key):
    token = crypt.hexdigest().encode('utf-8')
    return base64.b64encode(hmac.new(private_key.encode('utf-8'), token, hashlib.sha256).hexdigest().encode('utf-8'))

+
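# Query the API for how many pages the resource spans; returns a success flag and any error message.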
def get_number_of_pages(resource, query = ""):
    global public_key
    global private_key
@@ -93,6 +98,7 @@ def get_number_of_pages(resource, query = ""):
        logger.info(error_message)
        return {'success': False, 'error_message': error_message}

+
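# List, in ascending order, the days between the last import date and now.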
def get_days_since_last_import(last_import_date):
    yesterday = datetime.now()
    last_import_datetime = datetime.strptime(last_import_date, "%Y%m%d")
@@ -101,23 +107,26 @@ def get_days_since_last_import(last_import_date):
    days.sort()
    return days

-def get_last_import_date(glue_context, database, resource):

+def get_last_import_date(glue_context, database, resource):
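    # Return the newest import_date with a '200' status for this resource, or None when the catalog table is absent.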
    if not table_exists_in_catalog(glue_context, f"api_response_{resource}", database):
        logger.info(f"Couldn't find table api_response_{resource} in database {database}.")
        return None

    logger.info(f"found table for {resource} api response in {database}")
    return glue_context.sql(f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(1)[0].max_import_date

+
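# Abort the job with an exception when a previous step reports failure.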
def throw_if_unsuccessful(success_state, message):
    if not success_state:
        raise Exception(message)

+
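# Rebuild the request list for pages that did not return a 200 on the last import.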
def get_failures_from_last_import(database, resource, last_import_date):
    requests_df = glue_context.sql(f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
    return {"requests": requests_df, "count": requests_df.count()}
-
+
+
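# Assemble the incremental request list covering every day since the last import.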
def get_requests_since_last_import(resource, last_import_date):
    requests_list = []
    for day in get_days_since_last_import(last_import_date):
@@ -135,6 +144,7 @@ def get_requests_since_last_import(resource, last_import_date):
    requests_list = glue_context.createDataFrame(requests_list)
    return {"requests": requests_list, "count": number_of_requests}

+
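# Assemble the request list for a full load based on the resource's page count.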
def get_requests_for_full_load(resource):
    number_of_pages_reponse = get_number_of_pages(resource)

@@ -175,10 +185,12 @@ def calculate_number_of_partitions(number_of_requests, number_of_workers):
    else:
        return max_partitions

+
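# Drop the bulky gis_map_image_base64 field from a record.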
def remove_gis_image(records):
    records.pop("gis_map_image_base64", None)
    return records

+
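# Spread the requests across partitions, call the API through the UDF, and write the responses to the S3 target.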
def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list, partitions):
    request_df = requests_list.repartition(partitions)
    response_df = request_df.withColumn("response", get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))
@@ -201,11 +213,13 @@ def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list, partitions):

    return tascomi_responses_df

+
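# Select page_number, url and an empty body for every response that was neither 200 nor 202.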
def get_failed_requests(data_frame):
    return data_frame \
        .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202'))\
        .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"), lit("").alias("body"))

+
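# Entry point: resolve the Glue job arguments and set up the Spark and Glue contexts.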
if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext.getOrCreate()