-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathauthenticate.py
More file actions
178 lines (162 loc) · 6.45 KB
/
authenticate.py
File metadata and controls
178 lines (162 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python
# This will ask for your API Key the first time you run it and create a config.json
# It assumes an IP of 127.0.0.1 and PORT 443, but you can edit 'config.json' once created.
#
# You must first generate SSL certficates for the HTTPS server.
#
# openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days 365 -out certificate.pem
#
# The if you don't have tokens this is will open a browser for you to login into your account.
# TD Ameritrade will pass the tokens back to the local HTTPS Server via your browser.
# It's normal to get a security warning from your browser, just accept the certficate you created.
# The tokens will be saved in 'tokens.json'
#
# Running again will use the refresh token in auth.json to get new tokens if less than
# six minutes remain.
#
# You will not need to login again unless your refresh token is expired or is invalidated.
#
# To keep your tokens.json file updated will valid tokens run.
#
# ./authenticate.py forever
#
# The code is not the best, but it works.
#
# Visit TD Ameritrade's developer website for more information.
#
# https://developer.tdameritrade.com/content/getting-started
# Use https://127.0.0.1 as your callback URL.
#
# Code based on example code here.
# https://developer.tdameritrade.com/content/web-server-authentication-python-3
#
# On systems that restrict port < 1024 you may need to use 'sudo ./athenticate.py' or equivalent.
#
# Requires sys, os, time, json, threading, webbrowser, http.server, urllib.parse, requests, and ssl modules.
#
import sys, os, time, json
import threading
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, quote_plus
import requests
import ssl
if not os.path.isfile('certificate.pem') or not os.path.isfile('key.pem'):
print("")
print("You need to generate SSL certificates")
print("")
print("openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days 365 -out certificate.pem")
print("")
sys.exit(1)
config = { 'API_KEY' : "",
'OAUTH' : "AMER.OAUTHAP",
'HOST' : "127.0.0.1",
'PORT' : 443 }
config_filename = "config.json"
if os.path.isfile(config_filename):
config_file = open(config_filename)
config = json.load(config_file)
config_file.close()
else:
config['API_KEY'] = input('Enter your API Key: ').strip()
config_file = open(config_filename, 'w')
config_file.write(json.dumps(config))
config_file.close()
redirect_uri = "https://"+config['HOST']
if config['PORT'] != 443: redirect_uri = redirect_uri+":"+str(config['PORT'])
client_id = config['API_KEY']+"@"+config['OAUTH']
redirect_uri_encoded = quote_plus(redirect_uri)
tokens = { 'access_token' : None,
'expires_in' : None,
'token_type' : None,
'refresh_token' : None,
'refresh_token_expires_in' : None }
auth_filename = "tokens.json"
if os.path.isfile(auth_filename):
try:
token_file = open(auth_filename)
tokens = json.load(token_file)
token_file.close()
except:
pass
class TDAmeritradeHandler(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
global write_tokens
def write_tokens(authReplytext):
response = json.loads(authReplytext)
response.setdefault('error', [None])
if response['error'][0] == None:
tokens = json.loads(authReplytext)
tokens['grant_time'] = int(time.time())
token_file = open(auth_filename, 'w')
token_file.write(json.dumps(tokens))
token_file.close()
global update_tokens
def update_tokens():
global tokens
global client_id
global redirect_uri
global auth_filename
if time.time() > tokens['grant_time']+tokens['expires_in']/5*4:
headers = { 'Content-Type': 'application/x-www-form-urlencoded' }
data = { 'grant_type': 'refresh_token', 'refresh_token': tokens['refresh_token'], 'access_type': 'offline', 'code': '', 'client_id': client_id, 'redirect_uri': redirect_uri }
authReply = requests.post('https://api.tdameritrade.com/v1/oauth2/token', headers=headers, data=data)
write_tokens(authReply.text)
def do_GET(self):
self._set_headers()
path, _, query_string = self.path.partition('?')
query = parse_qs(query_string)
query.setdefault('code', [''])
code = query['code'][0]
#Post Access Token Request with new code
if code != '':
global tokens
global client_id
global redirect_uri
headers = { 'Content-Type': 'application/x-www-form-urlencoded' }
data = { 'grant_type': 'authorization_code', 'access_type': 'offline', 'code': code, 'client_id': client_id, 'redirect_uri': redirect_uri }
authReply = requests.post('https://api.tdameritrade.com/v1/oauth2/token', headers=headers, data=data)
write_tokens(authReply.text)
self.wfile.write(authReply.text.encode())
start_server = True
if tokens['access_token'] != None:
update_tokens()
temp = tokens
temp.setdefault('error', [None])
if temp['error'] == "invalid_grant":
os.remove(auth_filename)
else:
start_server = False
if start_server:
try:
httpd = HTTPServer((config['HOST'], config['PORT']), TDAmeritradeHandler)
httpd.socket = ssl.wrap_socket (httpd.socket, keyfile='key.pem', certfile='certificate.pem', server_side=True)
webbrowser.open_new("https://auth.tdameritrade.com/auth?response_type=code&redirect_uri="+redirect_uri_encoded+"&client_id="+client_id)
httpd.handle_request()
except PermissionError:
print("Unable to bind "+config['HOST']+":"+str(config['PORT'])+" with current permissions. (Try 'sudo ./tda-auth.py')")
sys.exit(0)
except:
print("Unknown error.")
sys.exit(0)
def forever():
if tokens['access_token'] != None:
update_tokens()
temp = tokens
temp.setdefault('error', [None])
if temp['error'] == "invalid_grant":
print ("Timer Abort: 'invalid_grant'")
sys.exit(0)
else:
update_tokens()
t = threading.Timer(tokens['expires_in']/20, forever)
t.start()
t.join()
try:
if sys.argv[1] is not None:
if sys.argv[1] == 'forever': forever()
except:
pass