logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Job argument names this script expects to receive.
arg_keys = [
    "region_name",
    "s3_endpoint",
    "s3_target_location",
    "s3_staging_location",
    "target_database",
    "target_table",
    "secret_name",
]

# Partition columns stamped onto every dataframe before the S3 write.
partition_keys = [f"import_{unit}" for unit in ("year", "month", "day", "date")]
1926
2027import boto3
2128
4148
4249### Functions ###
4350
51+
def remove_illegal_characters(string):
    """Return *string* with characters illegal in a JWT segment substituted:
    '=' is dropped, '/' becomes '_', '+' becomes '-'.
    """
    # Single C-level pass instead of three chained .replace() calls.
    return string.translate(str.maketrans({"=": "", "/": "_", "+": "-"}))
5265
5366
5467def encode_json (json_string ):
@@ -62,32 +75,34 @@ def encode_json(json_string):
def create_signature(header, payload, secret):
    """Sign ``header.payload`` with HMAC-SHA256 and return a JWT-safe signature.

    Args:
        header: base64-encoded JWT header segment.
        payload: base64-encoded JWT payload segment.
        secret: shared secret string used as the HMAC key.

    Returns:
        Base64 signature with '=', '/' and '+' made token-safe.
    """
    unsigned_token = header + "." + payload
    signature_hash = hmac.new(
        secret.encode("utf-8"),
        unsigned_token.encode("utf-8"),
        digestmod=hashlib.sha256,
    ).digest()
    encoded_signature = base64.b64encode(signature_hash).decode("utf-8")
    # Strip/replace characters that are not legal in a JWT token segment.
    return remove_illegal_characters(encoded_signature)
7489
7590
def get_token(url, encoded_header, encoded_payload, signature, headers):
    """Exchange a signed JWT assertion for an OAuth access token.

    Args:
        url: token endpoint URL.
        encoded_header: base64-encoded JWT header segment.
        encoded_payload: base64-encoded JWT payload segment.
        signature: JWT signature segment.
        headers: HTTP headers (form-urlencoded content type).

    Returns:
        The raw ``requests`` response from the token endpoint.
    """
    assertion = encoded_header + "." + encoded_payload + "." + signature
    data = (
        f"assertion={assertion}"
        "&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer"
    )
    # NOTE(review): the request body is a signed credential — never print it,
    # or it ends up in the job logs.
    response = requests.request("POST", url, headers=headers, data=data)
    return response
8398
8499
def get_icaseworks_report_from(
    report_id,
    fromdate,
    auth_headers,
    auth_payload,
    report_url="https://hackneyreports.icasework.com/getreport?",
):
    """Fetch an iCasework report as JSON, starting from a given date.

    Args:
        report_id: numeric iCasework report identifier.
        fromdate: start date for the 'From' query parameter.
        auth_headers: headers carrying the Bearer token.
        auth_payload: request body (empty list in practice).
        report_url: base report endpoint; defaulted so existing callers
            are unaffected, but overridable for other environments.

    Returns:
        The raw ``requests`` response for the report call.
    """
    request_url = f"{report_url}ReportId={report_id}&Format=json&From={fromdate}"
    print(f"Request url: {request_url}")
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f"Status Code: {r.status_code}")
    return r
92107
93108
@@ -96,25 +111,25 @@ def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload
96111
97112 now = str (datetime .now ())
98113 timetouse = now [:19 ]
99- print (f' TimeNow: { timetouse } ' )
114+ print (f" TimeNow: { timetouse } " )
100115
101- request_url = f' { report_url } ReportId={ report_id } &Format=json&FromTime={ timestamp_to_call } &UntilTime={ timetouse } '
102- print (f' Request url: { request_url } ' )
116+ request_url = f" { report_url } ReportId={ report_id } &Format=json&FromTime={ timestamp_to_call } &UntilTime={ timetouse } "
117+ print (f" Request url: { request_url } " )
103118 r = requests .request ("GET" , request_url , headers = auth_headers , data = auth_payload )
104- print (f' Status Code: { r .status_code } ' )
119+ print (f" Status Code: { r .status_code } " )
105120 return r
106121
107122
108123def dump_dataframe (response , location , filename ):
109- df = pd .DataFrame .from_dict (response .json (), orient = ' columns' )
124+ df = pd .DataFrame .from_dict (response .json (), orient = " columns" )
110125
111- df [' import_year' ] = datetime .today ().year
112- df [' import_month' ] = datetime .today ().month
113- df [' import_day' ] = datetime .today ().day
114- df [' import_date' ] = datetime .today ().strftime (' %Y%m%d' )
126+ df [" import_year" ] = datetime .today ().year
127+ df [" import_month" ] = datetime .today ().month
128+ df [" import_day" ] = datetime .today ().day
129+ df [" import_date" ] = datetime .today ().strftime (" %Y%m%d" )
115130
116- print (f' Database: { target_database } ' )
117- print (f' Table: { target_table } ' )
131+ print (f" Database: { target_database } " )
132+ print (f" Table: { target_table } " )
118133
119134 # write to s3
120135 wr .s3 .to_parquet (
@@ -124,28 +139,27 @@ def dump_dataframe(response, location, filename):
124139 database = target_database ,
125140 table = target_table ,
126141 mode = "overwrite_partitions" ,
127- partition_cols = partition_keys
142+ partition_cols = partition_keys ,
128143 )
129- print (f' Dumped Dataframe { df .shape } to { s3_target_location } ' )
130- logger .info (f' Dumped Dataframe { df .shape } to { s3_target_location } ' )
144+ print (f" Dumped Dataframe { df .shape } to { s3_target_location } " )
145+ logger .info (f" Dumped Dataframe { df .shape } to { s3_target_location } " )
131146
132147
def get_latest_timestamp(table_dict):
    """Return the newest 'casestatustouchtimes_lastupdatedtime' already loaded
    into the raw-zone FOI table, formatted as 'YYYY-MM-DD HH:MM:SS'.

    Args:
        table_dict: report descriptor dict (currently unused here; kept so
            every per-table helper shares the same call signature).

    Returns:
        Latest timestamp string, e.g. '2025-01-05 15:06:16'.

    Raises:
        ValueError: if the table holds no rows yet, instead of an opaque
            AttributeError on ``None``.
    """
    print("Getting max timestamp")

    sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"'

    conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)
    df = pd.read_sql_query(sql_query, conn)

    latest_date = df.iloc[0, 0]
    if latest_date is None:
        raise ValueError(
            "No existing data in icaseworks_foi; cannot derive an incremental start time"
        )
    # Athena returns ISO-formatted timestamps ('...T...'); the API wants a space.
    latest_date = latest_date.replace("T", " ")

    print(f"Time Found: {latest_date}")
    return latest_date
