Skip to content

Commit 75c79b5

Browse files
authored
Merge pull request #3 from prashantramangupta/master
migration of daemon reports data from redshift to RDS.
2 parents 6304c87 + 2496240 commit 75c79b5

File tree

14 files changed

+283
-107
lines changed

14 files changed

+283
-107
lines changed

build.sh

Lines changed: 0 additions & 2 deletions
This file was deleted.

constant.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
NETWORKS = {
2+
999: {
3+
'db': {'DB_HOST': '',
4+
'DB_USER': '',
5+
'DB_PASSWORD': '',
6+
'DB_NAME': '',
7+
'DB_PORT': 3306
8+
}
9+
}
10+
}
11+
SLACK_HOOK = {
12+
'hostname' : '',
13+
'port': 443,
14+
'path': '',
15+
'method': 'POST',
16+
'headers': {
17+
'Content-Type': 'application/json'
18+
}
19+
}
20+
METRICS_NETWORK_ID = 999

lambda_handler.py

Lines changed: 0 additions & 92 deletions
This file was deleted.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from repository import Repository
2+
from constant import METRICS_NETWORK_ID
3+
4+
class Token:
5+
def __init__(self):
6+
self.repo = Repository(METRICS_NETWORK_ID)
7+
8+
def validate_token(self, daemon_id, token):
9+
print("validate_token::daemon_id: ", daemon_id);
10+
qry = "SELECT * FROM daemon_token WHERE daemon_id = %s and token = %s "
11+
res = self.repo.execute(qry, [daemon_id, token])
12+
if len(res) > 0:
13+
return {'validated': True}
14+
return {'validated': False}

platform-usage-authorizer/build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rm daemon_authorizer.zip
2+
zip -r daemon_authorizer.zip * -x \*venv\*
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import json
2+
import os
3+
4+
from authorizer import Token
5+
from utils import Utils
6+
7+
obj_util = Utils()
8+
9+
def request_handler(event, context):
10+
if 'path' not in event:
11+
return get_response("400", {"status": "failed", "error": "Bad Request"})
12+
try:
13+
data = None
14+
payload_dict = None
15+
path = event['path'].lower()
16+
if "/event" == path:
17+
payload_dict = event['headers']
18+
print("Processing [" + str(path) + "] with body [" + str(payload_dict) + "]")
19+
token_instance = Token()
20+
data = token_instance.validate_token(daemon_id=payload_dict['x-daemonid'],
21+
token=payload_dict['x-token'])
22+
else:
23+
return get_response(500, "Invalid URL path.")
24+
return get_lambda_authorizer_response_format(event=event, allow=data.get('validated', False))
25+
except Exception as e:
26+
err_msg = {"status": "failed",
27+
"error": repr(e),
28+
"api": event['path'],
29+
"payload": payload_dict,
30+
"type": "authorize"}
31+
obj_util.report_slack(1, str(err_msg))
32+
return get_response(500, err_msg)
33+
34+
35+
# Generate response JSON that API gateway expects from the lambda function
36+
def get_response(status_code, message):
37+
return {
38+
'statusCode': status_code,
39+
'body': json.dumps(message),
40+
'headers': {
41+
'Content-Type': 'application/json',
42+
"X-Requested-With": '*',
43+
"Access-Control-Allow-Headers": 'Access-Control-Allow-Origin, Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with',
44+
"Access-Control-Allow-Origin": '*',
45+
"Access-Control-Allow-Methods": 'GET,OPTIONS,POST'
46+
}
47+
}
48+
49+
50+
def get_lambda_authorizer_response_format(event, allow):
51+
response = {
52+
"principalId": os.environ['principalId'],
53+
"policyDocument": {
54+
"Version": '2012-10-17',
55+
"Statement": [
56+
{
57+
"Action": 'execute-api:Invoke',
58+
"Resource": event['methodArn']
59+
}
60+
]
61+
}
62+
}
63+
if allow:
64+
response["policyDocument"]["Statement"][0]["Effect"] = 'Allow'
65+
else:
66+
response["policyDocument"]["Statement"][0]["Effect"] = 'Deny'
67+
return response
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pymysql==0.9.2
2+
requests==2.20.1

