
Commit b45dcf4

Adding new iCaseworks ingest
1 parent 8a0759d commit b45dcf4

File tree

2 files changed: +318 −0 lines changed

Lines changed: 283 additions & 0 deletions

```python
# flake8: noqa: F821

import base64
import hashlib
import hmac
import json
import logging
import re
import sys
import time
from datetime import date, datetime

import awswrangler as wr
import boto3
import pandas as pd
import requests
from awsglue.utils import getResolvedOptions
from pyathena import connect

from scripts.helpers.helpers import get_max_date_partition_value_from_glue_catalogue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
            'target_table', 'secret_name']
partition_keys = ['import_year', 'import_month', 'import_day', 'import_date']

args = getResolvedOptions(sys.argv, arg_keys)
# Expose the resolved job arguments (region_name, s3_target_location, ...) as
# module-level names; the F821 noqa above exists because of this.
locals().update(args)

### iCaseworks Loads ###


### Functions ###

def remove_illegal_characters(input_string):
    """Make a base64 signature URL-safe: strip '=' padding, '/' -> '_', '+' -> '-'."""
    replacements = [['=', ""], ['/', "_"], ['+', "-"]]
    clean_string = input_string
    for r in replacements:
        # Apply each substitution to the running result, not the original input
        clean_string = re.sub(string=clean_string,
                              pattern="[{}]".format(re.escape(r[0])),
                              repl=r[1])
    return clean_string
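
# Example (hypothetical input): remove_illegal_characters("aGVsbG8+/w==")
# strips the '=' padding and maps '/' -> '_' and '+' -> '-', returning "aGVsbG8-_w".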


def encode_json(json_string):
    """Base64-encode a JSON string and return the result as UTF-8 text."""
    json_string = json_string.encode()
    json_string = base64.b64encode(json_string)
    json_string = json_string.decode("utf-8")
    return json_string


def create_signature(header, payload, secret):
    """Sign '<header>.<payload>' with HMAC-SHA256 and return the URL-safe base64 signature."""
    # header and payload are already base64-encoded; secret is a plain string
    unsigned_token = header + '.' + payload
    key_bytes = bytes(secret, 'utf-8')
    string_to_sign_bytes = bytes(unsigned_token, 'utf-8')
    signature_hash = hmac.new(key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256).digest()
    encoded_signature = base64.b64encode(signature_hash)
    encoded_signature = encoded_signature.decode('utf-8')
    encoded_signature = remove_illegal_characters(encoded_signature)
    return encoded_signature


def get_token(url, encoded_header, encoded_payload, signature, headers):
    """Exchange the signed JWT assertion for an OAuth access token."""
    assertion = encoded_header + "." + encoded_payload + "." + signature
    data = f'assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer'
    # Avoid logging `data`: it contains the signed assertion, a live credential
    response = requests.request("POST", url, headers=headers, data=data)
    return response


def get_icaseworks_report_from(report_id, fromdate, auth_headers, auth_payload):
    """Fetch a report as JSON, filtered from a given date."""
    report_url = "https://hackneyreports.icasework.com/getreport?"
    request_url = f'{report_url}ReportId={report_id}&Format=json&From={fromdate}'
    print(f'Request url: {request_url}')
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f'Status Code: {r.status_code}')
    return r


def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload):
    """Fetch a report as JSON for the window from timestamp_to_call until now."""
    report_url = "https://hackneyreports.icasework.com/getreport?"

    timetouse = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'TimeNow: {timetouse}')

    request_url = f'{report_url}ReportId={report_id}&Format=json&FromTime={timestamp_to_call}&UntilTime={timetouse}'
    print(f'Request url: {request_url}')
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f'Status Code: {r.status_code}')
    return r


def dump_dataframe(response, location, filename):
    """Write the JSON response to the raw zone as partitioned parquet.

    Note: location and filename are currently unused; the target comes from
    the job arguments (s3_target_location, target_database, target_table).
    """
    df = pd.DataFrame.from_dict(response.json(), orient='columns')

    df['import_year'] = datetime.today().year
    df['import_month'] = datetime.today().month
    df['import_day'] = datetime.today().day
    df['import_date'] = datetime.today().strftime('%Y%m%d')

    print(f'Database: {target_database}')
    print(f'Table: {target_table}')

    # write to s3 and register the partitions in the Glue catalogue
    wr.s3.to_parquet(
        df=df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite_partitions",
        partition_cols=partition_keys
    )
    print(f'Dumped Dataframe {df.shape} to {s3_target_location}')
    logger.info(f'Dumped Dataframe {df.shape} to {s3_target_location}')


def get_latest_timestamp(table_dict):
    """Return the max last-updated time already ingested (e.g. '2025-01-05 15:06:16')."""
    try:
        print('Getting max timestamp')

        sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"'

        conn = connect(s3_staging_dir=s3_staging_location,
                       region_name=region_name)

        df = pd.read_sql_query(sql_query, conn)
        latest_date = df.iloc[0, 0]
        # Athena returns e.g. '2025-01-05T15:06:16'; the API expects a space separator
        latest_date = latest_date.replace("T", " ")

        print(f'Time Found: {latest_date}')

        return latest_date

    except Exception:
        date_to_return = "2025-01-16 00:00:00"
        print(f'No Data Found. Will use {date_to_return}')
        return date_to_return

def authenticate_icaseworks(api_key, secret):
    """Build a signed JWT assertion and exchange it for a bearer token."""
    url = "https://hackney.icasework.com/token"

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    # Create header (compact JSON, no whitespace)
    header_object = {"alg": "HS256", "typ": "JWT"}
    header = encode_json(json.dumps(header_object, separators=(',', ':')))
    print(f'Header: {header}')

    # Create payload
    current_unix_time = int(time.time())
    payload_object = {
        "iss": api_key,
        "aud": url,
        "iat": str(current_unix_time)
    }
    payload = encode_json(json.dumps(payload_object, separators=(',', ':')))
    print(f'Created Payload: {payload}')

    # Create signature
    signature = create_signature(header, payload, secret)
    print(f'Created Signature: {signature}')

    # Exchange the assertion (assembled inside get_token) for an access token
    response = get_token(url=url, encoded_header=header, encoded_payload=payload, signature=signature, headers=headers)
    print(response)

    # Get token; avoid printing it, it is a live bearer credential
    auth_token = response.json().get('access_token')

    # Note: an earlier version also sent a session cookie captured via Postman;
    # only the bearer token header is sent here
    auth_headers = {
        'Authorization': f'Bearer {auth_token}'
    }
    auth_payload = []

    return auth_payload, auth_headers


def get_data(table_dict, date_to_call, auth_headers, auth_payload):
    """Incremental pull: fetch the report from the given timestamp and dump it."""
    dict_to_call = table_dict

    print(f'Pulling report for {dict_to_call["name"]}')
    case_id_report_id = dict_to_call["reportid"]
    case_id_list = get_report_fromtime(case_id_report_id, date_to_call, auth_headers, auth_payload)

    dict_to_call["DF"] = case_id_list  # store the API response in the dict's DF entry

    dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date_to_call)


def get_data_from(table_dict, date_to_call, auth_headers, auth_payload):
    """Full pull: fetch the report from a fixed start date and dump it."""
    dict_to_call = table_dict

    print(f'Pulling report for {dict_to_call["name"]}')
    case_id_report_id = dict_to_call["reportid"]
    case_id_list = get_icaseworks_report_from(case_id_report_id, date_to_call, auth_headers, auth_payload)

    dict_to_call["DF"] = case_id_list  # store the API response in the dict's DF entry

    dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date.today())


def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name):
    """Fetch the iCaseworks API credentials from AWS Secrets Manager."""
    response = secrets_manager_client.get_secret_value(
        SecretId=secret_name,
    )
    return response


### main function ###

def main():
    secrets_manager_client = boto3.client('secretsmanager')
    api_credentials_response = retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name)
    api_credentials = json.loads(api_credentials_response['SecretString'])
    api_key = api_credentials.get("api_key")
    secret = api_credentials.get("secret")
    # Avoid printing api_key/secret: they would end up in CloudWatch logs
    auth_payload, auth_headers = authenticate_icaseworks(api_key, secret)

    list_of_datadictionaries = [
        # {"name": "Corrective Actions", "reportid": 188769, "full_ingestion": False, "location": "/content/drive/MyDrive/iCaseworks/Corrective_Actions/"},
        # {"name": "Classifications", "reportid": 188041, "full_ingestion": True, "location": "/content/drive/MyDrive/iCaseworks/classifications/"},
        {"name": "FOI Requests", "reportid": 199549, "full_ingestion": False,
         "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/"}
    ]

    for data_dict in list_of_datadictionaries:
        if not data_dict['full_ingestion']:
            # Incremental load: resume from the latest timestamp already in the raw zone
            date_to_track_from = get_latest_timestamp(data_dict)
            print(f'Starting calls from {date_to_track_from}')

            get_data(data_dict, date_to_track_from, auth_headers, auth_payload)
        else:
            # Full load from a fixed start date
            get_data_from(data_dict, "2020-09-01", auth_headers, auth_payload)


if __name__ == '__main__':
    main()
```
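
As a sanity check on the signing logic (a minimal sketch, not part of the commit): the assertion built by `authenticate_icaseworks` can be verified locally by recomputing the HMAC-SHA256 signature over `header.payload` and applying the same URL-safe substitutions as `remove_illegal_characters`. The helper `verify_assertion` below is hypothetical:

```python
# Hypothetical helper (not in the commit): recompute and compare the signature
# of an assertion produced by authenticate_icaseworks(), using only the stdlib.
import base64
import hashlib
import hmac


def verify_assertion(assertion: str, secret: str) -> bool:
    # base64 never contains '.', so the assertion splits cleanly into 3 parts
    header_b64, payload_b64, signature = assertion.split(".")
    unsigned = f"{header_b64}.{payload_b64}"
    digest = hmac.new(secret.encode("utf-8"), unsigned.encode("utf-8"),
                      digestmod=hashlib.sha256).digest()
    expected = base64.b64encode(digest).decode("utf-8")
    # Mirror remove_illegal_characters(): drop '=' padding, '/' -> '_', '+' -> '-'
    expected = expected.replace("=", "").replace("/", "_").replace("+", "-")
    return hmac.compare_digest(expected, signature)
```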

Lines changed: 35 additions & 0 deletions

```hcl
module "icaseworks_ingest_to_raw" {
  source                    = "../modules/aws-glue-job"
  is_production_environment = local.is_production_environment
  is_live_environment       = local.is_live_environment

  # Deploys zero instances in every environment, i.e. the job is currently disabled
  count = local.is_live_environment ? 0 : 0

  department                     = module.department_housing_data_source
  job_name                       = "${local.short_identifier_prefix}icaseworks_ingest_to_raw"
  glue_scripts_bucket_id         = module.glue_scripts_data_source.bucket_id
  glue_temp_bucket_id            = module.glue_temp_storage_data_source.bucket_id
  glue_job_timeout               = 360
  helper_module_key              = data.aws_s3_object.helpers.key
  pydeequ_zip_key                = data.aws_s3_object.pydeequ.key
  spark_ui_output_storage_id     = module.spark_ui_output_storage_data_source.bucket_id
  trigger_enabled                = local.is_production_environment
  number_of_workers_for_glue_job = 2
  schedule                       = "cron(0 10 ? * MON-FRI *)"
  job_parameters = {
    "--job-bookmark-option"              = "job-bookmark-enable"
    "--enable-glue-datacatalog"          = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--additional-python-modules"        = "PyAthena,numpy==1.26.1,awswrangler==3.10.0"
    "--region_name"                      = data.aws_region.current.name
    "--s3_endpoint"                      = "https://s3.${data.aws_region.current.name}.amazonaws.com"
    "--s3_target_location"               = "s3://${module.raw_zone_data_source.bucket_id}/data-and-insight/icaseworks/FOI/"
    "--s3_staging_location"              = "s3://${module.athena_storage_data_source.bucket_id}/data-and-insight/icaseworks/FOI/"
    "--target_database"                  = "data-and-insight-raw-zone"
    "--target_table"                     = "icaseworks_foi"
    "--secret_name"                      = "/data-and-insight/icaseworks_key"
  }

  script_name = "housing_apply_gx_dq_tests"
}
```
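
For context (a minimal sketch, not part of the commit): Glue passes each `job_parameters` key to the script as a `--key value` command-line argument, which `getResolvedOptions` resolves into a plain dict. This is how the ingest script above receives `region_name`, `target_table`, and the rest.

```python
# Minimal sketch of how the job_parameters above surface inside the Glue job.
# Assumes it runs in a Glue Python environment where awsglue is importable.
import sys

from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['region_name', 'target_database', 'target_table'])
print(args['target_table'])  # -> "icaseworks_foi" with the parameters above
```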
