
Commit 616014c

Merge branch 'main' into llpg-ingestion-retry

2 parents: 6b5601e + 3797500

6 files changed: +346 -7 lines

notebook/scripts/planning/load-table-from-tascomi-API-endpoint.ipynb

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@
     "private_key = PRIVATE_KEY.encode('utf-8')\n",
     "\n",
     "table_to_read = \"\"\n",
-    "request_uri = f'https://hackney-planning.idoxcloud.com/rest/v1/{table_to_read}'\n",
+    "request_uri = f'https://hackney-planning.tascomi.com/rest/v1/{table_to_read}'\n",
     "request_method = 'GET'"
    ]
   },
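For context, a minimal sketch of how the notebook's request_uri would presumably be used once the request headers have been signed. The table name and header contents below are placeholders, and the signing helper is assumed to behave like authenticate_tascomi in scripts/jobs/planning/tascomi_api_ingestion.py further down:

import requests

table_to_read = "contacts"  # placeholder; the notebook leaves this empty
request_uri = f"https://hackney-planning.tascomi.com/rest/v1/{table_to_read}"

# Assumed: headers already carry the public key and HMAC-signed token
# produced by an authenticate_tascomi-style helper.
headers = {}
response = requests.get(request_uri, data="", headers=headers)
response.raise_for_status()
print(response.json())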
Lines changed: 302 additions & 0 deletions
@@ -0,0 +1,302 @@
# flake8: noqa: F821

import base64
import hashlib
import hmac
import json
import logging
import sys
import time
from datetime import date, datetime

import awswrangler as wr
import boto3
import pandas as pd
import requests
from awsglue.utils import getResolvedOptions
from pyathena import connect


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

arg_keys = [
    "region_name",
    "s3_endpoint",
    "s3_target_location",
    "s3_staging_location",
    "target_database",
    "target_table",
    "secret_name",
]
partition_keys = ["import_year", "import_month", "import_day", "import_date"]

args = getResolvedOptions(sys.argv, arg_keys)

region_name = args["region_name"]
s3_endpoint = args["s3_endpoint"]
s3_target_location = args["s3_target_location"]
s3_staging_location = args["s3_staging_location"]
target_database = args["target_database"]
target_table = args["target_table"]
secret_name = args["secret_name"]


def remove_illegal_characters(string):
    """Remove illegal characters from a string by replacing:
    = with the empty string
    / with _
    + with -
    """
    replacements = {"=": "", "/": "_", "+": "-"}

    result = string
    for old, new in replacements.items():
        result = result.replace(old, new)

    return result


def encode_json(json_string):
    """Base64-encode a JSON string."""
    json_string = json_string.encode()
    json_string = base64.b64encode(json_string)
    json_string = json_string.decode("utf-8")
    return json_string


def create_signature(header, payload, secret):
    """Sign the encoded header and payload with HMAC-SHA256."""
    unsigned_token = header + "." + payload
    key_bytes = bytes(secret, "utf-8")
    string_to_sign_bytes = bytes(unsigned_token, "utf-8")
    signature_hash = hmac.new(
        key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256
    ).digest()
    encoded_signature = base64.b64encode(signature_hash)
    encoded_signature = encoded_signature.decode("utf-8")
    encoded_signature = remove_illegal_characters(encoded_signature)
    return encoded_signature


def get_token(url, encoded_header, encoded_payload, signature, headers):
    """Exchange the signed JWT assertion for an access token."""
    assertion = encoded_header + "." + encoded_payload + "." + signature
    data = f"assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer"
    print(f"Data : {data}")
    response = requests.request("POST", url, headers=headers, data=data)
    return response


def get_icaseworks_report_from(report_id, fromdate, auth_headers, auth_payload):
    report_url = "https://hackneyreports.icasework.com/getreport?"
    request_url = f"{report_url}ReportId={report_id}&Format=json&From={fromdate}"
    print(f"Request url: {request_url}")
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f"Status Code: {r.status_code}")
    return r


def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload):
    report_url = "https://hackneyreports.icasework.com/getreport?"

    now = str(datetime.now())
    timetouse = now[:19]
    print(f"TimeNow: {timetouse}")

    request_url = f"{report_url}ReportId={report_id}&Format=json&FromTime={timestamp_to_call}&UntilTime={timetouse}"
    print(f"Request url: {request_url}")
    r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload)
    print(f"Status Code: {r.status_code}")
    return r


def dump_dataframe(response, location, filename):
    df = pd.DataFrame.from_dict(response.json(), orient="columns")

    df["import_year"] = datetime.today().year
    df["import_month"] = datetime.today().month
    df["import_day"] = datetime.today().day
    df["import_date"] = datetime.today().strftime("%Y%m%d")

    print(f"Database: {target_database}")
    print(f"Table: {target_table}")

    # write to S3 and register the partitions in the Glue catalogue
    wr.s3.to_parquet(
        df=df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite_partitions",
        partition_cols=partition_keys,
    )
    print(f"Dumped Dataframe {df.shape} to {s3_target_location}")
    logger.info(f"Dumped Dataframe {df.shape} to {s3_target_location}")


def get_latest_timestamp(table_dict):
    # TODO: reintroduce the try/except below with a sensible fallback date
    print("Getting max timestamp")  # expected format: 2025-01-05T15:06:16

    # TODO: needs refactoring to allow for different tables
    sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"'

    conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)

    df = pd.read_sql_query(sql_query, conn)
    latest_date = df.iloc[0, 0]
    latest_date = latest_date.replace("T", " ")

    print(f"Time Found: {latest_date}")

    return latest_date

    # except:
    #     date_to_return = "2025-01-16 00:00:00"
    #     print(f'No Data Found. Will use {date_to_return}')
    #     return date_to_return


def authenticate_icaseworks(api_key, secret):
    url = "https://hackney.icasework.com/token"

    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    header_object = {"alg": "HS256", "typ": "JWT"}

    # Create header
    header_object = str(header_object).replace("'", '"').replace(" ", "")
    header = encode_json(header_object)
    print(f"Header: {header}")

    # Create payload
    current_unix_time = int(time.time())
    str_time = str(current_unix_time)
    payload_object = {"iss": api_key, "aud": url, "iat": str_time}
    payload_object = (
        str(payload_object).replace("'", '"').replace(" ", "")
    )  # TODO: a dict-to-string helper could cover this and the header

    payload = encode_json(str(payload_object))
    print(f"Created Payload: {payload}")

    # Create signature
    signature = create_signature(header, payload, secret)
    print(f"Created Signature: {signature}")

    # Build assertion
    assertion = header + "." + payload + "." + signature
    print(f"assertion: {assertion}")

    # Get response
    response = get_token(
        url=url,
        encoded_header=header,
        encoded_payload=payload,
        signature=signature,
        headers=headers,
    )

    # Get token
    auth_token = response.json().get("access_token")

    # Create auth header and payload for API calls
    authorization = f"Bearer {auth_token}"

    auth_payload = []

    auth_headers = {"Authorization": authorization}
    return auth_payload, auth_headers


def get_data(table_dict, date_to_call, auth_headers, auth_payload):
    dict_to_call = table_dict

    print(f"Pulling report for {dict_to_call['name']}")
    case_id_report_id = dict_to_call["reportid"]
    case_id_list = get_report_fromtime(
        case_id_report_id, date_to_call, auth_headers, auth_payload
    )

    # Store the response against the DF key in the dictionary
    dict_to_call["DF"] = case_id_list

    dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date_to_call)


def get_data_from(table_dict, date_to_call, auth_headers, auth_payload):
    dict_to_call = table_dict

    print(f"Pulling report for {dict_to_call['name']}")
    case_id_report_id = dict_to_call["reportid"]
    case_id_list = get_icaseworks_report_from(
        case_id_report_id, date_to_call, auth_headers, auth_payload
    )

    # Store the response against the DF key in the dictionary
    dict_to_call["DF"] = case_id_list

    dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date.today())


def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name):
    response = secrets_manager_client.get_secret_value(
        SecretId=secret_name,
    )
    return response


### main function ###


def main():
    secrets_manager_client = boto3.client("secretsmanager")
    api_credentials_response = retrieve_credentials_from_secrets_manager(
        secrets_manager_client, secret_name
    )
    api_credentials = json.loads(api_credentials_response["SecretString"])
    api_key = api_credentials.get("api_key")

    secret = api_credentials.get("secret")

    auth_payload, auth_headers = authenticate_icaseworks(api_key, secret)

    list_of_datadictionaries = [
        # {"name": "Corrective Actions", "reportid": 188769, "full_ingestion": False, "location": "/content/drive/MyDrive/iCaseworks/Corrective_Actions/"},
        # {"name": "Classifications", "reportid": 188041, "full_ingestion": True, "location": "/content/drive/MyDrive/iCaseworks/classifications/"},
        {
            "name": "FOI Requests",
            "reportid": 199549,
            "full_ingestion": False,
            "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/",
        }
    ]

    for data_dict in list_of_datadictionaries:
        location = data_dict["location"]

        if not data_dict["full_ingestion"]:
            date_to_track_from = get_latest_timestamp(data_dict)
            print(f"Starting calls from {date_to_track_from}")

            get_data(data_dict, date_to_track_from, auth_headers, auth_payload)
        else:
            get_data_from(data_dict, "2020-09-01", auth_headers, auth_payload)


if __name__ == "__main__":
    main()
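A minimal local-run sketch for this job, with placeholder values throughout: getResolvedOptions parses "--name value" pairs from sys.argv, so the script can be exercised outside Glue provided the awsglue utilities (e.g. from aws-glue-libs) are importable. The filename is hypothetical, since the new file's path is not shown in this diff.

import subprocess

# All values below are placeholders, not real endpoints, buckets or secrets.
subprocess.run(
    [
        "python",
        "icaseworks_ingestion.py",  # hypothetical filename
        "--region_name", "eu-west-2",
        "--s3_endpoint", "https://s3.eu-west-2.amazonaws.com",
        "--s3_target_location", "s3://example-raw-zone/data-and-insight/icaseworks_foi/",
        "--s3_staging_location", "s3://example-athena-staging/",
        "--target_database", "data-and-insight-raw-zone",
        "--target_table", "icaseworks_foi",
        "--secret_name", "icaseworks-api-credentials",  # hypothetical secret name
    ],
    check=True,
)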

scripts/jobs/planning/tascomi_api_ingestion.py

Lines changed: 3 additions & 3 deletions
@@ -87,7 +87,7 @@ def get_number_of_pages(resource, query=""):
 
     headers = authenticate_tascomi(headers, public_key, private_key)
 
-    url = f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}{query}'
+    url = f'https://hackney-planning.tascomi.com/rest/v1/{resource}{query}'
     res = requests.get(url, data="", headers=headers)
     if res.status_code == 202:
         logger.info(f"received status code 202, whilst getting number of pages for {resource}, with query {query}")
@@ -144,7 +144,7 @@ def get_requests_since_last_import(resource, last_import_date):
         number_of_pages = number_of_pages_reponse["number_of_pages"]
         logger.info(f"Number of pages to retrieve for {day}: {number_of_pages}")
         requests_list += [RequestRow(page_number,
-                                     f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}&last_updated={day}',
+                                     f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}&last_updated={day}',
                                      "") for page_number in range(1, number_of_pages + 1)]
     number_of_requests = len(requests_list)
     if number_of_requests == 0:
@@ -162,7 +162,7 @@ def get_requests_for_full_load(resource):
     number_of_pages = number_of_pages_reponse["number_of_pages"]
     logger.info(f"Number of pages to retrieve: {number_of_pages}")
     requests_list = [
-        RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}', "") for
+        RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}', "") for
         page_number in range(1, number_of_pages + 1)]
     number_of_requests = len(requests_list)
     requests_list = sc.parallelize(requests_list)
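The 202 branch in get_number_of_pages above only logs and carries on. A sketch of the kind of retry wrapper the llpg-ingestion-retry branch name suggests, not code from this commit, and assuming a 202 means the Tascomi API is still preparing the response:

import time
import requests

def get_with_retry(url, headers, max_attempts=5, backoff_seconds=10):
    # Poll while the API keeps answering 202; return the first non-202 response.
    res = None
    for attempt in range(1, max_attempts + 1):
        res = requests.get(url, data="", headers=headers)
        if res.status_code != 202:
            return res
        time.sleep(backoff_seconds * attempt)  # linear backoff between polls
    return res  # give up and hand back the last 202 response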

scripts/tests/planning/test_tascomi_parse_tables.py

Lines changed: 2 additions & 2 deletions
@@ -53,7 +53,7 @@ def test_parsed_row_data(self, spark):
             "creation_user_id": None,
             "title_id": "4",
             "page_number": 691,
-            "import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+            "import_api_url_requested": "https://hackney-planning.tascomi.com/rest/v1/contacts?page=691",
             "import_api_status_code": 200,
             "import_exception_thrown": "",
             "import_datetime": datetime(2021, 9, 16, 13, 10),
@@ -69,7 +69,7 @@ def parse_json_into_dataframe(self, spark, column, data):
         data_with_imports = [
             {
                 "page_number": 691,
-                "import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+                "import_api_url_requested": "https://hackney-planning.tascomi.com/rest/v1/contacts?page=691",
                 "import_api_status_code": 200,
                 "import_exception_thrown": "",
                 "import_datetime": datetime(2021, 9, 16, 13, 10),

terraform/etl/24-aws-glue-tascomi-data.tf

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ resource "aws_glue_trigger" "tascomi_tables_weekly_ingestion_triggers" {
 
   name     = "${local.short_identifier_prefix}Tascomi ${title(replace(each.value, "_", " "))} Ingestion Trigger"
   type     = "SCHEDULED"
-  schedule = "cron(0 21 ? * MON *)"
+  schedule = "cron(0 16 ? * SUN *)"
   enabled  = local.is_production_environment
 
   actions {
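For reference, AWS cron fields run minute, hour, day-of-month, month, day-of-week, year, so this change moves the weekly trigger from 21:00 UTC on Mondays to 16:00 UTC on Sundays.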
