-
Notifications
You must be signed in to change notification settings - Fork 254
Expand file tree
/
Copy pathauthentication_2025.py
More file actions
166 lines (140 loc) · 6.18 KB
/
authentication_2025.py
File metadata and controls
166 lines (140 loc) · 6.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import logging
import shlex
from datetime import UTC, datetime
from typing import Any
from django.conf import settings
from django.core.cache import cache
import requests
from allauth.socialaccount.models import SocialAccount
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.exceptions import (
APIException,
AuthenticationFailed,
NotFound,
ParseError,
PermissionDenied,
)
# Module-level logger; records are emitted on the "events" logger.
logger = logging.getLogger("events")

# URL of the FxA token introspection endpoint, built from the OAUTH_ENDPOINT
# configured under SOCIALACCOUNT_PROVIDERS["fxa"] in the Django settings.
INTROSPECT_TOKEN_URL = (
    f"{settings.SOCIALACCOUNT_PROVIDERS['fxa']['OAUTH_ENDPOINT']}/introspect"
)
def get_cache_key(token: str) -> str:
    """Return a stable, collision-resistant cache key for an OAuth token.

    The built-in ``hash()`` is unsuitable here: string hashes are salted per
    interpreter process, so two workers (or one worker after a restart) would
    compute different keys for the same token and never share cache entries.
    It is also not collision-resistant, and a collision would hand one
    token's introspection result to another token.

    Args:
        token: The raw access token string.

    Returns:
        The hex-encoded SHA-256 digest of the UTF-8 encoded token.
    """
    # Imported locally so this function does not depend on the module's
    # import block being updated.
    import hashlib

    return hashlib.sha256(token.encode("utf-8")).hexdigest()
def introspect_token(token: str) -> dict[str, Any]:
    """POST the token to the FxA introspection endpoint and return the result.

    Args:
        token: The access token to introspect.

    Returns:
        A dict with two keys: "status_code" (the HTTP status code of the FxA
        response) and "json" (the decoded JSON body of the response).

    Raises:
        AuthenticationFailed: If the request raises any exception, or if the
            response body cannot be decoded as JSON.
    """
    try:
        response = requests.post(
            INTROSPECT_TOKEN_URL,
            json={"token": token},
            timeout=settings.FXA_REQUESTS_TIMEOUT_SECONDS,
        )
    except Exception as exc:
        # Any failure while making the request is logged and reported to the
        # caller as an authentication failure.
        log_extra = {"error_cls": type(exc), "error": shlex.quote(str(exc))}
        logger.error("Could not introspect token with FXA.", extra=log_extra)
        raise AuthenticationFailed("Could not introspect token with FXA.")

    status_code = response.status_code
    try:
        body = response.json()
    except requests.exceptions.JSONDecodeError:
        # Log the (shell-quoted) raw body so the bad response can be inspected.
        logger.error(
            "JSONDecodeError from FXA introspect response.",
            extra={"fxa_response": shlex.quote(response.text)},
        )
        raise AuthenticationFailed("JSONDecodeError from FXA introspect response")

    return {"status_code": status_code, "json": body}
