Skip to content

Commit 902a8bb

Browse files
committed
fix: correct auth handler
1 parent cdd4040 commit 902a8bb

File tree

1 file changed

+62
-65
lines changed

1 file changed

+62
-65
lines changed

src/stac_auth_proxy/auth.py

Lines changed: 62 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@
77
import jwt
88
from fastapi import HTTPException, Security, security, status
99
from fastapi.security.base import SecurityBase
10+
from pydantic import AnyHttpUrl
1011
from starlette.exceptions import HTTPException
1112
from starlette.status import HTTP_403_FORBIDDEN
12-
from pydantic import AnyHttpUrl
13-
1413

1514
logger = logging.getLogger(__name__)
1615

@@ -42,75 +41,73 @@ def __post_init__(self):
4241
oidc_config = json.load(response)
4342
self.jwks_client = jwt.PyJWKClient(oidc_config["jwks_uri"])
4443

45-
self.valid_token_dependency.__annotations__["auth_header"] = (
46-
security.OpenIdConnect(
47-
openIdConnectUrl=str(self.openid_configuration_url), auto_error=True
48-
)
44+
self.auth_scheme = security.OpenIdConnect(
45+
openIdConnectUrl=str(self.openid_configuration_url),
46+
auto_error=False,
4947
)
50-
51-
def user_or_none(self, auth_header: Annotated[str, Security(auth_scheme)]):
52-
"""Return the validated user if authenticated, else None."""
53-
return self.valid_token_dependency(
54-
auth_header, security.SecurityScopes([]), auto_error=False
55-
)
56-
57-
def valid_token_dependency(
58-
self,
59-
auth_header: Annotated[str, Security(auth_scheme)],
60-
required_scopes: security.SecurityScopes,
61-
auto_error: bool = True,
62-
):
63-
"""Dependency to validate an OIDC token."""
64-
if not auth_header:
65-
if auto_error:
66-
raise HTTPException(
67-
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
68-
)
69-
return None
70-
71-
# Extract token from header
72-
token_parts = auth_header.split(" ")
73-
if len(token_parts) != 2 or token_parts[0].lower() != "bearer":
74-
logger.error(f"Invalid token: {auth_header}")
75-
raise HTTPException(
76-
status_code=status.HTTP_401_UNAUTHORIZED,
77-
detail="Could not validate credentials",
78-
headers={"WWW-Authenticate": "Bearer"},
79-
)
80-
[_, token] = token_parts
81-
82-
# Parse & validate token
83-
try:
84-
key = self.jwks_client.get_signing_key_from_jwt(token).key
85-
payload = jwt.decode(
86-
token,
87-
key,
88-
algorithms=["RS256"],
89-
# NOTE: Audience validation MUST match audience claim if set in token (https://pyjwt.readthedocs.io/en/stable/changelog.html?highlight=audience#id40)
90-
audience=self.allowed_jwt_audiences,
91-
)
92-
except (jwt.exceptions.InvalidTokenError, jwt.exceptions.DecodeError) as e:
93-
logger.exception(f"InvalidTokenError: {e=}")
94-
raise HTTPException(
95-
status_code=status.HTTP_401_UNAUTHORIZED,
96-
detail="Could not validate credentials",
97-
headers={"WWW-Authenticate": "Bearer"},
98-
) from e
99-
100-
# Validate scopes (if required)
101-
for scope in required_scopes.scopes:
102-
if scope not in payload["scope"]:
48+
self.user_or_none = self.build(auto_error=False)
49+
self.valid_token_dependency = self.build(auto_error=True)
50+
51+
def build(self, auto_error: bool = True):
52+
"""Build a dependency for validating an OIDC token."""
53+
54+
def valid_token_dependency(
55+
auth_header: Annotated[str, Security(self.auth_scheme)],
56+
required_scopes: security.SecurityScopes,
57+
):
58+
"""Dependency to validate an OIDC token."""
59+
if not auth_header:
10360
if auto_error:
10461
raise HTTPException(
105-
status_code=status.HTTP_401_UNAUTHORIZED,
106-
detail="Not enough permissions",
107-
headers={
108-
"WWW-Authenticate": f'Bearer scope="{required_scopes.scope_str}"'
109-
},
62+
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
11063
)
11164
return None
11265

113-
return payload
66+
# Extract token from header
67+
token_parts = auth_header.split(" ")
68+
if len(token_parts) != 2 or token_parts[0].lower() != "bearer":
69+
logger.error(f"Invalid token: {auth_header}")
70+
raise HTTPException(
71+
status_code=status.HTTP_401_UNAUTHORIZED,
72+
detail="Could not validate credentials",
73+
headers={"WWW-Authenticate": "Bearer"},
74+
)
75+
[_, token] = token_parts
76+
77+
# Parse & validate token
78+
try:
79+
key = self.jwks_client.get_signing_key_from_jwt(token).key
80+
payload = jwt.decode(
81+
token,
82+
key,
83+
algorithms=["RS256"],
84+
# NOTE: Audience validation MUST match audience claim if set in token (https://pyjwt.readthedocs.io/en/stable/changelog.html?highlight=audience#id40)
85+
audience=self.allowed_jwt_audiences,
86+
)
87+
except (jwt.exceptions.InvalidTokenError, jwt.exceptions.DecodeError) as e:
88+
logger.exception(f"InvalidTokenError: {e=}")
89+
raise HTTPException(
90+
status_code=status.HTTP_401_UNAUTHORIZED,
91+
detail="Could not validate credentials",
92+
headers={"WWW-Authenticate": "Bearer"},
93+
) from e
94+
95+
# Validate scopes (if required)
96+
for scope in required_scopes.scopes:
97+
if scope not in payload["scope"]:
98+
if auto_error:
99+
raise HTTPException(
100+
status_code=status.HTTP_401_UNAUTHORIZED,
101+
detail="Not enough permissions",
102+
headers={
103+
"WWW-Authenticate": f'Bearer scope="{required_scopes.scope_str}"'
104+
},
105+
)
106+
return None
107+
108+
return payload
109+
110+
return valid_token_dependency
114111

115112

116113
class OidcFetchError(Exception):

0 commit comments

Comments
 (0)