Skip to content

Commit 665d6c9

Browse files
extracts webapp api wrapper
1 parent 3656afe commit 665d6c9

File tree

3 files changed

+388
-64
lines changed

3 files changed

+388
-64
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "pythonanywhere-core"
3-
version = "0.1.1"
3+
version = "0.1.2"
44
description = "API wrapper for programmatic management of PythonAnywhere services."
55
authors = ["PythonAnywhere <[email protected]>"]
66
license = "MIT"

pythonanywhere_core/webapp.py

Lines changed: 111 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from pathlib import Path
66
from textwrap import dedent
77

8+
from dateutil.parser import parse
89
from snakesay import snakesay
910

1011
from pythonanywhere_core.base import call_api, get_api_endpoint, PYTHON_VERSIONS
@@ -13,7 +14,12 @@
1314

1415
class Webapp:
1516
def __init__(self, domain: str) -> None:
17+
self.endpoint = get_api_endpoint()
18+
self.username = getpass.getuser()
19+
self.files_url = self.endpoint.format(username=self.username, flavor="files")
20+
self.webapps_url = self.endpoint.format(username=self.username, flavor="webapps")
1621
self.domain = domain
22+
self.domain_url = f"{self.webapps_url}{self.domain}/"
1723

1824
def __eq__(self, other: Webapp) -> bool:
1925
return self.domain == other.domain
@@ -35,33 +41,124 @@ def sanity_checks(self, nuke: bool) -> None:
3541
if nuke:
3642
return
3743

38-
endpoint = get_api_endpoint().format(username=getpass.getuser(), flavor="webapps")
39-
url = f"{endpoint}{self.domain}/"
40-
response = call_api(url, "get")
44+
response = call_api(self.domain_url, "get")
4145
if response.status_code == 200:
4246
raise SanityException(
4347
f"You already have a webapp for {self.domain}.\n\nUse the --nuke option if you want to replace it."
4448
)
4549

4650
def create(self, python_version: str, virtualenv_path: Path, project_path: Path, nuke: bool) -> None:
4751
print(snakesay("Creating web app via API"))
48-
base_url = get_api_endpoint().format(username=getpass.getuser(), flavor="webapps")
49-
domain_url = f"{base_url}{self.domain}/"
5052
if nuke:
51-
call_api(domain_url, "delete")
52-
patch_url = domain_url
53+
call_api(self.domain_url, "delete")
5354
response = call_api(
54-
base_url, "post", data={"domain_name": self.domain, "python_version": PYTHON_VERSIONS[python_version]}
55+
self.webapps_url,
56+
"post",
57+
data={"domain_name": self.domain, "python_version": PYTHON_VERSIONS[python_version]},
5558
)
5659
if not response.ok or response.json().get("status") == "ERROR":
57-
raise PythonAnywhereApiException(
58-
f"POST to create webapp via API failed, got {response}:{response.text}"
59-
)
60+
raise PythonAnywhereApiException(f"POST to create webapp via API failed, got {response}:{response.text}")
6061
response = call_api(
61-
patch_url, "patch", data={"virtualenv_path": virtualenv_path, "source_directory": project_path}
62+
self.domain_url, "patch", data={"virtualenv_path": virtualenv_path, "source_directory": project_path}
6263
)
6364
if not response.ok:
6465
raise PythonAnywhereApiException(
65-
"PATCH to set virtualenv path and source directory via API failed,"
66-
"got {response}:{response_text}".format(response=response, response_text=response.text)
66+
"PATCH to set virtualenv path and source directory via API failed," f"got {response}:{response.text}"
67+
)
68+
69+
def add_default_static_files_mappings(self, project_path: Path) -> None:
70+
print(snakesay("Adding static files mappings for /static/ and /media/"))
71+
url = f"{self.domain_url}static_files/"
72+
call_api(url, "post", json=dict(url="/static/", path=str(Path(project_path) / "static")))
73+
call_api(url, "post", json=dict(url="/media/", path=str(Path(project_path) / "media")))
74+
75+
def reload(self) -> None:
76+
print(snakesay(f"Reloading {self.domain} via API"))
77+
url = f"{self.domain_url}reload/"
78+
response = call_api(url, "post")
79+
if not response.ok:
80+
if response.status_code == 409 and response.json()["error"] == "cname_error":
81+
print(
82+
snakesay(
83+
dedent(
84+
"""
85+
Could not find a CNAME for your website. If you're using an A record,
86+
CloudFlare, or some other way of pointing your domain at PythonAnywhere
87+
then that should not be a problem. If you're not, you should double-check
88+
your DNS setup.
89+
"""
90+
)
91+
)
92+
)
93+
return
94+
raise PythonAnywhereApiException(f"POST to reload webapp via API failed, got {response}:{response.text}")
95+
96+
def set_ssl(self, certificate: str, private_key: str) -> None:
97+
print(snakesay(f"Setting up SSL for {self.domain} via API"))
98+
url = f"{self.domain_url}ssl/"
99+
response = call_api(url, "post", json={"cert": certificate, "private_key": private_key})
100+
if not response.ok:
101+
raise PythonAnywhereApiException(
102+
dedent(
103+
f"""
104+
POST to set SSL details via API failed, got {response}:{response.text}
105+
If you just created an API token, you need to set the API_TOKEN environment variable or start a
106+
new console. Also you need to have setup a `{self.domain}` PythonAnywhere webapp for this to work.
107+
"""
108+
)
67109
)
110+
111+
def get_ssl_info(self) -> dict[str, Any]:
112+
url = f"{self.domain_url}ssl/"
113+
response = call_api(url, "get")
114+
if not response.ok:
115+
raise PythonAnywhereApiException(f"GET SSL details via API failed, got {response}:{response.text}")
116+
117+
result = response.json()
118+
result["not_after"] = parse(result["not_after"])
119+
return result
120+
121+
def delete_log(self, log_type: str, index: int = 0) -> None:
122+
if index:
123+
message = f"Deleting old (archive number {index}) {log_type} log file for {self.domain} via API"
124+
else:
125+
message = f"Deleting current {log_type} log file for {self.domain} via API"
126+
print(snakesay(message))
127+
128+
if index == 1:
129+
suffix = ".1"
130+
elif index > 1:
131+
suffix = f".{index}.gz"
132+
else:
133+
suffix = ""
134+
135+
base_log_url = f"{self.files_url}path/var/log/{self.domain}.{log_type}.log"
136+
response = call_api(f"{base_log_url}{suffix}/", "delete")
137+
138+
if not response.ok:
139+
raise PythonAnywhereApiException(f"DELETE log file via API failed, got {response}:{response.text}")
140+
141+
def get_log_info(self):
142+
url = f"{self.files_url}tree/?path=/var/log/"
143+
response = call_api(url, "get")
144+
if not response.ok:
145+
raise PythonAnywhereApiException(f"GET log files info via API failed, got {response}:{response.text}")
146+
file_list = response.json()
147+
log_types = ["access", "error", "server"]
148+
logs = {"access": [], "error": [], "server": []}
149+
log_prefix = f"/var/log/{self.domain}."
150+
for file_name in file_list:
151+
if type(file_name) == str and file_name.startswith(log_prefix):
152+
log = file_name[len(log_prefix) :].split(".")
153+
if log[0] in log_types:
154+
log_type = log[0]
155+
if log[-1] == "log":
156+
log_index = 0
157+
elif log[-1] == "1":
158+
log_index = 1
159+
elif log[-1] == "gz":
160+
log_index = int(log[-2])
161+
else:
162+
continue
163+
logs[log_type].append(log_index)
164+
return logs

0 commit comments

Comments
 (0)