Commit 4d7c847

Merge branch 'main' into di-447-update-data-quality-tests
2 parents 1145bd7 + f1de16d · commit 4d7c847

File tree (3 files changed: +100 -71 lines changed)

lambdas/icaseworks_api_ingestion/main.py
lambdas/sftp_to_s3/index.js
terraform/modules/api-ingestion-lambda/10-lambda.tf

lambdas/icaseworks_api_ingestion/main.py

Lines changed: 60 additions & 46 deletions
@@ -1,31 +1,30 @@
-import sys
-
-sys.path.append('./lib/')
-
-import pybase64
-import json
+import datetime
 import hashlib
 import hmac
+import json
+import logging
 import re
-import requests
+import sys
 import time
+from os import getenv
+
 import boto3
+import pybase64
+import requests
 from dotenv import load_dotenv
-from os import getenv
-import datetime
-import logging
+
+
+sys.path.append("./lib/")
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
 
 def remove_illegal_characters(string):
     """Removes illegal characters from string"""
-    regex_list = [['=', ""], ['\/', "_"], ['+', "-"]]
+    regex_list = [["=", ""], ["\/", "_"], ["+", "-"]]  # noqa: W605
     for r in regex_list:
-        string = re.sub(string=string,
-                        pattern="[{}]".format(r[0]),
-                        repl=r[1])
+        string = re.sub(string=string, pattern="[{}]".format(r[0]), repl=r[1])
     return string
 
 
@@ -44,37 +43,39 @@ def dictionary_to_string(dictionary):
 def create_signature(header, payload, secret):
     """Encode JSON string"""
     # hashed header, hashed payload, string secret
-    unsigned_token = header + '.' + payload
-    key_bytes = bytes(secret, 'utf-8')
-    string_to_sign_bytes = bytes(unsigned_token, 'utf-8')
-    signature_hash = hmac.new(key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256).digest()
+    unsigned_token = header + "." + payload
+    key_bytes = bytes(secret, "utf-8")
+    string_to_sign_bytes = bytes(unsigned_token, "utf-8")
+    signature_hash = hmac.new(
+        key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256
+    ).digest()
     encoded_signature = pybase64.b64encode(signature_hash)
-    encoded_signature = encoded_signature.decode('utf-8')
+    encoded_signature = encoded_signature.decode("utf-8")
     encoded_signature = remove_illegal_characters(encoded_signature)
     return encoded_signature
 
 
 def get_token(url, encoded_header, encoded_payload, signature, headers):
     """Get token"""
     assertion = encoded_header + "." + encoded_payload + "." + signature
-    data = f'assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer'
+    data = f"assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer"
     response = requests.post(url, headers=headers, data=data)
     response_json = response.json()
-    auth_token = response_json.get('access_token')
+    auth_token = response_json.get("access_token")
     return auth_token
 
 
 def get_icaseworks_report_from(report_id, from_date, auth_headers, auth_payload):
     report_url = "https://hackneyreports.icasework.com/getreport?"
-    request_url = f'{report_url}ReportId={report_id}&Format=json&From={from_date}'
-    logger.info(f'Request url: {request_url}')
+    request_url = f"{report_url}ReportId={report_id}&Format=json&From={from_date}"
+    logger.info(f"Request url: {request_url}")
     response = requests.get(request_url, headers=auth_headers, data=auth_payload)
-    logger.info(f'Status Code: {response.status_code}')
+    logger.info(f"Status Code: {response.status_code}")
     return response.content
 
 
 def write_dataframe_to_s3(s3_client, data, s3_bucket, output_folder, filename):
-    filename = re.sub('[^a-zA-Z0-9]+', '-', filename).lower()
+    filename = re.sub("[^a-zA-Z0-9]+", "-", filename).lower()
     current_date = datetime.datetime.now()
     day = single_digit_to_zero_prefixed_string(current_date.day)
     month = single_digit_to_zero_prefixed_string(current_date.month)
@@ -83,7 +84,7 @@ def write_dataframe_to_s3(s3_client, data, s3_bucket, output_folder, filename):
     return s3_client.put_object(
         Bucket=s3_bucket,
         Body=data,
-        Key=f"{output_folder}/import_year={year}/import_month={month}/import_day={day}/import_date={date}/{filename}.json"
+        Key=f"{output_folder}/import_year={year}/import_month={month}/import_day={day}/import_date={date}/{filename}.json",
     )
 
 
@@ -102,14 +103,16 @@ def lambda_handler(event, lambda_context):
     url = "https://hackney.icasework.com/token"
 
     headers = {
-        'Content-Type': 'application/x-www-form-urlencoded',
+        "Content-Type": "application/x-www-form-urlencoded",
     }
 
     # Get api api credentials from secrets manager
     secret_name = getenv("SECRET_NAME")
-    secrets_manager_client = boto3.client('secretsmanager')
-    api_credentials_response = retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name)
-    api_credentials = json.loads(api_credentials_response['SecretString'])
+    secrets_manager_client = boto3.client("secretsmanager")
+    api_credentials_response = retrieve_credentials_from_secrets_manager(
+        secrets_manager_client, secret_name
+    )
+    api_credentials = json.loads(api_credentials_response["SecretString"])
     api_key = api_credentials.get("api_key")
     secret = api_credentials.get("secret")
 
@@ -122,11 +125,7 @@ def lambda_handler(event, lambda_context):
     # Create payload
     current_unix_time = int(time.time())
     str_time = str(current_unix_time)
-    payload_object = {
-        "iss": api_key,
-        "aud": url,
-        "iat": str_time
-    }
+    payload_object = {"iss": api_key, "aud": url, "iat": str_time}
 
     payload_object = dictionary_to_string(payload_object)
 
@@ -136,16 +135,21 @@ def lambda_handler(event, lambda_context):
     signature = create_signature(header, payload, secret)
 
     # Get token from response
-    auth_token = get_token(url=url, encoded_header=header, encoded_payload=payload, signature=signature,
-                           headers=headers)
+    auth_token = get_token(
+        url=url,
+        encoded_header=header,
+        encoded_payload=payload,
+        signature=signature,
+        headers=headers,
+    )
 
     # Create auth header for API Calls and auth payload
-    authorization = f'Bearer {auth_token}'
+    authorization = f"Bearer {auth_token}"
 
     auth_payload = {}
 
     auth_headers = {
-        'Authorization': authorization,
+        "Authorization": authorization,
     }
 
     report_tables = [
@@ -164,22 +168,32 @@ def lambda_handler(event, lambda_context):
     date_to_track_from = today - datetime.timedelta(days=1)
     logger.info(f"Date to track from: {date_to_track_from}")
 
-    s3_client = boto3.client('s3')
+    s3_client = boto3.client("s3")
 
     for report_details in report_tables:
-        logger.info(f'Pulling report for {report_details["name"]}')
+        logger.info(f"Pulling report for {report_details['name']}")
         case_id_report_id = report_details["id"]
-        case_id_list = get_icaseworks_report_from(case_id_report_id, date_to_track_from, auth_headers, auth_payload)
+        case_id_list = get_icaseworks_report_from(
+            case_id_report_id, date_to_track_from, auth_headers, auth_payload
+        )
         report_details["data"] = case_id_list
-        write_dataframe_to_s3(s3_client, report_details["data"], s3_bucket, output_folder_name, report_details["name"])
-        logger.info(f'Finished writing report for {report_details["name"]} to S3')
+        write_dataframe_to_s3(
+            s3_client,
+            report_details["data"],
+            s3_bucket,
+            output_folder_name,
+            report_details["name"],
+        )
+        logger.info(f"Finished writing report for {report_details['name']} to S3")
 
     # Trigger glue job to copy from landing to raw and convert to parquet
-    glue_client = boto3.client('glue')
+    glue_client = boto3.client("glue")
     start_glue_trigger(glue_client, glue_trigger_name)
 
+
 def single_digit_to_zero_prefixed_string(value):
-    return str(value) if value > 9 else '0' + str(value)
+    return str(value) if value > 9 else "0" + str(value)
+
 
 def start_glue_trigger(glue_client, trigger_name):
     trigger_details = glue_client.start_trigger(Name=trigger_name)
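
Aside: the regex pass in remove_illegal_characters is converting a standard base64 signature to the base64url alphabet JWTs require, stripping "=" padding and mapping "/" to "_" and "+" to "-". A minimal sketch of the same signing step using only the standard library (sign_base64url is a hypothetical helper, not part of this commit):

import base64
import hashlib
import hmac


def sign_base64url(message: str, secret: str) -> str:
    """HMAC-SHA256 sign `message`, returning an unpadded base64url string."""
    digest = hmac.new(
        secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256
    ).digest()
    # urlsafe_b64encode already maps "+" -> "-" and "/" -> "_";
    # stripping "=" drops the padding, as remove_illegal_characters does.
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("utf-8")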

lambdas/sftp_to_s3/index.js

Lines changed: 39 additions & 24 deletions
@@ -16,7 +16,7 @@ const config = {
   host: process.env.SFTP_HOST,
   username: process.env.SFTP_USERNAME,
   password: process.env.SFTP_PASSWORD,
-  port: 22
+  port: 22,
 };
 
 let fileNamePattern = "";
@@ -34,28 +34,28 @@ async function getImportFilenamePattern(manualOverrideDateString) {
     Settings.throwOnInvalid = true;
     DateTime.fromISO(manualOverrideDateString);
 
-    let parts = manualOverrideDateString.split('-');
+    let parts = manualOverrideDateString.split("-");
     dateToImport = new Date(parts[0], parts[1] - 1, parts[2]);
   }
 
   year = dateToImport.getFullYear().toString();
-  month = (dateToImport.getMonth() + 1).toString().padStart(2, '0');
-  day = dateToImport.getDate().toString().padStart(2, '0');
-  date = `${year}-${month}-${day}`
+  month = (dateToImport.getMonth() + 1).toString().padStart(2, "0");
+  day = dateToImport.getDate().toString().padStart(2, "0");
+  date = `${year}-${month}-${day}`;
 
   fileNamePattern = `${sftpSourceFilePrefix}${date}`;
 }
 
 async function findFiles(sftpConn) {
-  console.log(`filepath on server: ${sftpFilePath}`)
+  console.log(`filepath on server: ${sftpFilePath}`);
 
   const validPath = await sftpConn.exists(sftpFilePath);
 
   if (!validPath) {
     return {
       success: false,
       message: `Path ${sftpFilePath} doesn't exist on SFTP server`,
-      fileNames: []
+      fileNames: [],
     };
   }
 
@@ -68,7 +68,7 @@ async function findFiles(sftpConn) {
     function filterByFileNamePattern(file) {
       let name = file.name.toLowerCase();
       return name.includes(fileNamePattern.toLowerCase());
-    }
+    },
   );
 
   if (fileList.length === 0) {
@@ -79,26 +79,30 @@ async function findFiles(sftpConn) {
     };
   }
 
-  const fileNames = fileList.filter(file => file.type != 'd').map(file => file.name);
+  const fileNames = fileList
+    .filter((file) => file.type != "d")
+    .map((file) => file.name);
   console.log(fileNames);
   return {
     success: true,
     fileNames,
-    message: ""
+    message: "",
   };
 }
 
 async function checkS3ForFile() {
   const s3Client = new AWS.S3({ region: AWS_REGION });
   const params = {
     Bucket: s3Bucket,
-    Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`
+    Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`,
   };
   try {
     await s3Client.headObject(params).promise();
     return true;
   } catch (error) {
-    console.log(`Today's ${s3TargetFolder} file not yet present in S3 bucket, retrieving file from SFTP`)
+    console.log(
+      `Today's ${s3TargetFolder} file not yet present in S3 bucket, retrieving file from SFTP`,
+    );
     return false;
   }
 }
@@ -109,7 +113,7 @@ function putFile() {
   const params = {
     Bucket: s3Bucket,
     Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`,
-    Body: stream
+    Body: stream,
   };
 
   const upload = s3Client.upload(params);
@@ -129,8 +133,7 @@ async function streamFileFromSftpToS3(sftp, fileName) {
 }
 
 exports.handler = async (event) => {
-
-  let manualOverrideDateString = event['DateToImport'];
+  let manualOverrideDateString = event["DateToImport"];
 
   console.log(`Manual override date: ${manualOverrideDateString}`);
 
@@ -139,8 +142,13 @@ exports.handler = async (event) => {
   const sftp = new sftpClient();
 
   if (await checkS3ForFile()) {
-    console.log(`Today's ${s3TargetFolder} file is already present in S3 bucket!`);
-    return { success: true, message: `File already found in s3, no further action taken` };
+    console.log(
+      `Today's ${s3TargetFolder} file is already present in S3 bucket!`,
+    );
+    return {
+      success: true,
+      message: `File already found in s3, no further action taken`,
+    };
   }
 
   await sftp.connect(config);
@@ -153,26 +161,33 @@ exports.handler = async (event) => {
       console.log(findFilesResponse);
       return {
         success: findFilesResponse.success,
-        message: findFilesResponse.message
+        message: findFilesResponse.message,
       };
     }
 
-    await Promise.all(findFilesResponse.fileNames.map(file => streamFileFromSftpToS3(sftp, file)));
+    await Promise.all(
+      findFilesResponse.fileNames.map((file) =>
+        streamFileFromSftpToS3(sftp, file),
+      ),
+    );
 
     //start trigger
-    const glue = new AWS.Glue({ apiVersion: '2017-03-31' });
+    const glue = new AWS.Glue({ apiVersion: "2017-03-31" });
     const params = {
-      Name: trigger_to_run
+      Name: trigger_to_run,
     };
-    console.log("Starting trigger with params", params)
+    console.log("Starting trigger with params", params);
 
     await glue.startTrigger(params).promise();
 
     console.log("Success!");
-    return { success: true, message: `Successfully uploaded ${findFilesResponse.fileNames.length} file(s) to s3` };
+    return {
+      success: true,
+      message: `Successfully uploaded ${findFilesResponse.fileNames.length} file(s) to s3`,
+    };
   } catch (error) {
     console.error(error.message);
   } finally {
     await sftp.end();
   }
-}
+};
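
Both lambdas write to the same Hive-style partition layout (import_year=/import_month=/import_day=/import_date=), which is what lets the downstream Glue trigger pick up each day's drop by date. A sketch of how such a key is assembled (build_s3_key is a hypothetical helper, not part of either lambda):

from datetime import date


def build_s3_key(target_folder: str, filename: str, d: date | None = None) -> str:
    """Assemble a partitioned S3 key in the layout both lambdas use (illustrative)."""
    d = d or date.today()
    # date.isoformat() yields YYYY-MM-DD with zero-padded month and day,
    # matching padStart(2, "0") in the JS lambda and
    # single_digit_to_zero_prefixed_string in the Python one.
    return (
        f"{target_folder}/import_year={d.year}/import_month={d.month:02d}"
        f"/import_day={d.day:02d}/import_date={d.isoformat()}/{filename}"
    )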

terraform/modules/api-ingestion-lambda/10-lambda.tf

Lines changed: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ resource "aws_s3_object" "lambda" {
   source_hash = null_resource.run_install_requirements.triggers["dir_sha1"]
   depends_on  = [data.archive_file.lambda]
   metadata = {
-    last_updated = data.archive_file.lambda.output_base64sha256
+    last_updated = null_resource.run_install_requirements.triggers.dir_sha1
   }
 }
 
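The Terraform change points the object's last_updated metadata at the same dir_sha1 trigger that already drives source_hash, so the package is treated as changed only when the lambda's source directory changes, not every time the archive is rebuilt. For illustration, a directory digest like dir_sha1 could be computed along these lines (the real trigger is defined on null_resource.run_install_requirements elsewhere in the module; this helper is hypothetical):

import hashlib
from pathlib import Path


def dir_sha1(root: str) -> str:
    """Digest a directory tree: changes when any file's path or bytes change."""
    h = hashlib.sha1()
    for path in sorted(Path(root).rglob("*")):
        if path.is_file():
            h.update(str(path.relative_to(root)).encode("utf-8"))
            h.update(path.read_bytes())
    return h.hexdigest()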