151165
@@ -155,87 +169,95 @@ def get_latest_timestamp(table_dict):
155169# print(f'No Data Found. Will use {date_to_return}')
156170# return date_to_return
157171
172+
def authenticate_icaseworks(api_key, secret):
    """Build a signed JWT for the iCasework token endpoint and exchange it
    for an access token.

    Args:
        api_key: iCasework API key (used as the JWT issuer claim).
        secret: shared secret used to sign the JWT.

    Returns:
        Tuple of ``(auth_payload, auth_headers)`` for report calls:
        ``auth_payload`` is an empty list, ``auth_headers`` carries the
        Bearer token.
    """
    url = "https://hackney.icasework.com/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    # Compact JSON (no whitespace). json.dumps replaces the previous
    # str(dict).replace("'", '"').replace(" ", "") hack, which would also
    # have mangled any space inside a value (e.g. in the api_key).
    header_object = json.dumps({"alg": "HS256", "typ": "JWT"}, separators=(",", ":"))
    header = encode_json(header_object)

    # Payload: issuer, audience and issued-at (unix seconds, as a string to
    # match the previous behaviour).
    str_time = str(int(time.time()))
    payload_object = json.dumps(
        {"iss": api_key, "aud": url, "iat": str_time}, separators=(",", ":")
    )
    payload = encode_json(payload_object)

    # Sign header.payload; assertion = header.payload.signature is assembled
    # inside get_token.
    signature = create_signature(header, payload, secret)

    response = get_token(
        url=url,
        encoded_header=header,
        encoded_payload=payload,
        signature=signature,
        headers=headers,
    )
    print(f"Token endpoint status: {response.status_code}")

    # NOTE(review): do not print the token, signature or assertion — they are
    # credentials and must not be written to job logs.
    auth_token = response.json().get("access_token")

    auth_payload = []

    # NOTE: a session cookie previously captured via Postman may also be
    # required here; it is unclear how to generate it programmatically.
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    return auth_payload, auth_headers
215229
216230
def get_data(table_dict, date_to_call, auth_headers, auth_payload):
    """Pull a report incrementally (FromTime=date_to_call) and write it to S3."""
    print(f"Pulling report for {table_dict['name']}")

    response = get_report_fromtime(
        table_dict["reportid"], date_to_call, auth_headers, auth_payload
    )

    # Keep the raw response on the descriptor so later steps can reuse it.
    table_dict["DF"] = response

    dump_dataframe(response, table_dict["location"], date_to_call)
228246
229247
def get_data_from(table_dict, date_to_call, auth_headers, auth_payload):
    """Pull a report from a fixed start date (From=date_to_call) and write it to S3."""
    print(f"Pulling report for {table_dict['name']}")

    response = get_icaseworks_report_from(
        table_dict["reportid"], date_to_call, auth_headers, auth_payload
    )

    # Keep the raw response on the descriptor so later steps can reuse it.
    table_dict["DF"] = response

    dump_dataframe(response, table_dict["location"], date.today())
241263
@@ -249,35 +271,41 @@ def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_nam
249271
250272### main function ##
251273
274+
def main():
    """Authenticate against iCasework and ingest each configured report."""
    secrets_manager_client = boto3.client("secretsmanager")
    api_credentials_response = retrieve_credentials_from_secrets_manager(
        secrets_manager_client, secret_name
    )
    api_credentials = json.loads(api_credentials_response["SecretString"])
    api_key = api_credentials.get("api_key")
    secret = api_credentials.get("secret")
    # NOTE(review): never print the api key or secret — they would be written
    # verbatim into the job logs.
    auth_payload, auth_headers = authenticate_icaseworks(api_key, secret)

    list_of_datadictionaries = [
        # Previously ingested (kept for reference):
        #   "Corrective Actions", reportid 188769
        #   "Classifications", reportid 188041
        {
            "name": "FOI Requests",
            "reportid": 199549,
            "full_ingestion": False,
            "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/",
        }
    ]

    for data_dict in list_of_datadictionaries:
        if not data_dict["full_ingestion"]:
            # Incremental load: continue from the newest timestamp in Athena.
            date_to_track_from = get_latest_timestamp(data_dict)
            print(f"Starting calls from {date_to_track_from}")
            get_data(data_dict, date_to_track_from, auth_headers, auth_payload)
        else:
            # Full (re)load from a fixed historical start date.
            get_data_from(data_dict, "2020-09-01", auth_headers, auth_payload)


if __name__ == "__main__":
    main()
0 commit comments