platform-usage-process/build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rm platform_usage_process.zip
2+
zip -r platform_usage_process.zip * -x \*venv\*
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import json
2+
3+
from metrics import Metrics
4+
from register import Token
5+
from utils import Utils
6+
7+
obj_util = Utils()
8+
9+
10+
def request_handler(event, context):
11+
if 'path' not in event:
12+
return get_response("400", {"status": "failed",
13+
"error": "Bad Request"})
14+
try:
15+
data = None
16+
payload_dict = None
17+
path = event['path'].lower()
18+
payload_dict = payload_check(payload=event['body'], path=path)
19+
if "/register" == path:
20+
token_instance = Token()
21+
data = token_instance.process_token(daemon_id=payload_dict['daemonId'])
22+
elif "/event" == path:
23+
obj_metrics = Metrics()
24+
if payload_dict['type'] == 'request':
25+
obj_metrics.handle_request_type(payload_dict)
26+
elif payload_dict['type'] == 'response':
27+
obj_metrics.handle_response_type(payload_dict)
28+
data = {}
29+
else:
30+
return get_response(500, "Invalid URL path.")
31+
32+
if data is None:
33+
response = get_response("400", {"status": "failed",
34+
"error": "Bad Request",
35+
"api": event['path'],
36+
"payload": payload_dict})
37+
else:
38+
if data.get('error', '') == '':
39+
response = get_response("200", {"status": "success", "data": data})
40+
else:
41+
error = data['error']
42+
data.pop('error')
43+
response = get_response("200", {"status": "failed", "data": data, "error": error})
44+
45+
return response
46+
except Exception as e:
47+
err_msg = {"status": "failed",
48+
"error": repr(e),
49+
"api": event['path'],
50+
"payload": payload_dict,
51+
"type": "process"}
52+
obj_util.report_slack(1, str(err_msg))
53+
return get_response(500, err_msg)
54+
55+
56+
def payload_check(payload, path):
57+
payload_dict = None
58+
if payload is not None and len(payload) > 0:
59+
payload_dict = json.loads(payload)
60+
print("Processing [" + str(path) + "] with body [" + str(payload) + "]")
61+
return payload_dict
62+
63+
64+
# Generate response JSON that API gateway expects from the lambda function
65+
def get_response(status_code, message):
66+
return {
67+
'statusCode': status_code,
68+
'body': json.dumps(message),
69+
'headers': {
70+
'Content-Type': 'application/json',
71+
"X-Requested-With": '*',
72+
"Access-Control-Allow-Headers": 'Access-Control-Allow-Origin, Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with',
73+
"Access-Control-Allow-Origin": '*',
74+
"Access-Control-Allow-Methods": 'GET,OPTIONS,POST'
75+
}
76+
}

platform-usage-process/metrics.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from datetime import datetime as dt
2+
3+
from constant import METRICS_NETWORK_ID
4+
from repository import Repository
5+
6+
7+
class Metrics:
8+
def __init__(self):
9+
self.repo = Repository(METRICS_NETWORK_ID)
10+
11+
def handle_request_type(self, params):
12+
try:
13+
insrt_dm_rq_sts = "INSERT INTO daemon_request_stats (ethereum_json_rpc_endpoint, group_id, input_data_size, " \
14+
"organization_id, registry_address_key, request_id, request_received_time, service_id, " \
15+
"service_method, row_created, row_updated) " \
16+
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
17+
dm_req_params = (params['ethereum_json_rpc_endpoint'], params['group_id'], params['input_data_size'],
18+
params['organization_id'], params['registry_address_key'], params['request_id'],
19+
params['request_received_time'][:19], params['service_id'], params['service_method'],
20+
dt.utcnow(),
21+
dt.utcnow())
22+
self.repo.execute(insrt_dm_rq_sts, dm_req_params)
23+
except Exception as e:
24+
print(repr(e))
25+
raise e
26+
27+
def handle_response_type(self, params):
28+
try:
29+
insrt_dm_rs_sts = "INSERT INTO daemon_response_stats (error_message, ethereum_json_rpc_endpoint, group_id, " \
30+
"organization_id, registry_address_key, request_id, request_received_time, response_code, " \
31+
"response_sent_time, response_time, service_id, service_method, row_created, row_updated) " \
32+
"VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
33+
dm_rs_params = (params['error_message'], params['ethereum_json_rpc_endpoint'], params['group_id'],
34+
params['organization_id'], params['registry_address_key'], params['request_id'],
35+
params['request_received_time'][:19], params['response_code'],
36+
params['response_sent_time'][:19],
37+
params['response_time'], params['service_id'], params['service_method'], dt.utcnow(),
38+
dt.utcnow())
39+
self.repo.execute(insrt_dm_rs_sts, dm_rs_params)
40+
except Exception as e:
41+
print(repr(e))
42+
raise e

0 commit comments

Comments
 (0)