def get_fxa_uid_from_oauth_token(token: str, use_cache: bool = True) -> str:
    """Return the FxA UID for an access token, using FxA token introspection.

    Introspection results are cached under ``get_cache_key(token)``. The cache
    is only written when the data comes from a fresh introspection call; a
    cache hit never rewrites the entry. Rewriting on every hit would restart
    the entry's timeout each time, so a client that keeps retrying could keep
    a cached failure, or the result for an expired token, alive indefinitely.

    Args:
        token: The OAuth access token to check.
        use_cache: When True (default), reuse a cached introspection result
            if one exists, and cache a failure placeholder if introspection
            raises. When False, always introspect; the cache is only updated
            after the response has passed every check below.

    Returns:
        The "sub" value of the introspection response, as a string.

    Raises:
        AuthenticationFailed: Propagated from ``introspect_token``, or raised
            when the token is not active or lacks ``settings.RELAY_SCOPE``.
        APIException: When the (possibly cached) result is the failure
            placeholder, or FxA did not answer with status 200.
        NotFound: When the response has no "sub" value.
    """
    # set a default cache_timeout, but this will be overridden to match
    # the 'exp' time in the JWT returned by FxA
    cache_timeout = 60
    cache_key = get_cache_key(token)

    cached_fxa_resp_data = cache.get(cache_key) if use_cache else None
    if cached_fxa_resp_data:
        fxa_resp_data = cached_fxa_resp_data
        is_fresh = False
    else:
        # set a default fxa_resp_data, so any error during introspection
        # will still cache for at least cache_timeout to prevent an outage
        # from causing useless run-away repetitive introspection requests
        fxa_resp_data = {"status_code": None, "json": {}}
        is_fresh = True
        try:
            fxa_resp_data = introspect_token(token)
        finally:
            if use_cache:
                # Store potential valid response, errors, inactive users, etc.
                # from FxA for 60 seconds. The timeout for a valid
                # access_token is extended after the checks below.
                cache.set(cache_key, fxa_resp_data, cache_timeout)

    if fxa_resp_data["status_code"] is None:
        raise APIException("Previous FXA call failed, wait to retry.")

    if not fxa_resp_data["status_code"] == 200:
        raise APIException("Did not receive a 200 response from FXA.")

    fxa_resp_json = fxa_resp_data["json"]
    if not fxa_resp_json.get("active"):
        raise AuthenticationFailed("FXA returned active: False for token.")

    # FxA user is active, check for the associated Relay account
    if (raw_fxa_uid := fxa_resp_json.get("sub")) is None:
        raise NotFound("FXA did not return an FXA UID.")
    fxa_uid = str(raw_fxa_uid)

    # `or ""` also covers an explicit `"scope": null` in the response, which
    # would otherwise fail on `.split()`.
    scopes = (fxa_resp_json.get("scope") or "").split()
    if settings.RELAY_SCOPE not in scopes:
        raise AuthenticationFailed(
            "FXA token is missing scope: https://identity.mozilla.com/apps/relay."
        )

    if is_fresh:
        # cache valid access_token and fxa_resp_data until access_token expiration
        # TODO: revisit this since the token can expire before its time
        token_exp = fxa_resp_json.get("exp")
        if isinstance(token_exp, int):
            # Note: FXA iat and exp are timestamps in *milliseconds*
            fxa_token_exp_time = int(token_exp / 1000)
            now_time = int(datetime.now(UTC).timestamp())
            fxa_token_exp_cache_timeout = fxa_token_exp_time - now_time
            if fxa_token_exp_cache_timeout > cache_timeout:
                # cache until access_token expires (matched Relay user);
                # an already-expired token keeps the default timeout
                cache_timeout = fxa_token_exp_cache_timeout
        cache.set(cache_key, fxa_resp_data, cache_timeout)

    return fxa_uid
class FxaTokenAuthentication(BaseAuthentication):
    """DRF authentication class that accepts an FxA OAuth Bearer token."""

    def authenticate_header(self, request):
        # Note: we need to implement this function to make DRF return a 401 status code
        # when we raise AuthenticationFailed, rather than a 403. See:
        # https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication
        return "Bearer"

    def authenticate(self, request):
        """Authenticate the request from its ``Authorization: Bearer`` header.

        Returns:
            None when the request has no Bearer authorization header, so the
            next authentication class can be tried; otherwise a
            ``(user, token)`` tuple.

        Raises:
            ParseError: If the token after "Bearer " is empty or is not valid
                UTF-8.
            PermissionDenied: If no "fxa" SocialAccount matches the FxA UID,
                or the matching user is not active.
            AuthenticationFailed, APIException, NotFound: Propagated from
                ``get_fxa_uid_from_oauth_token``.
        """
        # Work on the raw bytes: decoding the whole header up front would
        # raise an unhandled UnicodeDecodeError on a header with non-UTF-8
        # bytes.
        authorization = get_authorization_header(request)
        if not authorization.startswith(b"Bearer "):
            # If the request has no Bearer token, return None to attempt the next
            # auth scheme in the REST_FRAMEWORK AUTHENTICATION_CLASSES list
            return None

        try:
            token = authorization.split(b" ")[1].decode()
        except UnicodeDecodeError as exc:
            raise ParseError("FXA Token contains invalid characters.") from exc
        if token == "":
            raise ParseError("Missing FXA Token after 'Bearer'.")

        # POST, DELETE and PUT requests skip the cached introspection result,
        # except for POST requests to /api/v1/relayaddresses/.
        use_cache = True
        method = request.method
        if method in ["POST", "DELETE", "PUT"]:
            use_cache = False
            if method == "POST" and request.path == "/api/v1/relayaddresses/":
                use_cache = True

        fxa_uid = get_fxa_uid_from_oauth_token(token, use_cache)
        try:
            # MPP-3021: select_related user object to save DB query
            sa = SocialAccount.objects.filter(
                uid=fxa_uid, provider="fxa"
            ).select_related("user")[0]
        except IndexError:
            raise PermissionDenied(
                "Authenticated user does not have a Relay account."
                " Have they accepted the terms?"
            )

        user = sa.user
        if not user.is_active:
            raise PermissionDenied(
                "Authenticated user does not have an active Relay account."
                " Have they been deactivated?"
            )
        # `user` has already been dereferenced above, so the old falsy-user
        # fallback could never run and was removed.
        return (user, token)