|
1 | 1 | import os |
| 2 | +import secrets |
2 | 3 | from functools import wraps |
3 | 4 | from typing import Callable |
4 | 5 |
|
5 | 6 | from authlib.integrations.flask_client import OAuth |
6 | | -from flask import Blueprint, Flask, abort, redirect, session, url_for |
| 7 | +from flask import Blueprint, Flask, abort, jsonify, redirect, request, session, url_for |
| 8 | +from werkzeug.security import check_password_hash, generate_password_hash |
7 | 9 | from werkzeug.wrappers import Response |
8 | 10 |
|
| 11 | +from schema import APIKey, db |
| 12 | + |
9 | 13 | oauth = OAuth() |
10 | 14 |
|
11 | 15 |
|
@@ -86,3 +90,107 @@ def logout() -> Response: |
86 | 90 | # if no id token, just clear session and redirect to index |
87 | 91 | session.clear() |
88 | 92 | return redirect(url_for("index")) |
| 93 | + |
| 94 | + |
| 95 | +def create_api_key(owner: str) -> dict: |
| 96 | + """Create a new API key""" |
| 97 | + key = secrets.token_urlsafe(32) |
| 98 | + api_key = APIKey(generate_password_hash(key), owner.strip().lower()) |
| 99 | + db.session.add(api_key) |
| 100 | + db.session.commit() |
| 101 | + return { |
| 102 | + "id": api_key.id, |
| 103 | + "key": key, |
| 104 | + "owner": api_key.owner, |
| 105 | + "active": api_key.active, |
| 106 | + "created_at": api_key.created_at.isoformat(), |
| 107 | + } |
| 108 | + |
| 109 | + |
| 110 | +def get_api_keys() -> list[dict]: |
| 111 | + """Get all API keys""" |
| 112 | + api_keys = APIKey.query.all() |
| 113 | + return [key.to_dict() for key in api_keys] |
| 114 | + |
| 115 | + |
| 116 | +def get_api_key(key_id: int) -> dict | None: |
| 117 | + """Get an API key by its ID""" |
| 118 | + api_key = APIKey.query.get(key_id) |
| 119 | + if not api_key: |
| 120 | + return None |
| 121 | + return api_key.to_dict() |
| 122 | + |
| 123 | + |
| 124 | +def disable_api_key(key_id: int) -> dict | None: |
| 125 | + """Disable an API key by its ID""" |
| 126 | + api_key = APIKey.query.get(key_id) |
| 127 | + if not api_key: |
| 128 | + return None |
| 129 | + api_key.deactivate() |
| 130 | + return api_key.to_dict() |
| 131 | + |
| 132 | + |
| 133 | +def is_valid_api_key(api_key: str) -> bool: |
| 134 | + """Check if an API key is valid""" |
| 135 | + for key in APIKey.query.filter_by(active=True).all(): |
| 136 | + if check_password_hash(key.key, api_key): |
| 137 | + return True |
| 138 | + return False |
| 139 | + |
| 140 | + |
| 141 | +def valid_api_auth(f: Callable) -> Callable: |
| 142 | + """decorater to check if valid API auth""" |
| 143 | + |
| 144 | + @wraps(f) |
| 145 | + def decorated_function(*args: object, **kwargs: object) -> Response: |
| 146 | + # check if exec |
| 147 | + if is_exec(): |
| 148 | + return f(*args, **kwargs) |
| 149 | + # check if valid API key |
| 150 | + # american spelling for convention |
| 151 | + api_key = request.headers.get("Authorization") |
| 152 | + if not api_key or not is_valid_api_key(api_key): |
| 153 | + return abort(403, "Invalid API key or not authorised.") |
| 154 | + return f(*args, **kwargs) |
| 155 | + |
| 156 | + return decorated_function |
| 157 | + |
| 158 | + |
| 159 | +auth_api_bp = Blueprint("auth_api", __name__, url_prefix="/api/auth") |
| 160 | + |
| 161 | + |
| 162 | +@auth_api_bp.route("/create", methods=["POST"]) |
| 163 | +@is_exec_wrapper |
| 164 | +def create_api_key_api() -> tuple[Response, int]: |
| 165 | + """Create a new API key""" |
| 166 | + data = request.get_json() |
| 167 | + if not data or "owner" not in data: |
| 168 | + return jsonify({"error": "Owner is required"}), 400 |
| 169 | + return jsonify(create_api_key(data["owner"])), 201 |
| 170 | + |
| 171 | + |
| 172 | +@auth_api_bp.route("/<int:key_id>", methods=["GET"]) |
| 173 | +@is_exec_wrapper |
| 174 | +def get_api_key_api(key_id: int) -> tuple[Response, int]: |
| 175 | + """Get an API key by its ID""" |
| 176 | + api_key = get_api_key(key_id) |
| 177 | + if not api_key: |
| 178 | + return jsonify({"error": "API key not found"}), 404 |
| 179 | + return jsonify(api_key), 200 |
| 180 | + |
| 181 | + |
| 182 | +@auth_api_bp.route("/disable/<int:key_id>", methods=["POST"]) |
| 183 | +@is_exec_wrapper |
| 184 | +def disable_api_key_api(key_id: int) -> tuple[Response, int]: |
| 185 | + """Disable an API key by its ID""" |
| 186 | + api_key = disable_api_key(key_id) |
| 187 | + if not api_key: |
| 188 | + return jsonify({"error": "API key not found"}), 404 |
| 189 | + return jsonify(api_key), 200 |
| 190 | + |
| 191 | + |
| 192 | +@auth_api_bp.route("/keys", methods=["GET"]) |
| 193 | +@is_exec_wrapper |
| 194 | +def get_api_keys_api() -> tuple[Response, int]: |
| 195 | + """Get all API keys""" |
| 196 | + return jsonify(get_api_keys()), 200 |
0 commit comments