11import getpass
22import os
3- import requests
43from datetime import datetime
5- from textwrap import dedent
64from pathlib import Path
5+ from textwrap import dedent
6+
7+ import requests
78
89from pythonanywhere .exceptions import SanityException
910from pythonanywhere .snakesay import snakesay
1011
11-
12- PYTHON_VERSIONS = {
13- '2.7' : 'python27' ,
14- '3.4' : 'python34' ,
15- '3.5' : 'python35' ,
16- '3.6' : 'python36' ,
17- '3.7' : 'python37' ,
18- }
19-
12+ PYTHON_VERSIONS = {"2.7" : "python27" , "3.4" : "python34" , "3.5" : "python35" , "3.6" : "python36" , "3.7" : "python37" }
2013
2114
2215class AuthenticationError (Exception ):
@@ -28,157 +21,133 @@ class NoTokenError(Exception):
2821
2922
3023def get_api_endpoint ():
31- domain = os .environ .get (' PYTHONANYWHERE_DOMAIN' , ' pythonanywhere.com' )
32- return ' https://www.{domain}/api/v0/user/{{username}}/{{flavor}}/' .format (domain = domain )
24+ domain = os .environ .get (" PYTHONANYWHERE_DOMAIN" , " pythonanywhere.com" )
25+ return " https://www.{domain}/api/v0/user/{{username}}/{{flavor}}/" .format (domain = domain )
3326
3427
3528def call_api (url , method , ** kwargs ):
36- token = os .environ .get (' API_TOKEN' )
29+ token = os .environ .get (" API_TOKEN" )
3730 if token is None :
3831 raise NoTokenError (
3932 "Oops, you don't seem to have an API token. "
4033 "Please go to the 'Account' page on PythonAnywhere, then to the 'API Token' "
4134 "tab. Click the 'Create a new API token' button to create the token, then "
4235 "start a new console and try running this script again."
4336 )
44- insecure = os .environ .get (' PYTHONANYWHERE_INSECURE_API' ) == ' true'
37+ insecure = os .environ .get (" PYTHONANYWHERE_INSECURE_API" ) == " true"
4538 response = requests .request (
4639 method = method ,
4740 url = url ,
48- headers = {' Authorization' : ' Token {token}' .format (token = token )},
41+ headers = {" Authorization" : " Token {token}" .format (token = token )},
4942 verify = not insecure ,
5043 ** kwargs
5144 )
5245 if response .status_code == 401 :
5346 print (response , response .text )
5447 raise AuthenticationError (
55- 'Authentication error {status_code} calling API: {response_text}' .format (
56- status_code = response .status_code ,
57- response_text = response .text ,
48+ "Authentication error {status_code} calling API: {response_text}" .format (
49+ status_code = response .status_code , response_text = response .text
5850 )
5951 )
6052 return response
6153
6254
63-
6455class Webapp :
65-
6656 def __init__ (self , domain ):
6757 self .domain = domain
6858
69-
7059 def __eq__ (self , other ):
7160 return self .domain == other .domain
7261
73-
7462 def sanity_checks (self , nuke ):
75- print (snakesay (' Running API sanity checks' ))
76- token = os .environ .get (' API_TOKEN' )
63+ print (snakesay (" Running API sanity checks" ))
64+ token = os .environ .get (" API_TOKEN" )
7765 if not token :
78- raise SanityException (dedent (
79- '''
66+ raise SanityException (
67+ dedent (
68+ """
8069 Could not find your API token.
8170 You may need to create it on the Accounts page?
8271 You will also need to close this console and open a new one once you've done that.
83- '''
84- ))
72+ """
73+ )
74+ )
8575
8676 if nuke :
8777 return
8878
89- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + '/'
90- response = call_api (url , ' get' )
79+ url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + "/"
80+ response = call_api (url , " get" )
9181 if response .status_code == 200 :
9282 raise SanityException (
93- ' You already have a webapp for {domain}.\n \n Use the --nuke option if you want to replace it.' .format (
83+ " You already have a webapp for {domain}.\n \n Use the --nuke option if you want to replace it." .format (
9484 domain = self .domain
9585 )
9686 )
9787
98-
99-
10088 def create (self , python_version , virtualenv_path , project_path , nuke ):
101- print (snakesay (' Creating web app via API' ))
89+ print (snakesay (" Creating web app via API" ))
10290 if nuke :
103- webapp_url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + '/'
104- call_api (webapp_url , ' delete' )
91+ webapp_url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + "/"
92+ call_api (webapp_url , " delete" )
10593 post_url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" )
106- patch_url = post_url + self .domain + '/'
107- response = call_api (post_url , 'post' , data = {
108- ' domain_name' : self .domain , ' python_version' : PYTHON_VERSIONS [python_version ]},
94+ patch_url = post_url + self .domain + "/"
95+ response = call_api (
96+ post_url , "post" , data = { " domain_name" : self .domain , " python_version" : PYTHON_VERSIONS [python_version ]}
10997 )
110- if not response .ok or response .json ().get (' status' ) == ' ERROR' :
98+ if not response .ok or response .json ().get (" status" ) == " ERROR" :
11199 raise Exception (
112- 'POST to create webapp via API failed, got {response}:{response_text}' .format (
113- response = response ,
114- response_text = response .text ,
100+ "POST to create webapp via API failed, got {response}:{response_text}" .format (
101+ response = response , response_text = response .text
115102 )
116103 )
117104 response = call_api (
118- patch_url , 'patch' ,
119- data = {'virtualenv_path' : virtualenv_path , 'source_directory' : project_path }
105+ patch_url , "patch" , data = {"virtualenv_path" : virtualenv_path , "source_directory" : project_path }
120106 )
121107 if not response .ok :
122108 raise Exception (
123109 "PATCH to set virtualenv path and source directory via API failed,"
124- "got {response}:{response_text}" .format (
125- response = response ,
126- response_text = response .text ,
127- )
110+ "got {response}:{response_text}" .format (response = response , response_text = response .text )
128111 )
129112
130-
131-
132113 def add_default_static_files_mappings (self , project_path ):
133- print (snakesay ('Adding static files mappings for /static/ and /media/' ))
134-
135- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + '/static_files/'
136- call_api (url , 'post' , json = dict (
137- url = '/static/' , path = str (Path (project_path ) / 'static' ),
138- ))
139- call_api (url , 'post' , json = dict (
140- url = '/media/' , path = str (Path (project_path ) / 'media' ),
141- ))
142-
114+ print (snakesay ("Adding static files mappings for /static/ and /media/" ))
143115
116+ url = (
117+ get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + "/static_files/"
118+ )
119+ call_api (url , "post" , json = dict (url = "/static/" , path = str (Path (project_path ) / "static" )))
120+ call_api (url , "post" , json = dict (url = "/media/" , path = str (Path (project_path ) / "media" )))
144121
145122 def reload (self ):
146- print (snakesay (' Reloading {domain} via API' .format (domain = self .domain )))
147- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + ' /reload/'
148- response = call_api (url , ' post' )
123+ print (snakesay (" Reloading {domain} via API" .format (domain = self .domain )))
124+ url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + " /reload/"
125+ response = call_api (url , " post" )
149126 if not response .ok :
150127 raise Exception (
151- 'POST to reload webapp via API failed, got {response}:{response_text}' .format (
152- response = response ,
153- response_text = response .text ,
128+ "POST to reload webapp via API failed, got {response}:{response_text}" .format (
129+ response = response , response_text = response .text
154130 )
155131 )
156132
157-
158133 def set_ssl (self , certificate , private_key ):
159- print (snakesay ('Setting up SSL for {domain} via API' .format (domain = self .domain )))
160- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + '/ssl/'
161- response = call_api (
162- url , 'post' ,
163- json = {'cert' : certificate , 'private_key' : private_key }
164- )
134+ print (snakesay ("Setting up SSL for {domain} via API" .format (domain = self .domain )))
135+ url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + "/ssl/"
136+ response = call_api (url , "post" , json = {"cert" : certificate , "private_key" : private_key })
165137 if not response .ok :
166138 raise Exception (
167- 'POST to set SSL details via API failed, got {response}:{response_text}' .format (
168- response = response ,
169- response_text = response .text ,
139+ "POST to set SSL details via API failed, got {response}:{response_text}" .format (
140+ response = response , response_text = response .text
170141 )
171142 )
172143
173-
174144 def get_ssl_info (self ):
175- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + ' /ssl/'
176- response = call_api (url , ' get' )
145+ url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "webapps" ) + self .domain + " /ssl/"
146+ response = call_api (url , " get" )
177147 if not response .ok :
178148 raise Exception (
179- 'GET SSL details via API failed, got {response}:{response_text}' .format (
180- response = response ,
181- response_text = response .text ,
149+ "GET SSL details via API failed, got {response}:{response_text}" .format (
150+ response = response , response_text = response .text
182151 )
183152 )
184153
@@ -188,42 +157,47 @@ def get_ssl_info(self):
188157
189158 def delete_log (self , log_type , index = 0 ):
190159 if index :
191- print (snakesay (
192- 'Deleting old (archive number {index}) {type} log file for {domain} via API' .format (index = index ,
193- type = log_type ,
194- domain = self .domain )))
160+ print (
161+ snakesay (
162+ "Deleting old (archive number {index}) {type} log file for {domain} via API" .format (
163+ index = index , type = log_type , domain = self .domain
164+ )
165+ )
166+ )
195167 else :
196- print (snakesay (
197- 'Deleting current {type} log file for {domain} via API' .format (type = log_type , domain = self .domain )))
168+ print (
169+ snakesay (
170+ "Deleting current {type} log file for {domain} via API" .format (type = log_type , domain = self .domain )
171+ )
172+ )
198173
199174 if index == 1 :
200- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "files" ) + "path/var/log/{domain}.{type}.log.1/" .format (
201- domain = self .domain , type = log_type )
175+ url = get_api_endpoint ().format (
176+ username = getpass .getuser (), flavor = "files"
177+ ) + "path/var/log/{domain}.{type}.log.1/" .format (domain = self .domain , type = log_type )
202178 elif index > 1 :
203179 url = get_api_endpoint ().format (
204- username = getpass .getuser (), flavor = "files" ) + "path/var/log/{domain}.{type}.log.{index}.gz/" . format (
205- domain = self .domain , type = log_type , index = index )
180+ username = getpass .getuser (), flavor = "files"
181+ ) + "path/var/log/{domain}.{type}.log.{index}.gz/" . format ( domain = self .domain , type = log_type , index = index )
206182 else :
207- url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "files" ) + "path/var/log/{domain}.{type}.log/" .format (
208- domain = self .domain , type = log_type )
183+ url = get_api_endpoint ().format (
184+ username = getpass .getuser (), flavor = "files"
185+ ) + "path/var/log/{domain}.{type}.log/" .format (domain = self .domain , type = log_type )
209186 response = call_api (url , "delete" )
210187 if not response .ok :
211188 raise Exception (
212189 "DELETE log file via API failed, got {response}:{response_text}" .format (
213- response = response ,
214- response_text = response .text ,
190+ response = response , response_text = response .text
215191 )
216192 )
217193
218194 def get_log_info (self ):
219- url = get_api_endpoint ().format (username = getpass .getuser (),
220- flavor = "files" ) + "tree/?path=/var/log/"
195+ url = get_api_endpoint ().format (username = getpass .getuser (), flavor = "files" ) + "tree/?path=/var/log/"
221196 response = call_api (url , "get" )
222197 if not response .ok :
223198 raise Exception (
224199 "GET log files info via API failed, got {response}:{response_text}" .format (
225- response = response ,
226- response_text = response .text ,
200+ response = response , response_text = response .text
227201 )
228202 )
229203 file_list = response .json ()
@@ -232,7 +206,7 @@ def get_log_info(self):
232206 log_prefix = "/var/log/{domain}." .format (domain = self .domain )
233207 for file_name in file_list :
234208 if type (file_name ) == str and file_name .startswith (log_prefix ):
235- log = file_name [len (log_prefix ):].split ("." )
209+ log = file_name [len (log_prefix ) :].split ("." )
236210 if log [0 ] in log_types :
237211 log_type = log [0 ]
238212 if log [- 1 ] == "log" :
0 commit comments