Commit 4e227a8

Adding new iCaseworks ingest (#2080)
* Adding new iCaseworks ingest
* rework replace_illegal_characters function
* organise imports
* assign variables explicitly
* tidy up comments and prints
* Committing Suggestions by Tim

---------

Co-authored-by: timburke-hackit <tim.burke@hackney.gov.uk>
1 parent 5299c9a commit 4e227a8

2 files changed: +339 −0
Lines changed: 302 additions & 0 deletions
@@ -0,0 +1,302 @@
# flake8: noqa: F821

import base64
import hashlib
import hmac
import json
import logging
import sys
import time
from datetime import date, datetime

import awswrangler as wr
import boto3
import pandas as pd
import requests
from awsglue.utils import getResolvedOptions
from dateutil.relativedelta import *
from pyathena import connect


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

arg_keys = [
    "region_name",
    "s3_endpoint",
    "s3_target_location",
    "s3_staging_location",
    "target_database",
    "target_table",
    "secret_name",
]
partition_keys = ["import_year", "import_month", "import_day", "import_date"]

args = getResolvedOptions(sys.argv, arg_keys)

region_name = args["region_name"]
s3_endpoint = args["s3_endpoint"]
s3_target_location = args["s3_target_location"]
s3_staging_location = args["s3_staging_location"]
target_database = args["target_database"]
target_table = args["target_table"]
secret_name = args["secret_name"]


def remove_illegal_characters(string):
    """Removes illegal characters from string by replacing:
    = with empty string
    / with _
    + with -
    """
    replacements = {"=": "", "/": "_", "+": "-"}

    result = string
    for old, new in replacements.items():
        result = result.replace(old, new)

    return result


def encode_json(json_string):
    """Encode JSON string"""
    json_string = json_string.encode()
    json_string = base64.b64encode(json_string)
    json_string = json_string.decode("utf-8")
    return json_string

def create_signature(header, payload, secret):
    """Sign the encoded header and payload with the secret using HMAC-SHA256,
    returning a URL-safe base64 signature."""
    # encoded header, encoded payload, string secret
    unsigned_token = header + "." + payload
    # secret_access_key = base64.b64decode(unsigned_token) #TODO is this used anywhere??
    key_bytes = bytes(secret, "utf-8")
    string_to_sign_bytes = bytes(unsigned_token, "utf-8")
    signature_hash = hmac.new(
        key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256
    ).digest()
    encoded_signature = base64.b64encode(signature_hash)
    encoded_signature = encoded_signature.decode("utf-8")
    encoded_signature = remove_illegal_characters(encoded_signature)
    return encoded_signature

def get_token(url, encoded_header, encoded_payload, signature, headers):
    """Get token"""
    assertion = encoded_header + "." + encoded_payload + "." + signature
    data = f"assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer"
    print(f"Data : {data}")
    response = requests.request("POST", url, headers=headers, data=data)
    return response


def get_icaseworks_report_from(report_id, fromdate, auth_headers, auth_payload):
    report_url = "https://hackneyreports.icasework.com/getreport?"
    request_url = f"{report_url}ReportId={report_id}&Format=json&From={fromdate}"
    print(f"Request url: {request_url}")
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f"Status Code: {r.status_code}")
    return r


def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload):
    report_url = "https://hackneyreports.icasework.com/getreport?"

    now = str(datetime.now())
    timetouse = now[:19]
    print(f"TimeNow: {timetouse}")

    request_url = f"{report_url}ReportId={report_id}&Format=json&FromTime={timestamp_to_call}&UntilTime={timetouse}"
    print(f"Request url: {request_url}")
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f"Status Code: {r.status_code}")
    return r

def dump_dataframe(response, location, filename):
    # NOTE: location and filename are currently unused; the write goes to the
    # global s3_target_location / target_table resolved from the job arguments.
    df = pd.DataFrame.from_dict(response.json(), orient="columns")

    df["import_year"] = datetime.today().year
    df["import_month"] = datetime.today().month
    df["import_day"] = datetime.today().day
    df["import_date"] = datetime.today().strftime("%Y%m%d")

    print(f"Database: {target_database}")
    print(f"Table: {target_table}")

    # write to s3
    wr.s3.to_parquet(
        df=df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite_partitions",
        partition_cols=partition_keys,
    )
    print(f"Dumped Dataframe {df.shape} to {s3_target_location}")
    logger.info(f"Dumped Dataframe {df.shape} to {s3_target_location}")

def get_latest_timestamp(table_dict):
    # TODO: reintroduce try except
    # try:
    print("Getting max timestamp")
    # 2025-01-05T15:06:16

    # TODO: needs refactoring to allow for different tables
    sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"'

    conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)

    df = pd.read_sql_query(sql_query, conn)
    latest_date = df.iloc[0, 0]
    latest_date = latest_date.replace("T", " ")

    print("dataframe outputting")
    print(f"Time Found: {latest_date}")

    return latest_date

    # except:
    #     date_to_return = "2025-01-16 00:00:00"
    #     print(f'No Data Found. Will use {date_to_return}')
    #     return date_to_return

def authenticate_icaseworks(api_key, secret):
    url = "https://hackney.icasework.com/token"

    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    header_object = {"alg": "HS256", "typ": "JWT"}

    # Create Header
    header_object = str(header_object).replace("'", '"').replace(" ", "")
    header = encode_json(header_object)
    print(f"Header: {header}")

    # Create payload
    current_unix_time = int(time.time())
    str_time = str(current_unix_time)
    payload_object = {"iss": api_key, "aud": url, "iat": str_time}
    payload_object = (
        str(payload_object).replace("'", '"').replace(" ", "")
    )  # can we do a dict-to-string function for this and the header

    payload = encode_json(str(payload_object))
    print(f"Created Payload: {payload}")

    # Create Signature
    signature = create_signature(header, payload, secret)
    print(f"Created Signature: {signature}")

    # Get assertion
    assertion = header + "." + payload + "." + signature
    print(f"assertion: {assertion}")

    # Get response
    response = get_token(
        url=url,
        encoded_header=header,
        encoded_payload=payload,
        signature=signature,
        headers=headers,
    )

    # Get token
    auth_token = response.json().get("access_token")

    # Create auth header for API Calls and auth payload

    authorization = f"Bearer {auth_token}"

    auth_payload = []

    auth_headers = {"Authorization": authorization}

    return auth_payload, auth_headers

def get_data(table_dict, date_to_call, auth_headers, auth_payload):
    dict_to_call = table_dict

    print(f"Pulling report for {dict_to_call['name']}")
    case_id_report_id = dict_to_call["reportid"]
    case_id_list = get_report_fromtime(
        case_id_report_id, date_to_call, auth_headers, auth_payload
    )

    dict_to_call["DF"] = (
        case_id_list  # This will append the response to the DF column in the dictionary
    )

    dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date_to_call)


def get_data_from(table_dict, date_to_call, auth_headers, auth_payload):
    dict_to_call = table_dict

    print(f"Pulling report for {dict_to_call['name']}")
    case_id_report_id = dict_to_call["reportid"]
    case_id_list = get_icaseworks_report_from(
        case_id_report_id, date_to_call, auth_headers, auth_payload
    )
    # print(f'Type of case_id_list {type(case_id_list)}')

    dict_to_call["DF"] = (
        case_id_list  # This will append the response to the DF column in the dictionary
    )

    dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date.today())


def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name):
    response = secrets_manager_client.get_secret_value(
        SecretId=secret_name,
    )
    return response

### main function ###


def main():
    secrets_manager_client = boto3.client("secretsmanager")
    api_credentials_response = retrieve_credentials_from_secrets_manager(
        secrets_manager_client, secret_name
    )
    api_credentials = json.loads(api_credentials_response["SecretString"])
    api_key = api_credentials.get("api_key")

    secret = api_credentials.get("secret")

    auth_payload, auth_headers = authenticate_icaseworks(api_key, secret)

    list_of_datadictionaries = [
        # {"name":"Corrective Actions", "reportid":188769, "full_ingestion":False, "location":"/content/drive/MyDrive/iCaseworks/Corrective_Actions/"},
        # {"name":"Classifications", "reportid":188041, "full_ingestion":True, "location":"/content/drive/MyDrive/iCaseworks/classifications/"},
        {
            "name": "FOI Requests",
            "reportid": 199549,
            "full_ingestion": False,
            "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/",
        }
    ]

    for data_dict in list_of_datadictionaries:
        location = data_dict["location"]

        if not data_dict["full_ingestion"]:
            date_to_track_from = get_latest_timestamp(data_dict)
            print(f"Starting calls from {date_to_track_from}")

            get_data(data_dict, date_to_track_from, auth_headers, auth_payload)
        else:
            get_data_from(data_dict, "2020-09-01", auth_headers, auth_payload)


if __name__ == "__main__":
    main()
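
For reference, a minimal check of the character replacement performed by remove_illegal_characters above. The sample input is made up for illustration and is not real signature material; the expected output simply restates the function's replacement table:

# Hypothetical check of remove_illegal_characters; the input string is illustrative only.
signature = "abc+def/ghi="
assert remove_illegal_characters(signature) == "abc-def_ghi"
# "=" is dropped, "/" becomes "_", "+" becomes "-", i.e. the output uses the
# URL-safe base64 alphabet (without padding) that a JWT signature expects.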
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
module "icaseworks_ingest_to_raw" {
  source                    = "../modules/aws-glue-job"
  is_production_environment = local.is_production_environment
  is_live_environment       = local.is_live_environment

  count = !local.is_production_environment && local.is_live_environment ? 1 : 0
  # count = local.is_live_environment ? 1 : 0
  # The second (commented-out) count is the production setting.

  department                     = module.department_data_and_insight_data_source
  job_name                       = "${local.short_identifier_prefix}icaseworks_ingest_to_raw"
  glue_scripts_bucket_id         = module.glue_scripts_data_source.bucket_id
  glue_temp_bucket_id            = module.glue_temp_storage_data_source.bucket_id
  glue_job_timeout               = 360
  helper_module_key              = data.aws_s3_object.helpers.key
  pydeequ_zip_key                = data.aws_s3_object.pydeequ.key
  spark_ui_output_storage_id     = module.spark_ui_output_storage_data_source.bucket_id
  trigger_enabled                = local.is_production_environment
  number_of_workers_for_glue_job = 2
  schedule                       = "cron(0 3 ? * * *)"
  job_parameters = {
    "--job-bookmark-option"              = "job-bookmark-enable"
    "--enable-glue-datacatalog"          = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--additional-python-modules"        = "PyAthena,numpy==1.26.1,awswrangler==3.10.0"
    "--region_name"                      = data.aws_region.current.name
    "--s3_endpoint"                      = "https://s3.${data.aws_region.current.name}.amazonaws.com"
    "--s3_target_location"               = "s3://${module.raw_zone_data_source.bucket_id}/data-and-insight/icaseworks/FOI/"
    "--s3_staging_location"              = "s3://${module.athena_storage_data_source.bucket_id}/data-and-insight/icaseworks/FOI/"
    "--target_database"                  = "data-and-insight-raw-zone"
    "--target_table"                     = "icaseworks_foi"
    "--secret_name"                      = "/data-and-insight/icaseworks_key"
  }

  script_name = "icaseworks_ingest_to_raw"
}
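
For an ad-hoc run outside the scheduled 03:00 trigger, a minimal sketch of starting this Glue job with boto3. The resolved job name assumes a "stg-" short_identifier_prefix, and the region and bucket placeholders below are illustrative assumptions, not values taken from this commit:

import boto3

glue = boto3.client("glue")
response = glue.start_job_run(
    # Hypothetical resolved value of "${local.short_identifier_prefix}icaseworks_ingest_to_raw"
    JobName="stg-icaseworks_ingest_to_raw",
    Arguments={
        "--region_name": "eu-west-2",  # assumed region
        "--s3_endpoint": "https://s3.eu-west-2.amazonaws.com",
        "--s3_target_location": "s3://<raw-zone-bucket>/data-and-insight/icaseworks/FOI/",
        "--s3_staging_location": "s3://<athena-storage-bucket>/data-and-insight/icaseworks/FOI/",
        "--target_database": "data-and-insight-raw-zone",
        "--target_table": "icaseworks_foi",
        "--secret_name": "/data-and-insight/icaseworks_key",
    },
)
print(f"Started run: {response['JobRunId']}")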

0 commit comments
