-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathRestAuthContoller.py
More file actions
104 lines (74 loc) · 2.75 KB
/
RestAuthContoller.py
File metadata and controls
104 lines (74 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import json
from cachetools import TTLCache
import requests
import datetime
import time
import ssl
import sys
import os
import urllib.parse
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import Config
import Config as config
class RestAuthController():
    """Fetch and cache a Learn REST access token using an OAuth2 authorization code.

    The controller exchanges the authorization code passed to the constructor
    for an access token at ``/learn/api/public/v1/oauth2/token`` on the
    configured host, then keeps the token in a single-entry ``TTLCache`` whose
    time-to-live is the ``expires_in`` value returned by the server.
    """

    target_url = ''

    def __init__(self, authcode):
        """Load credentials and endpoints from ``config.tool_config``.

        Args:
            authcode: Authorization code to exchange for an access token.

        Raises:
            KeyError: If a required key is missing from ``config.tool_config``.
        """
        self.KEY = config.tool_config['LEARN_REST_KEY']
        self.SECRET = config.tool_config['LEARN_REST_SECRET']
        self.CREDENTIALS = 'authorization_code'
        self.PAYLOAD = {
            'grant_type': 'authorization_code'
        }
        self.TOKEN = None
        self.target_url = config.tool_config['LEARN_REST_URL']
        self.app_url = config.tool_config['APP_URL']
        self.authcode = authcode
        self.EXPIRES_AT = ''
        # Accept a real boolean as well as the string form, so that a config
        # value of True (or 'true') does not silently disable TLS verification.
        self.verify_certs = self._as_bool(config.tool_config['VERIFY_CERTS'])
        self.cache = None
        self.uuid = None

    @staticmethod
    def _as_bool(value):
        """Return True for boolean True or a case-insensitive 'true' string."""
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() == 'true'

    def _token_url(self):
        """Build the token endpoint URL with percent-encoded query values."""
        # The authorization code arrives from outside this class; encode it
        # fully so characters such as '&' or '#' cannot alter the query string.
        code = urllib.parse.quote(self.authcode, safe='')
        # ':' and '/' stay literal so an ordinary redirect URI is sent exactly
        # as the unencoded concatenation would have produced it.
        redirect_uri = urllib.parse.quote(self.app_url + '/authcode/', safe=':/')
        return ('https://' + self.target_url
                + '/learn/api/public/v1/oauth2/token?code=' + code
                + '&redirect_uri=' + redirect_uri)

    def getKey(self):
        """Return the REST application key read from the configuration."""
        return self.KEY

    def getSecret(self):
        """Return the REST application secret read from the configuration."""
        return self.SECRET

    def setToken(self):
        """Request a new token unless a live one is already cached."""
        # Membership on the cache is False once the entry is gone, which
        # covers the same case the KeyError from a subscript lookup would.
        if self.cache is not None and 'token' in self.cache:
            return
        self.requestToken()

    def requestToken(self):
        """POST the authorization code to the token endpoint and cache the result.

        On a 200 response with a JSON body, stores ``access_token`` in a new
        ``TTLCache`` (ttl = ``expires_in``) and records ``user_id`` as the
        uuid. Any other outcome is logged and leaves the cache unchanged.

        Raises:
            KeyError: If a 200 JSON response lacks ``expires_in``,
                ``access_token`` or ``user_id``.

        Exceptions raised by ``requests.post`` itself are not caught here.
        """
        oauth_url = self._token_url()

        # Authenticate
        print("[auth:setToken] POST Request URL: " + oauth_url)
        print("[auth:setToken] JSON Payload: \n" + json.dumps(self.PAYLOAD, indent=4, separators=(',', ': ')))
        r = requests.post(oauth_url, data=self.PAYLOAD, auth=(self.KEY, self.SECRET), verify=self.verify_certs)

        print("[auth:setToken()] STATUS CODE: " + str(r.status_code))

        # Parse the body once. An error page that is not JSON must not raise
        # before the status code has been examined.
        try:
            res = json.loads(r.text)
        except ValueError:
            res = None
            print("[auth:setToken()] RESPONSE (not JSON): \n" + r.text)
        else:
            print("[auth:setToken()] RESPONSE: \n" + json.dumps(res, indent=4, separators=(',', ': ')))

        if r.status_code == 200 and res is not None:
            self.cache = TTLCache(maxsize=1, ttl=res['expires_in'])
            self.cache['token'] = res['access_token']
            self.uuid = res['user_id']
            # Log the value just received rather than going back through
            # getToken(), which would start another request if the entry
            # had already expired.
            print("[auth:setToken()] TOKEN: " + res['access_token'])
        else:
            print("[auth:setToken()] ERROR")

    def getToken(self):
        """Return the cached access token, requesting one first if needed.

        Raises:
            TypeError: If no token could be obtained and no cache exists.
            KeyError: If no token could be obtained and the cached entry is gone.
        """
        try:
            return self.cache['token']
        except (TypeError, KeyError):
            # TypeError: cache is still None. KeyError: the entry is missing
            # from the cache. Either way try to (re)acquire a token.
            self.setToken()
            return self.cache['token']

    def getUuid(self):
        """Return the user id from the last successful token response, or None."""
        return self.uuid